This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d64d1f76ff28 [SPARK-49366][CONNECT] Treat Union node as leaf in
dataframe column resolution
d64d1f76ff28 is described below
commit d64d1f76ff28f97db0ad3f3647ae2683e80095a2
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Aug 27 09:23:52 2024 +0800
[SPARK-49366][CONNECT] Treat Union node as leaf in dataframe column
resolution
### What changes were proposed in this pull request?
Treat the Union node as a leaf node in dataframe column resolution.
### Why are the changes needed?
bug fix:
```
from pyspark.sql.functions import concat, lit, col
df1 = spark.range(10).withColumn("value", lit(1))
df2 = df1.union(df1)
df1.join(df2, df1.id == df2.id, "left").show()
```
fails with `AMBIGUOUS_COLUMN_REFERENCE`
```
resolveExpressionByPlanChildren: e = '`==`('id, 'id)
resolveExpressionByPlanChildren: q =
'[id=63]Join LeftOuter, '`==`('id, 'id)
:- [id=61]Project [id#550L, 1 AS value#553]
: +- Range (0, 10, step=1, splits=Some(12))
+- [id=62]Union false, false
:- [id=61]Project [id#564L, 1 AS value#565]
: +- Range (0, 10, step=1, splits=Some(12))
+- [id=61]Project [id#566L, 1 AS value#567]
+- Range (0, 10, step=1, splits=Some(12))
'id with id = 61
[id=61]Project [id#564L, 1 AS value#565]
+- Range (0, 10, step=1, splits=Some(12))
[id=61]Project [id#566L, 1 AS value#567]
+- Range (0, 10, step=1, splits=Some(12))
resolved: Vector((Some((id#564L,1)),true), (Some((id#566L,1)),true))
```
When resolving `'id` with plan id = 61, the existing detection fails in the
second child, because the same plan id appears in multiple children of the Union.
### Does this PR introduce _any_ user-facing change?
Yes, it is a bug fix: the query above no longer fails with `AMBIGUOUS_COLUMN_REFERENCE`.
### How was this patch tested?
added tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47853 from zhengruifeng/fix_ambgious_union.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/tests/test_dataframe.py | 14 ++++++++++++++
.../sql/catalyst/analysis/ColumnResolutionHelper.scala | 7 ++++++-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/tests/test_dataframe.py
b/python/pyspark/sql/tests/test_dataframe.py
index 7dd42eecde7f..4e2d3b9ba42a 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -130,6 +130,20 @@ class DataFrameTestsMixin:
self.assertTrue(df3.columns, ["aa", "b", "a", "b"])
self.assertTrue(df3.count() == 2)
+ def test_self_join_III(self):
+ df1 = self.spark.range(10).withColumn("value", lit(1))
+ df2 = df1.union(df1)
+ df3 = df1.join(df2, df1.id == df2.id, "left")
+ self.assertTrue(df3.columns, ["id", "value", "id", "value"])
+ self.assertTrue(df3.count() == 20)
+
+ def test_self_join_IV(self):
+ df1 = self.spark.range(10).withColumn("value", lit(1))
+ df2 = df1.withColumn("value", lit(2)).union(df1.withColumn("value",
lit(3)))
+ df3 = df1.join(df2, df1.id == df2.id, "right")
+ self.assertTrue(df3.columns, ["id", "value", "id", "value"])
+ self.assertTrue(df3.count() == 20)
+
def test_duplicated_column_names(self):
df = self.spark.createDataFrame([(1, 2)], ["c", "c"])
row = df.select("*").first()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index c10e000a098c..1947c884694b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -585,7 +585,12 @@ trait ColumnResolutionHelper extends Logging with
DataTypeErrorsBase {
}
(resolved.map(r => (r, currentDepth)), true)
} else {
- resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children,
currentDepth + 1)
+ val children = p match {
+ // treat Union node as the leaf node
+ case _: Union => Seq.empty[LogicalPlan]
+ case _ => p.children
+ }
+ resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, children,
currentDepth + 1)
}
// In self join case like:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]