This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d64d1f76ff28 [SPARK-49366][CONNECT] Treat Union node as leaf in dataframe column resolution
d64d1f76ff28 is described below

commit d64d1f76ff28f97db0ad3f3647ae2683e80095a2
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Aug 27 09:23:52 2024 +0800

    [SPARK-49366][CONNECT] Treat Union node as leaf in dataframe column resolution
    
    ### What changes were proposed in this pull request?
    
    Treat a `Union` node as a leaf when resolving DataFrame columns by plan id.
    
    ### Why are the changes needed?
    Bug fix. The following query:
    ```
    from pyspark.sql.functions import lit
    df1 = spark.range(10).withColumn("value", lit(1))
    df2 = df1.union(df1)
    df1.join(df2, df1.id == df2.id, "left").show()
    ```
    fails with `AMBIGUOUS_COLUMN_REFERENCE`. Tracing the column resolution gives:
    
    ```
    resolveExpressionByPlanChildren: e = '`==`('id, 'id)
    resolveExpressionByPlanChildren: q =
    '[id=63]Join LeftOuter, '`==`('id, 'id)
    :- [id=61]Project [id#550L, 1 AS value#553]
    :  +- Range (0, 10, step=1, splits=Some(12))
    +- [id=62]Union false, false
       :- [id=61]Project [id#564L, 1 AS value#565]
       :  +- Range (0, 10, step=1, splits=Some(12))
       +- [id=61]Project [id#566L, 1 AS value#567]
          +- Range (0, 10, step=1, splits=Some(12))
    
    'id with id = 61
    
    [id=61]Project [id#564L, 1 AS value#565]
    +- Range (0, 10, step=1, splits=Some(12))
    
    [id=61]Project [id#566L, 1 AS value#567]
    +- Range (0, 10, step=1, splits=Some(12))
    
    resolved: Vector((Some((id#564L,1)),true), (Some((id#566L,1)),true))
    ```
    
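    When resolving `'id` with plan id 61 (`df1.id`), the existing ambiguity
    detection fails in the join's second child: both inputs of the `Union` are
    tagged with plan id 61 and resolve `id` to different attributes (`id#564L`
    and `id#566L`) at the same depth, so the reference is reported as ambiguous
    even though `df1.id` should bind only to the join's left child.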
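
    The failure and the fix can be illustrated with a toy model of plan-id
    resolution. This is a sketch, not Spark's code: `Plan`, `resolve`,
    `build_plan`, `AmbiguousColumnReference` and the `union_as_leaf` switch are
    hypothetical names. It assumes the ambiguity rule is "a different attribute
    at the same shallowest depth", and it omits the self-join output filtering
    and metadata-column handling that `ColumnResolutionHelper` also performs.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


class AmbiguousColumnReference(Exception):
    """Hypothetical stand-in for Spark's AMBIGUOUS_COLUMN_REFERENCE error."""


@dataclass
class Plan:
    """Toy logical-plan node tagged with a DataFrame plan id."""
    kind: str                 # e.g. "Project", "Union", "Join"
    plan_id: int
    output: Dict[str, str]    # column name -> attribute, e.g. "id" -> "id#550L"
    children: List["Plan"] = field(default_factory=list)


def resolve(name: str, plan_id: int, plans: List[Plan], depth: int,
            union_as_leaf: bool) -> Optional[Tuple[str, int]]:
    """Return (attribute, depth) for `name` in the subtree tagged `plan_id`."""
    matches = []
    for p in plans:
        if p.plan_id == plan_id:
            if name in p.output:
                matches.append((p.output[name], depth))
        else:
            # The fix: stop descending at a Union node, as if it were a leaf.
            skip = union_as_leaf and p.kind == "Union"
            found = resolve(name, plan_id, [] if skip else p.children,
                            depth + 1, union_as_leaf)
            if found is not None:
                matches.append(found)
    if not matches:
        return None
    matches.sort(key=lambda m: m[1])  # shallowest match first
    first, rest = matches[0], matches[1:]
    # Assumed rule: a different attribute at the same shallowest depth is ambiguous.
    if any(d == first[1] and attr != first[0] for attr, d in rest):
        raise AmbiguousColumnReference(name)
    return first


def build_plan() -> Plan:
    """Mirror of the traced plan above (Range leaves omitted)."""
    left = Plan("Project", 61, {"id": "id#550L", "value": "value#553"})
    union = Plan("Union", 62, {"id": "id#564L", "value": "value#565"}, [
        Plan("Project", 61, {"id": "id#564L", "value": "value#565"}),
        Plan("Project", 61, {"id": "id#566L", "value": "value#567"}),
    ])
    return Plan("Join", 63, {}, [left, union])


join = build_plan()
try:
    resolve("id", 61, [join], 0, union_as_leaf=False)
except AmbiguousColumnReference as err:
    print("before fix: ambiguous", err)  # before fix: ambiguous id
print("after fix:", resolve("id", 61, [join], 0, union_as_leaf=True))
# after fix: ('id#550L', 1)
```

    With `union_as_leaf=False`, both `Union` inputs match plan id 61 at depth 2
    (in this toy's depth numbering) and the lookup raises. With the fix, the
    `Union` subtree is not searched for plan id 61, so `df1.id` binds to
    `id#550L`, while `df2.id` (plan id 62) still resolves at the `Union` node
    itself.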
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, bug fix: the query above no longer fails with `AMBIGUOUS_COLUMN_REFERENCE`.
    
    ### How was this patch tested?
    Added unit tests `test_self_join_III` and `test_self_join_IV` to
    `python/pyspark/sql/tests/test_dataframe.py`.
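
    The `count() == 20` expectations in the new tests follow from ordinary join
    cardinality, because `union` keeps duplicate rows. Here is a plain-Python
    check that uses a hypothetical nested-loop `outer_join` helper in place of
    Spark. It models only the row counts, not Spark's plan resolution.

```python
from typing import List, Optional, Tuple

Row = Tuple[Optional[int], ...]


def outer_join(left: List[Row], right: List[Row], how: str) -> List[Row]:
    """Nested-loop equi-join on column 0 (the `id` column); how is 'left' or 'right'."""
    preserved, other = (left, right) if how == "left" else (right, left)
    width = len(other[0])
    out = []
    for row in preserved:
        matches = [o for o in other if o[0] == row[0]]
        if not matches:
            matches = [(None,) * width]  # unmatched row is padded with nulls
        for m in matches:
            # output columns: left side first, then right side
            out.append(row + m if how == "left" else m + row)
    return out


df1 = [(i, 1) for i in range(10)]  # spark.range(10).withColumn("value", lit(1))

# test_self_join_III: df2 = df1.union(df1), so every id appears twice
df2_iii = df1 + df1
print(len(outer_join(df1, df2_iii, "left")))  # 20

# test_self_join_IV: value set to 2 and 3 on the two union inputs
df2_iv = [(i, 2) for i in range(10)] + [(i, 3) for i in range(10)]
print(len(outer_join(df1, df2_iv, "right")))  # 20
```

    In `test_self_join_III` each of the 10 left rows matches two rows of `df2`.
    In `test_self_join_IV` each of the 20 rows of `df2` matches exactly one row
    of `df1`.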
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47853 from zhengruifeng/fix_ambgious_union.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/tests/test_dataframe.py                 | 14 ++++++++++++++
 .../sql/catalyst/analysis/ColumnResolutionHelper.scala     |  7 ++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 7dd42eecde7f..4e2d3b9ba42a 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -130,6 +130,20 @@ class DataFrameTestsMixin:
         self.assertTrue(df3.columns, ["aa", "b", "a", "b"])
         self.assertTrue(df3.count() == 2)
 
+    def test_self_join_III(self):
+        df1 = self.spark.range(10).withColumn("value", lit(1))
+        df2 = df1.union(df1)
+        df3 = df1.join(df2, df1.id == df2.id, "left")
+        self.assertTrue(df3.columns, ["id", "value", "id", "value"])
+        self.assertTrue(df3.count() == 20)
+
+    def test_self_join_IV(self):
+        df1 = self.spark.range(10).withColumn("value", lit(1))
+        df2 = df1.withColumn("value", lit(2)).union(df1.withColumn("value", lit(3)))
+        df3 = df1.join(df2, df1.id == df2.id, "right")
+        self.assertTrue(df3.columns, ["id", "value", "id", "value"])
+        self.assertTrue(df3.count() == 20)
+
     def test_duplicated_column_names(self):
         df = self.spark.createDataFrame([(1, 2)], ["c", "c"])
         row = df.select("*").first()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index c10e000a098c..1947c884694b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -585,7 +585,12 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
       }
       (resolved.map(r => (r, currentDepth)), true)
     } else {
-      resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children, currentDepth + 1)
+      val children = p match {
+        // treat Union node as the leaf node
+        case _: Union => Seq.empty[LogicalPlan]
+        case _ => p.children
+      }
+      resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, children, currentDepth + 1)
     }
 
     // In self join case like:

