yihua commented on code in PR #18147:
URL: https://github.com/apache/hudi/pull/18147#discussion_r2795763747
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -298,19 +298,59 @@ object HoodieAnalysis extends SparkAdapterSupport {
private object ProducesHudiMetaFields {
def unapply(plan: LogicalPlan): Option[Seq[Attribute]] = {
- val resolved = if (plan.resolved) {
- plan
- } else {
- val analyzer = spark.sessionState.analyzer
- analyzer.execute(plan)
- }
+ try {
+ val resolved = if (plan.resolved) {
+ plan
+ } else {
+ val analyzer = spark.sessionState.analyzer
+ analyzer.execute(plan)
+ }
- if (resolved.output.exists(attr => isMetaField(attr.name))) {
- Some(resolved.output)
- } else {
- None
+ if (resolved.output.exists(attr => isMetaField(attr.name))) {
+ Some(resolved.output)
+ } else {
+ None
+ }
+ } catch {
+ case e: UnresolvedException =>
+ val unresolvedRefs = collectUnresolvedReferences(plan)
Review Comment:
This extractor is used inside pattern matches like `query match { case
ProducesHudiMetaFields(output) => ...; case _ => None }`. Before this change,
an `UnresolvedException` from `analyzer.execute()` would propagate naturally
and Spark's own analyzer would eventually produce its own (usually quite good)
error message. Now we're catching it and throwing a new `AnalysisException` —
this short-circuits Spark's normal error-handling and could surface a less
precise Hudi-specific message for cases where Spark would have reported the
exact column/table problem. Have you considered returning `None` here instead
of throwing, and letting Spark's built-in analysis error reporting handle it?
That would preserve the pattern-match fallthrough semantics this extractor is
designed for.
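
For illustration, the fallthrough variant being suggested might look roughly like this (a sketch against the extractor shown in the diff, reusing the PR's own names such as `isMetaField` and `spark`; not a drop-in patch):

```scala
private object ProducesHudiMetaFields {
  def unapply(plan: LogicalPlan): Option[Seq[Attribute]] = {
    try {
      val resolved = if (plan.resolved) {
        plan
      } else {
        spark.sessionState.analyzer.execute(plan)
      }
      if (resolved.output.exists(attr => isMetaField(attr.name))) {
        Some(resolved.output)
      } else {
        None
      }
    } catch {
      // Fall through instead of throwing: the extractor simply does not
      // match, and Spark's CheckAnalysis later reports the precise
      // unresolved column/table with its usual suggestions.
      case _: UnresolvedException => None
    }
  }
}
```

This keeps `case ProducesHudiMetaFields(output) => ...` call sites unchanged while deferring error reporting to Spark.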
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -298,19 +298,59 @@ object HoodieAnalysis extends SparkAdapterSupport {
private object ProducesHudiMetaFields {
def unapply(plan: LogicalPlan): Option[Seq[Attribute]] = {
- val resolved = if (plan.resolved) {
- plan
- } else {
- val analyzer = spark.sessionState.analyzer
- analyzer.execute(plan)
- }
+ try {
+ val resolved = if (plan.resolved) {
+ plan
+ } else {
+ val analyzer = spark.sessionState.analyzer
+ analyzer.execute(plan)
+ }
- if (resolved.output.exists(attr => isMetaField(attr.name))) {
- Some(resolved.output)
- } else {
- None
+ if (resolved.output.exists(attr => isMetaField(attr.name))) {
+ Some(resolved.output)
+ } else {
+ None
+ }
+ } catch {
+ case e: UnresolvedException =>
+ val unresolvedRefs = collectUnresolvedReferences(plan)
+ val unresolvedInfo = if (unresolvedRefs.nonEmpty) {
+ s" Unresolved references: [${unresolvedRefs.mkString(", ")}]."
+ } else {
+ ""
+ }
+ throw new AnalysisException(
+          s"Failed to resolve query. The query contains unresolved columns or tables.$unresolvedInfo " +
+          s"Please check for: (1) typos in column or table names, (2) missing table definitions, " +
Review Comment:
It would also be good to check how the exception handling behaves across the Spark versions supported by the Hudi Spark integration, since the analyzer APIs and error messages differ between versions.
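
To make the ask concrete: version-specific behavior in this codebase is typically routed through the `SparkAdapter` layer rather than handled inline. A purely hypothetical shape (the method name below is illustrative only, mirroring the `failAnalysisForMIT` pattern quoted later in this thread) could be:

```scala
// Hypothetical sketch only: route the failure through the adapter so each
// Spark version can classify it with its own error framework.
case e: UnresolvedException =>
  // "failUnresolvedPlan" does not exist today; it stands in for a
  // version-specific helper on CatalystPlanUtils.
  sparkAdapter.getCatalystPlanUtils.failUnresolvedPlan(plan, e)
```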
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodieAnalysisErrorHandling.scala:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+/**
+ * Tests for improved error handling in HoodieAnalysis when queries contain
+ * unresolved columns or tables.
+ */
+class TestHoodieAnalysisErrorHandling extends HoodieSparkSqlTestBase {
+
+  test("MergeInto with unresolved column in source query should provide helpful error message") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create target Hudi table
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id INT,
+ | name STRING,
+ | price DOUBLE,
+ | ts INT
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ """.stripMargin)
+
+ // Insert initial data
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'a1', 10.0, 1000)")
+
+ // Test MERGE INTO with non-existent column in source query
+ val exception = intercept[AnalysisException] {
+ spark.sql(
+ s"""
+ |MERGE INTO $tableName AS target
+ |USING (
+         |  SELECT 1 AS id, 'updated' AS name, 20.0 AS price, 2000 AS ts, nonexistent_column AS extra
+ |) AS source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN UPDATE SET *
+ |WHEN NOT MATCHED THEN INSERT *
+ """.stripMargin)
+ }
+
+ // Verify the error message contains helpful information
+ val errorMessage = exception.getMessage
Review Comment:
Several test assertions use `||` with `contains("cannot be resolved")`, which would match Spark's existing error messages even without this PR. Could you tighten the assertions to specifically verify the new behavior (e.g., assert on `"Failed to resolve query"` or `"Please check for"`) so these tests actually validate the change? Please also account for exception-message differences across Spark versions.
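
For instance, a tightened assertion might look like this (sketch only; the message fragments are taken from the diff in this PR):

```scala
val errorMessage = exception.getMessage
// These fragments exist only in the new Hudi-side AnalysisException, so the
// test fails if Spark's pre-existing "cannot be resolved" message is thrown
// instead of the new one.
assert(errorMessage.contains("Failed to resolve query"),
  s"Expected the Hudi-specific resolution error, got: $errorMessage")
assert(errorMessage.contains("Please check for"),
  s"Expected the remediation hints, got: $errorMessage")
```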
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -298,19 +298,59 @@ object HoodieAnalysis extends SparkAdapterSupport {
private object ProducesHudiMetaFields {
def unapply(plan: LogicalPlan): Option[Seq[Attribute]] = {
- val resolved = if (plan.resolved) {
- plan
- } else {
- val analyzer = spark.sessionState.analyzer
- analyzer.execute(plan)
- }
+ try {
+ val resolved = if (plan.resolved) {
+ plan
+ } else {
+ val analyzer = spark.sessionState.analyzer
+ analyzer.execute(plan)
+ }
- if (resolved.output.exists(attr => isMetaField(attr.name))) {
- Some(resolved.output)
- } else {
- None
+ if (resolved.output.exists(attr => isMetaField(attr.name))) {
+ Some(resolved.output)
+ } else {
+ None
+ }
+ } catch {
+ case e: UnresolvedException =>
+ val unresolvedRefs = collectUnresolvedReferences(plan)
+ val unresolvedInfo = if (unresolvedRefs.nonEmpty) {
+ s" Unresolved references: [${unresolvedRefs.mkString(", ")}]."
+ } else {
+ ""
+ }
+ throw new AnalysisException(
+          s"Failed to resolve query. The query contains unresolved columns or tables.$unresolvedInfo " +
+          s"Please check for: (1) typos in column or table names, (2) missing table definitions, " +
Review Comment:
For example, we already have a version-specific fail-analysis method; on Spark 3.5 it uses Spark's proper error classification, which should be used here if required:
```
sparkAdapter.getCatalystPlanUtils.failAnalysisForMIT
override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
a.failAnalysis(
errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
messageParameters = Map(
"objectName" -> a.sql,
"proposal" -> cols))
}
```
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -298,19 +298,59 @@ object HoodieAnalysis extends SparkAdapterSupport {
private object ProducesHudiMetaFields {
def unapply(plan: LogicalPlan): Option[Seq[Attribute]] = {
- val resolved = if (plan.resolved) {
- plan
- } else {
- val analyzer = spark.sessionState.analyzer
- analyzer.execute(plan)
- }
+ try {
+ val resolved = if (plan.resolved) {
+ plan
+ } else {
+ val analyzer = spark.sessionState.analyzer
+ analyzer.execute(plan)
+ }
- if (resolved.output.exists(attr => isMetaField(attr.name))) {
- Some(resolved.output)
- } else {
- None
+ if (resolved.output.exists(attr => isMetaField(attr.name))) {
+ Some(resolved.output)
+ } else {
+ None
+ }
+ } catch {
+ case e: UnresolvedException =>
+ val unresolvedRefs = collectUnresolvedReferences(plan)
+ val unresolvedInfo = if (unresolvedRefs.nonEmpty) {
+ s" Unresolved references: [${unresolvedRefs.mkString(", ")}]."
+ } else {
+ ""
+ }
+ throw new AnalysisException(
+          s"Failed to resolve query. The query contains unresolved columns or tables.$unresolvedInfo " +
+          s"Please check for: (1) typos in column or table names, (2) missing table definitions, " +
Review Comment:
For example, on Spark 3.5, the second and fourth test cases already throw a readable exception without the changes in this PR:
```
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name
`target`.`nonexistent_id` cannot be resolved. Did you mean one of the
following? [`target`.`name`, `target`.`price`, `target`.`id`, `target`.`ts`,
`target`.`_hoodie_file_name`].; line 6 pos 3;
'MergeIntoTable ('target.nonexistent_id = id#106), [updateaction(None,
assignment(id#115, id#106), assignment(name#116, name#107),
assignment(price#117, price#108), assignment(ts#118, ts#109))],
[insertaction(None, assignment(id#115, id#106), assignment(name#116, name#107),
assignment(price#117, price#108), assignment(ts#118, ts#109))]
:- SubqueryAlias target
: +- SubqueryAlias spark_catalog.default.htesthoodieanalysiserrorhandling_2
: +- Relation spark_catalog.default.htesthoodieanalysiserrorhandling_2[_hoodie_commit_time#110,_hoodie_commit_seqno#111,_hoodie_record_key#112,_hoodie_partition_path#113,_hoodie_file_name#114,id#115,name#116,price#117,ts#118] HudiFileGroup
+- SubqueryAlias source
+- Project [1 AS id#106, updated AS name#107, 20.0 AS price#108, 2000 AS ts#109]
+- OneRowRelation
at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:306)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:141)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:299)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:297)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:243)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:243)
at scala.collection.Iterator.foreach(Iterator.scala:943)
```
```
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name
source.pricee cannot be resolved. Did you mean one of the following?
[target._hoodie_commit_seqno, target._hoodie_commit_time,
target._hoodie_file_name, target._hoodie_partition_path,
target._hoodie_record_key, source.id, target.id, source.name, target.name,
source.price, target.price, source.ts, target.ts].; line 10 pos 10
org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION]
A column or function parameter with name source.pricee cannot be resolved. Did
you mean one of the following? [target._hoodie_commit_seqno,
target._hoodie_commit_time, target._hoodie_file_name,
target._hoodie_partition_path, target._hoodie_record_key, source.id, target.id,
source.name, target.name, source.price, target.price, source.ts, target.ts].;
line 10 pos 10
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
at org.apache.spark.sql.HoodieSpark35CatalystPlanUtils$.failAnalysisForMIT(HoodieSpark35CatalystPlanUtils.scala:80)
at org.apache.spark.sql.hudi.analysis.ResolveReferences.$anonfun$resolveMergeExprOrFail$2(HoodieSparkBaseAnalysis.scala:270)
at org.apache.spark.sql.hudi.analysis.ResolveReferences.$anonfun$resolveMergeExprOrFail$2$adapted(HoodieSparkBaseAnalysis.scala:265)
at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:137)
at org.apache.spark.sql.hudi.analysis.ResolveReferences.resolveMergeExprOrFail(HoodieSparkBaseAnalysis.scala:265)
at org.apache.spark.sql.hudi.analysis.ResolveReferences.$anonfun$resolveAssignments$1(HoodieSparkBaseAnalysis.scala:254)
```
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -298,19 +298,59 @@ object HoodieAnalysis extends SparkAdapterSupport {
private object ProducesHudiMetaFields {
def unapply(plan: LogicalPlan): Option[Seq[Attribute]] = {
- val resolved = if (plan.resolved) {
- plan
- } else {
- val analyzer = spark.sessionState.analyzer
- analyzer.execute(plan)
- }
+ try {
+ val resolved = if (plan.resolved) {
+ plan
+ } else {
+ val analyzer = spark.sessionState.analyzer
+ analyzer.execute(plan)
+ }
- if (resolved.output.exists(attr => isMetaField(attr.name))) {
- Some(resolved.output)
- } else {
- None
+ if (resolved.output.exists(attr => isMetaField(attr.name))) {
+ Some(resolved.output)
+ } else {
+ None
+ }
+ } catch {
+ case e: UnresolvedException =>
+ val unresolvedRefs = collectUnresolvedReferences(plan)
+ val unresolvedInfo = if (unresolvedRefs.nonEmpty) {
+ s" Unresolved references: [${unresolvedRefs.mkString(", ")}]."
+ } else {
+ ""
+ }
+ throw new AnalysisException(
+          s"Failed to resolve query. The query contains unresolved columns or tables.$unresolvedInfo " +
+          s"Please check for: (1) typos in column or table names, (2) missing table definitions, " +
Review Comment:
I wonder if the error message is too Hudi-specific for something that is really a general Spark analysis failure. Spark already provides precise messages like the ones quoted above; wrapping them in a generic "check for typos" message might actually lose information. Could we revisit the exception-throwing logic so that it follows Spark's standard error reporting?
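
One minimal way to stay on Spark's standard path would be to not replace the exception at all (sketch against the catch block in the diff):

```scala
} catch {
  // Equivalent to not catching at all: let the analyzer's own exception
  // propagate so Spark's error class, suggestion list, and position info
  // are preserved instead of being replaced by a generic hint.
  case e: UnresolvedException => throw e
}
```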
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]