This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 68707164cb3b6a93192774e1ab7f173cc2079b74
Author: Vamshi Krishna Kyatham 
<[email protected]>
AuthorDate: Mon Oct 20 22:28:31 2025 -0700

    fix: Fixing error msg when table is not found with MergeIntoCommand (#14118)
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |  5 ++++
 .../hudi/analysis/HoodieSparkBaseAnalysis.scala    | 10 ++++++-
 .../sql/hudi/dml/others/TestMergeIntoTable.scala   | 33 ++++++++++++++++++++++
 .../spark/sql/HoodieSpark33CatalystPlanUtils.scala |  4 +++
 .../spark/sql/HoodieSpark34CatalystPlanUtils.scala |  6 ++++
 .../spark/sql/HoodieSpark35CatalystPlanUtils.scala |  6 ++++
 .../spark/sql/HoodieSpark40CatalystPlanUtils.scala |  6 ++++
 7 files changed, 69 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 42dcc02cca43..a7a652a393f2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -144,6 +144,11 @@ trait HoodieCatalystPlansUtils {
    */
   def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}
 
+  /**
+   * Hook for reporting a missing table (TABLE_OR_VIEW_NOT_FOUND); this default implementation is a no-op.
+   */
+  def failTableNotFound(tableName: String): Unit = {}
+
   def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, 
condition: Option[Expression], hint: String): LogicalPlan
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala
index 523049e2a3b7..4443889a6cd4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.storage.StoragePath
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.BaseHoodieCatalystPlanUtils.MatchResolvedTable
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, 
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, 
UnresolvedPartitionSpec, UnresolvedRelation}
 import 
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -155,6 +155,14 @@ case class ResolveReferences(spark: SparkSession) extends 
Rule[LogicalPlan]
       if !mO.resolved =>
       lazy val analyzer = spark.sessionState.analyzer
       val targetTable = if (targetTableO.resolved) targetTableO else 
analyzer.execute(targetTableO)
+      EliminateSubqueryAliases(targetTable) match {
+        case u: UnresolvedRelation =>
+          // If target table is still unresolved after analysis, it means the 
table doesn't exist
+          
sparkAdapter.getCatalystPlanUtils.failTableNotFound(u.multipartIdentifier.mkString("."))
+        case _ =>
+          // Target table exists, proceed with normal resolution
+      }
+
       val sourceTable = if (sourceTableO.resolved) sourceTableO else 
analyzer.execute(sourceTableO)
       val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, 
sourceTable = sourceTable)
       // END: custom Hudi change
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index 243ebd93b414..de5abd03fa57 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -1759,4 +1759,37 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
       }
     }
   }
+
+  test("Test MergeInto with non existent target table") {
+    withTempDir { tmp =>
+      val sourceTable = generateTableName
+      spark.sql(
+        s"""
+           | create table $sourceTable (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts int
+           | ) using parquet
+           | location '${tmp.getCanonicalPath}/$sourceTable'
+         """.stripMargin)
+      spark.sql(s"insert into $sourceTable values(1, 'a1', 10, 1000)")
+      val nonExistentTable = generateTableName // fresh generated name, never created here (a fixed literal could clash with another test's table)
+      val exception = intercept[org.apache.spark.sql.AnalysisException] {
+        spark.sql(
+          s"""
+             | MERGE INTO $nonExistentTable AS target
+             | USING $sourceTable AS source
+             | ON target.id = source.id
+             | WHEN MATCHED THEN UPDATE SET
+             |   name = source.name,
+             |   ts = source.ts
+             | WHEN NOT MATCHED THEN INSERT *
+         """.stripMargin)
+      }
+      assert(exception.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND") ||
+             exception.getMessage.contains("Table or view not found"),
+        s"Expected TABLE_OR_VIEW_NOT_FOUND error but got: ${exception.getMessage}")
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 7eb13cf43cd5..9a434e90e245 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -76,6 +76,10 @@ object HoodieSpark33CatalystPlanUtils extends 
BaseHoodieCatalystPlanUtils {
     a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns 
[$cols]")
   }
 
+  override def failTableNotFound(tableName: String): Unit = {
+    throw new AnalysisException(s"Table or view not found: $tableName") // plain-message form: only a message string is passed, no error class
+  }
+
   override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, 
String])] = {
     plan match {
       case ci @ CreateIndex(table, indexName, indexType, ignoreIfExists, 
columns, properties) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 635341f14f9c..8fecd76a89cf 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -81,6 +81,12 @@ object HoodieSpark34CatalystPlanUtils extends 
BaseHoodieCatalystPlanUtils {
         "cols" -> cols))
   }
 
+  override def failTableNotFound(tableName: String): Unit = {
+    // Quote each dot-separated part separately (`db`.`tbl`); quoting the joined string reads as one identifier.
+    val relationName = tableName.split("\\.", -1).map(part => s"`$part`").mkString(".")
+    throw new AnalysisException(errorClass = "TABLE_OR_VIEW_NOT_FOUND", messageParameters = Map("relationName" -> relationName))
+  }
+
   override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, 
String])] = {
     plan match {
       case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, 
columns, properties) =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index 7b9666e10b23..64b8a985559b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -80,6 +80,12 @@ object HoodieSpark35CatalystPlanUtils extends 
BaseHoodieCatalystPlanUtils {
         "proposal" -> cols))
   }
 
+  override def failTableNotFound(tableName: String): Unit = {
+    // Quote each dot-separated part separately (`db`.`tbl`); quoting the joined string reads as one identifier.
+    val relationName = tableName.split("\\.", -1).map(part => s"`$part`").mkString(".")
+    throw new AnalysisException(errorClass = "TABLE_OR_VIEW_NOT_FOUND", messageParameters = Map("relationName" -> relationName))
+  }
+
   override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, 
String])] = {
     plan match {
       case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, 
columns, properties) =>
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala
index 0111249e959c..859ba13530bd 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala
@@ -80,6 +80,12 @@ object HoodieSpark40CatalystPlanUtils extends 
BaseHoodieCatalystPlanUtils {
         "proposal" -> cols))
   }
 
+  override def failTableNotFound(tableName: String): Unit = {
+    // Quote each dot-separated part separately (`db`.`tbl`); quoting the joined string reads as one identifier.
+    val relationName = tableName.split("\\.", -1).map(part => s"`$part`").mkString(".")
+    throw new AnalysisException(errorClass = "TABLE_OR_VIEW_NOT_FOUND", messageParameters = Map("relationName" -> relationName))
+  }
+
   override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, 
String])] = {
     plan match {
       case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, 
columns, properties) =>

Reply via email to