This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 9040932f641c [SPARK-54730][SQL][CONNECT] Delay failure of dataframe column resolution
9040932f641c is described below

commit 9040932f641cf040e8d03657e51a2bef3a096d50
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Dec 18 08:31:02 2025 +0800

    [SPARK-54730][SQL][CONNECT] Delay failure of dataframe column resolution
    
    ### What changes were proposed in this pull request?
    Delay the failure of dataframe column resolution: when the plan node matching a column's plan id is found but the column cannot be resolved there, keep the attribute unresolved instead of failing immediately.
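    
    A simplified sketch of the behavioral change (condensed from the diff below, not the literal code):
    
    ```scala
    // Before: once the plan node matching the plan id was found but the
    // column did not resolve against it, analysis failed immediately.
    if (resolved.isEmpty) {
      throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
    }
    
    // After: the eager throw is dropped and the (possibly empty) result is
    // returned, so a rule that extends the node's output (e.g. a Delta rule
    // adding hidden columns) can make the column resolvable in a later
    // analyzer iteration.
    (resolved.map(r => (r, currentDepth)), true)
    ```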
    
    ### Why are the changes needed?
    The eager failure conflicts with Delta rules that add hidden columns; failing immediately leaves those rules no chance to run.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Before this fix, the following Delta query fails:
    ```py
    df = spark.read.option("readChangeFeed", True).option("startingVersion", 0).table("sample_table")
    
    df.select(df._commit_version).show()  # fails with [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "_commit_version".
    ```
    
    With the failure delayed, the Delta rule that adds the hidden `_commit_version` column gets a chance to run in a later analyzer iteration, and the same query resolves.
    
    ### How was this patch tested?
    Added a unit test in SparkSessionExtensionSuite that injects a resolution rule adding a hidden column.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53503 from zhengruifeng/df_col_delay_fail_v2.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 36f8f1e1003c676969e5108d58fed57bfbba4ddc)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../catalyst/analysis/ColumnResolutionHelper.scala | 14 ++++-----
 .../spark/sql/SparkSessionExtensionSuite.scala     | 34 ++++++++++++++++++++--
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 34541a8840cb..870e03364225 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -140,7 +140,9 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
           }
           matched(ordinal)
 
-        case u @ UnresolvedAttribute(nameParts) =>
+        case u @ UnresolvedAttribute(nameParts)
+          if u.getTagValue(LogicalPlan.PLAN_ID_TAG).isEmpty =>
+          // UnresolvedAttribute with PLAN_ID_TAG should be resolved in resolveDataFrameColumn
           val result = withPosition(u) {
             resolveColumnByName(nameParts)
               .orElse(LiteralFunctionResolution.resolve(nameParts))
@@ -495,8 +497,7 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //    1. extract the attached plan id from UnresolvedAttribute;
   //    2. top-down traverse the query plan to find the plan node that matches the plan id;
   //    3. if can not find the matching node, fails with 'CANNOT_RESOLVE_DATAFRAME_COLUMN';
-  //    4, if the matching node is found, but can not resolve the column, also fails with
-  //       'CANNOT_RESOLVE_DATAFRAME_COLUMN';
+  //    4, if the matching node is found, but can not resolve the column, return the original one;
   //    5, resolve the expression against the target node, the resolved attribute will be
   //       filtered by the output attributes of nodes in the path (from matching to root node);
   //    6. if more than one resolved attributes are found in the above recursive process,
@@ -571,10 +572,9 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
       } else {
         None
       }
-      if (resolved.isEmpty) {
-        // The targe plan node is found, but the column cannot be resolved.
-        throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
-      }
+      // The target plan node is found, but the column might still fail to resolve.
+      // In this case, return None to delay the failure, so that the column can
+      // still be resolved in a later iteration.
       (resolved.map(r => (r, currentDepth)), true)
     } else {
       val children = p match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 6ee0029b6839..66826a9ca762 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,18 +26,19 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, Max, Partial}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.{PlanTest, SQLHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, AggregateHint, ColumnStat, Limit, LocalRelation, LogicalPlan, Sort, SortHint, Statistics, UnresolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, AggregateHint, ColumnStat, Limit, LocalRelation, LogicalPlan, Project, Sort, SortHint, Statistics, UnresolvedHint}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.classic.ClassicConversions._
+import org.apache.spark.sql.classic.Dataset
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec}
@@ -91,6 +92,22 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
     }
   }
 
+  test("inject analyzer rule - hidden column") {
+    withSession(Seq(_.injectResolutionRule(MyHiddenColumn))) { session: SparkSession =>
+      val rel = LocalRelation(
+        AttributeReference("a", IntegerType)(),
+        AttributeReference("b", IntegerType)())
+      rel.setTagValue[Long](LogicalPlan.PLAN_ID_TAG, 0L)
+
+      val u = UnresolvedAttribute("x")
+      u.setTagValue[Long](LogicalPlan.PLAN_ID_TAG, 0L)
+      val proj = Project(Seq(u), rel)
+
+      val df = Dataset.ofRows(session, proj)
+      assert(df.schema.fieldNames === Array("x"))
+    }
+  }
+
   test("inject post hoc resolution analyzer rule") {
     withSession(Seq(_.injectPostHocResolutionRule(MyRule))) { session =>
       assert(session.sessionState.analyzer.postHocResolutionRules.contains(MyRule(session)))
@@ -608,6 +625,19 @@ case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan
 }
 
+case class MyHiddenColumn(spark: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case rel: LocalRelation if rel.output.size == 2 =>
+      // rel.output.size == 2 for idempotence
+      val newRel = rel.copy(
+        output = rel.output :+ AttributeReference("x", IntegerType)()
+      )
+      assert(rel.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(0L))
+      newRel.setTagValue(LogicalPlan.PLAN_ID_TAG, 0L)
+      newRel
+  }
+}
+
 case class MyCheckRule(spark: SparkSession) extends (LogicalPlan => Unit) {
   override def apply(plan: LogicalPlan): Unit = { }
 }
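
For context, resolution rules like `MyHiddenColumn` above are registered through `SparkSessionExtensions`, the same mechanism third-party libraries such as Delta use to inject their analyzer rules. A minimal, hypothetical sketch of wiring such a rule into a standalone session (the suite does the equivalent via its `withSession` helper; `injectResolutionRule` is the hook the new test exercises):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone wiring of the rule from the test above.
val spark = SparkSession.builder()
  .master("local[1]")
  .withExtensions(_.injectResolutionRule(MyHiddenColumn))
  .getOrCreate()
```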


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
