This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new d56f02586289 [SPARK-55070][SQL][CONNECT] Allow hidden column in dataframe column resolution
d56f02586289 is described below

commit d56f02586289b61a6043ab04ea53f1c781ac6111
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jan 19 15:32:45 2026 +0800

    [SPARK-55070][SQL][CONNECT] Allow hidden column in dataframe column resolution
    
    ### What changes were proposed in this pull request?
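    Allow hidden columns in DataFrame column resolution: when filtering resolved candidates, always accept references contained in `p.output ++ p.metadataOutput`, not only when the column is a metadata access.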
    
    ### Why are the changes needed?
    https://github.com/apache/spark/pull/53503 fixed a regression, but it also introduced another issue:
    
    ```py
    lhs = spark.createDataFrame([(1, 'A'), (2, 'B')], ['ID', 'join_key'])
    rhs = spark.createDataFrame([(3, 'A'), (4, 'C')], ['ID', 'join_key'])
    lhs.join(rhs, 'join_key').select(rhs['join_key'])
    ```
    fails after https://github.com/apache/spark/pull/53503:
    
    ```
    'join_key[id=3] against
    [id=6]Project [join_key#39, ID#38L, ID#40L]
    +- Join Inner, (join_key#39 = join_key#41)
       :- [id=1]Project [ID#28L AS ID#38L, join_key#29 AS join_key#39]
       :  +- [id=0]LocalRelation [ID#28L, join_key#29]
       +- [id=3]Project [ID#36L AS ID#40L, join_key#37 AS join_key#41]
          +- [id=2]LocalRelation [ID#36L, join_key#37]
    
    ```
    
    Resolving `'join_key[id=3]` against the plan:
    1, find the corresponding node `[id=3]Project [ID#36L AS ID#40L, join_key#37 AS join_key#41]`;
    2, resolve `'join_key[id=3]` to `join_key#41`;
    3, the result is dropped when filtering with `[id=6]Project [join_key#39, ID#38L, ID#40L]` because `join_key#41` is not in the node output.
    
    Before https://github.com/apache/spark/pull/53503, the steps were:
    1, find the corresponding node `[id=3]Project [ID#36L AS ID#40L, join_key#37 AS join_key#41]`;
    2, resolve `'join_key[id=3]` to `join_key#41`;
    3, the result was dropped;
    4, return None, after which `resolveExpression` resolved the column without the plan id and incorrectly bound it to the left key `join_key#39`.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the query above fails before this fix.
    
    ### How was this patch tested?
    added tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53832 from zhengruifeng/fix_proj_hidden.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 76bdd24677e4756bb5bda3d1e40e356fc7c4a941)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
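The filtering step described in the commit message can be modeled with plain Python sets. This is only an illustrative sketch, not Spark code: `keep_candidate` and its parameters are hypothetical names, and it assumes that the right-side key `join_key#41` is exposed through the top Project's `metadataOutput` as a hidden column.

```python
# Toy model of the candidate filter in ColumnResolutionHelper (not Spark code).
# Attribute names are copied from the plan shown in the commit message.


def keep_candidate(references, output, metadata_output, include_metadata):
    """Return True if every referenced attribute is visible from the node.

    include_metadata=False models the removed non-metadata branch
    (visible = p.outputSet); include_metadata=True models the new check
    (visible = p.output ++ p.metadataOutput).
    """
    visible = set(output)
    if include_metadata:
        visible |= set(metadata_output)
    return set(references) <= visible


# Output of `[id=6]Project [join_key#39, ID#38L, ID#40L]`.
project_output = ["join_key#39", "ID#38L", "ID#40L"]
# Assumption: the hidden right-side join key is part of metadataOutput.
project_metadata_output = ["join_key#41"]
# `rhs['join_key']` resolves to join_key#41 via the node with plan id 3.
candidate_refs = ["join_key#41"]

kept_before = keep_candidate(
    candidate_refs, project_output, project_metadata_output, include_metadata=False
)
kept_after = keep_candidate(
    candidate_refs, project_output, project_metadata_output, include_metadata=True
)
print(kept_before, kept_after)
```

Under these assumptions the old non-metadata branch drops the candidate, while the unified check keeps it, which is what would let `select(rhs['join_key'])` resolve to the right-side key.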
 python/pyspark/sql/tests/test_dataframe.py                         | 7 +++++++
 .../spark/sql/catalyst/analysis/ColumnResolutionHelper.scala       | 7 ++-----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 75a553b62838..a726fc85d90a 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -159,6 +159,13 @@ class DataFrameTestsMixin:
         self.assertTrue(df3.columns, ["id", "value", "id", "value"])
         self.assertTrue(df3.count() == 20)
 
+    def test_select_join_keys(self):
+        df1 = self.spark.range(10).withColumn("v1", lit(1))
+        df2 = self.spark.range(10).withColumn("v2", lit(2))
+        for how in ["inner", "left", "right", "full", "cross"]:
+            self.assertTrue(df1.join(df2, "id", how).select(df1["id"]).count() >= 0, how)
+            self.assertTrue(df1.join(df2, "id", how).select(df2["id"]).count() >= 0, how)
+
     def test_lateral_column_alias(self):
         df1 = self.spark.range(10).select(
             (col("id") + lit(1)).alias("x"), (col("x") + lit(1)).alias("y")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 870e03364225..1172ecee7223 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -617,11 +617,8 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
     // the dataframe column 'df.id' will remain unresolved, and the analyzer
     // will try to resolve 'id' without plan id later.
     val filtered = resolved.filter { r =>
-      if (isMetadataAccess) {
-        r._1.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
-      } else {
-        r._1.references.subsetOf(p.outputSet)
-      }
+      // A DataFrame column can be resolved as a metadata column, we should keep it.
+      r._1.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
     }
     (filtered, matched)
   }

