This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 68707164cb3b6a93192774e1ab7f173cc2079b74 Author: Vamshi Krishna Kyatham <[email protected]> AuthorDate: Mon Oct 20 22:28:31 2025 -0700 fix: Fixing error msg when table is not found with MergeIntoCommand (#14118) --- .../spark/sql/HoodieCatalystPlansUtils.scala | 5 ++++ .../hudi/analysis/HoodieSparkBaseAnalysis.scala | 10 ++++++- .../sql/hudi/dml/others/TestMergeIntoTable.scala | 33 ++++++++++++++++++++++ .../spark/sql/HoodieSpark33CatalystPlanUtils.scala | 4 +++ .../spark/sql/HoodieSpark34CatalystPlanUtils.scala | 6 ++++ .../spark/sql/HoodieSpark35CatalystPlanUtils.scala | 6 ++++ .../spark/sql/HoodieSpark40CatalystPlanUtils.scala | 6 ++++ 7 files changed, 69 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala index 42dcc02cca43..a7a652a393f2 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -144,6 +144,11 @@ trait HoodieCatalystPlansUtils { */ def failAnalysisForMIT(a: Attribute, cols: String): Unit = {} + /** + * Throws TABLE_OR_VIEW_NOT_FOUND error for non-existent table + */ + def failTableNotFound(tableName: String): Unit = {} + def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala index 523049e2a3b7..4443889a6cd4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala @@ -22,7 +22,7 @@ import org.apache.hudi.storage.StoragePath import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.BaseHoodieCatalystPlanUtils.MatchResolvedTable -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, UnresolvedPartitionSpec, UnresolvedRelation} import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Expression @@ -155,6 +155,14 @@ case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] if !mO.resolved => lazy val analyzer = spark.sessionState.analyzer val targetTable = if (targetTableO.resolved) targetTableO else analyzer.execute(targetTableO) + EliminateSubqueryAliases(targetTable) match { + case u: UnresolvedRelation => + // If target table is still unresolved after analysis, it means the table doesn't exist + sparkAdapter.getCatalystPlanUtils.failTableNotFound(u.multipartIdentifier.mkString(".")) + case _ => + // Target table exists, proceed with normal resolution + } + val sourceTable = if (sourceTableO.resolved) sourceTableO else analyzer.execute(sourceTableO) val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, sourceTable = sourceTable) // END: custom Hudi change diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala index 243ebd93b414..de5abd03fa57 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala @@ -1759,4 +1759,37 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } } } + + test("Test MergeInto with non existent target table") { + withTempDir { tmp => + val sourceTable = generateTableName + spark.sql( + s""" + | create table $sourceTable ( + | id int, + | name string, + | price double, + | ts int + | ) using parquet + | location '${tmp.getCanonicalPath}/$sourceTable' + """.stripMargin) + spark.sql(s"insert into $sourceTable values(1, 'a1', 10, 1000)") + val nonExistentTable = "hudi_test_table" + val exception = intercept[org.apache.spark.sql.AnalysisException] { + spark.sql( + s""" + | MERGE INTO $nonExistentTable AS target + | USING $sourceTable AS source + | ON target.id = source.id + | WHEN MATCHED THEN UPDATE SET + | name = source.name, + | ts = source.ts + | WHEN NOT MATCHED THEN INSERT * + """.stripMargin) + } + assert(exception.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND") || + exception.getMessage.contains("Table or view not found"), + s"Expected TABLE_OR_VIEW_NOT_FOUND error but got: ${exception.getMessage}") + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala index 7eb13cf43cd5..9a434e90e245 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala @@ -76,6 +76,10 @@ object HoodieSpark33CatalystPlanUtils extends BaseHoodieCatalystPlanUtils { a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns [$cols]") } + override def failTableNotFound(tableName: String): Unit = { + throw new AnalysisException(s"Table or view not found: $tableName") + } + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { plan match { case ci @ CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) => diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala index 635341f14f9c..8fecd76a89cf 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala @@ -81,6 +81,12 @@ object HoodieSpark34CatalystPlanUtils extends BaseHoodieCatalystPlanUtils { "cols" -> cols)) } + override def failTableNotFound(tableName: String): Unit = { + throw new AnalysisException( + errorClass = "TABLE_OR_VIEW_NOT_FOUND", + messageParameters = Map("relationName" -> s"`$tableName`")) + } + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { plan match { case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) => diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala index 7b9666e10b23..64b8a985559b 100644 --- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala @@ -80,6 +80,12 @@ object HoodieSpark35CatalystPlanUtils extends BaseHoodieCatalystPlanUtils { "proposal" -> cols)) } + override def failTableNotFound(tableName: String): Unit = { + throw new AnalysisException( + errorClass = "TABLE_OR_VIEW_NOT_FOUND", + messageParameters = Map("relationName" -> s"`$tableName`")) + } + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { plan match { case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) => diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala index 0111249e959c..859ba13530bd 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark40CatalystPlanUtils.scala @@ -80,6 +80,12 @@ object HoodieSpark40CatalystPlanUtils extends BaseHoodieCatalystPlanUtils { "proposal" -> cols)) } + override def failTableNotFound(tableName: String): Unit = { + throw new AnalysisException( + errorClass = "TABLE_OR_VIEW_NOT_FOUND", + messageParameters = Map("relationName" -> s"`$tableName`")) + } + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { plan match { case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) =>
