This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f6d55723d8 [HUDI-8865] Fix error message of unresolved columns in 
MERGE INTO on Spark 3.5 (#12640)
1f6d55723d8 is described below

commit 1f6d55723d88fa9881464e94610eb15f245bea52
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 15 18:27:31 2025 -0800

    [HUDI-8865] Fix error message of unresolved columns in MERGE INTO on Spark 
3.5 (#12640)
---
 .../spark/sql/hudi/dml/TestMergeIntoTable.scala    | 92 +++++++++++++++++++++-
 .../spark/sql/HoodieSpark35CatalystPlanUtils.scala |  8 +-
 2 files changed, 93 insertions(+), 7 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index af679664adc..fb9a7b7e224 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -17,16 +17,14 @@
 
 package org.apache.spark.sql.hudi.dml
 
-import org.apache.hudi.{DataSourceReadOptions, ScalaAssertionSupport}
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, 
ScalaAssertionSupport}
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.testutils.DataSourceTestUtils
 
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.getClass
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.internal.SQLConf
-
 import org.slf4j.LoggerFactory
 
 class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {
@@ -1445,4 +1443,92 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
       })
     }
   }
+
+  test("Test unresolved columns in Merge Into statement") {
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (primaryKey = 'id', type = '$tableType')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+             | """.stripMargin)
+
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt = '2024-01-14')
+             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+             | union
+             | select 2 as id, 'a2' as name, 20 as price, 1002 as ts
+             | """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2024-01-14"),
+          Seq(2, "a2", 20.0, 1002, "2024-01-14")
+        )
+
+        val targetTableFields = spark.sql(s"select * from 
$tableName").schema.fields
+          .map(e => (e.name, tableName, 
s"spark_catalog.default.$tableName.${e.name}"))
+        val sourceTableFields = Seq("s0._id", "s0._price", "s0._ts", "s0.dt", 
"s0.name")
+          .map(e => {
+            val splits = e.split('.')
+            (splits(1), splits(0), e)
+          })
+        // Target table column cannot be found
+        val sqlStatement1 =
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 2 as _id, '2024-01-14' as dt, 'a2_new' as name, 25 as 
_price, 1005 as _ts
+             |  union
+             |  select 3 as _id, '2024-01-14' as dt, 'a3' as name, 30 as 
_price, 1003 as _ts
+             | ) s0
+             | on s0._id = $tableName.id
+             | when matched then update set
+             | id = s0._id, dt = s0.dt, new_col = s0.name, price = s0._price + 
$tableName.price,
+             | ts = s0._ts
+             | """.stripMargin
+        // Source table column cannot be found
+        val sqlStatement2 =
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 2 as _id, '2024-01-14' as dt, 'a2_new' as name, 25 as 
_price, 1005 as _ts
+             |  union
+             |  select 3 as _id, '2024-01-14' as dt, 'a3' as name, 30 as 
_price, 1003 as _ts
+             | ) s0
+             | on s0._id = $tableName.id
+             | when matched then update set
+             | id = s0._id, dt = s0.dt, name = s0.new_col, price = s0._price + 
$tableName.price,
+             | ts = s0._ts
+             | """.stripMargin
+
+        checkExceptionContain(sqlStatement1)(
+          getExpectedExceptionMessage("new_col", targetTableFields))
+        checkExceptionContain(sqlStatement2)(
+          getExpectedExceptionMessage("s0.new_col", sourceTableFields ++ 
targetTableFields))
+      }
+    }
+  }
+
+  private def getExpectedExceptionMessage(columnName: String,
+                                          fieldNameTuples: Seq[(String, 
String, String)]): String = {
+    val fieldNames = fieldNameTuples.sortBy(e => (e._1, e._2))
+      .map(e => e._3).mkString("[", ", ", "]")
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with 
name " +
+        s"$columnName cannot be resolved. Did you mean one of the following? 
$fieldNames."
+    } else {
+      s"cannot resolve $columnName in MERGE command given columns $fieldNames" 
+
+        (if (HoodieSparkUtils.gteqSpark3_4) "." else "")
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index 22316ddacac..0ab9febd567 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark35CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -74,10 +74,10 @@ object HoodieSpark35CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
 
   override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
     a.failAnalysis(
-      errorClass = "_LEGACY_ERROR_TEMP_2309",
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
       messageParameters = Map(
-        "sqlExpr" -> a.sql,
-        "cols" -> cols))
+        "objectName" -> a.sql,
+        "proposal" -> cols))
   }
 
   override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, 
String])] = {

Reply via email to