This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1f6d55723d8 [HUDI-8865] Fix error message of unresolved columns in
MERGE INTO on Spark 3.5 (#12640)
1f6d55723d8 is described below
commit 1f6d55723d88fa9881464e94610eb15f245bea52
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 15 18:27:31 2025 -0800
[HUDI-8865] Fix error message of unresolved columns in MERGE INTO on Spark
3.5 (#12640)
---
.../spark/sql/hudi/dml/TestMergeIntoTable.scala | 92 +++++++++++++++++++++-
.../spark/sql/HoodieSpark35CatalystPlanUtils.scala | 8 +-
2 files changed, 93 insertions(+), 7 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index af679664adc..fb9a7b7e224 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -17,16 +17,14 @@
package org.apache.spark.sql.hudi.dml
-import org.apache.hudi.{DataSourceReadOptions, ScalaAssertionSupport}
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils,
ScalaAssertionSupport}
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.getClass
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.internal.SQLConf
-
import org.slf4j.LoggerFactory
class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
@@ -1445,4 +1443,92 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
})
}
}
+
+ test("Test unresolved columns in Merge Into statement") {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (primaryKey = 'id', type = '$tableType')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ | """.stripMargin)
+
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt = '2024-01-14')
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+ | union
+ | select 2 as id, 'a2' as name, 20 as price, 1002 as ts
+ | """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2024-01-14"),
+ Seq(2, "a2", 20.0, 1002, "2024-01-14")
+ )
+
+ val targetTableFields = spark.sql(s"select * from
$tableName").schema.fields
+ .map(e => (e.name, tableName,
s"spark_catalog.default.$tableName.${e.name}"))
+ val sourceTableFields = Seq("s0._id", "s0._price", "s0._ts", "s0.dt",
"s0.name")
+ .map(e => {
+ val splits = e.split('.')
+ (splits(1), splits(0), e)
+ })
+ // Target table column cannot be found
+ val sqlStatement1 =
+ s"""
+ | merge into $tableName
+ | using (
+ | select 2 as _id, '2024-01-14' as dt, 'a2_new' as name, 25 as
_price, 1005 as _ts
+ | union
+ | select 3 as _id, '2024-01-14' as dt, 'a3' as name, 30 as
_price, 1003 as _ts
+ | ) s0
+ | on s0._id = $tableName.id
+ | when matched then update set
+ | id = s0._id, dt = s0.dt, new_col = s0.name, price = s0._price +
$tableName.price,
+ | ts = s0._ts
+ | """.stripMargin
+ // Source table column cannot be found
+ val sqlStatement2 =
+ s"""
+ | merge into $tableName
+ | using (
+ | select 2 as _id, '2024-01-14' as dt, 'a2_new' as name, 25 as
_price, 1005 as _ts
+ | union
+ | select 3 as _id, '2024-01-14' as dt, 'a3' as name, 30 as
_price, 1003 as _ts
+ | ) s0
+ | on s0._id = $tableName.id
+ | when matched then update set
+ | id = s0._id, dt = s0.dt, name = s0.new_col, price = s0._price +
$tableName.price,
+ | ts = s0._ts
+ | """.stripMargin
+
+ checkExceptionContain(sqlStatement1)(
+ getExpectedExceptionMessage("new_col", targetTableFields))
+ checkExceptionContain(sqlStatement2)(
+ getExpectedExceptionMessage("s0.new_col", sourceTableFields ++
targetTableFields))
+ }
+ }
+ }
+
+ private def getExpectedExceptionMessage(columnName: String,
+ fieldNameTuples: Seq[(String,
String, String)]): String = {
+ val fieldNames = fieldNameTuples.sortBy(e => (e._1, e._2))
+ .map(e => e._3).mkString("[", ", ", "]")
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with
name " +
+ s"$columnName cannot be resolved. Did you mean one of the following?
$fieldNames."
+ } else {
+ s"cannot resolve $columnName in MERGE command given columns $fieldNames"
+
+ (if (HoodieSparkUtils.gteqSpark3_4) "." else "")
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index 22316ddacac..0ab9febd567 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.types.StructType
object HoodieSpark35CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -74,10 +74,10 @@ object HoodieSpark35CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
  /**
   * Raises an analysis failure for an attribute that could not be resolved in a
   * MERGE INTO statement.
   *
   * Uses Spark 3.5's `UNRESOLVED_COLUMN.WITH_SUGGESTION` error class (replacing
   * the legacy `_LEGACY_ERROR_TEMP_2309` template) so the user-facing message
   * names the unresolved column and suggests the available ones.
   *
   * @param a    the unresolved attribute
   * @param cols the candidate columns to suggest, pre-rendered as one string
   */
  override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
    a.failAnalysis(
      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
      messageParameters = Map(
        // Template parameters required by UNRESOLVED_COLUMN.WITH_SUGGESTION.
        "objectName" -> a.sql,
        "proposal" -> cols))
  }
override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan,
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String,
String])] = {