This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8637205 [SPARK-34319][SQL] Resolve duplicate attributes for
FlatMapCoGroupsInPandas/MapInPandas
8637205 is described below
commit 8637205fe515eba3b11c5dd80f8ec75336911b16
Author: yi.wu <[email protected]>
AuthorDate: Tue Feb 2 16:25:32 2021 +0900
[SPARK-34319][SQL] Resolve duplicate attributes for
FlatMapCoGroupsInPandas/MapInPandas
### What changes were proposed in this pull request?
Resolve duplicate attributes for `FlatMapCoGroupsInPandas`.
### Why are the changes needed?
When performing self-join on top of `FlatMapCoGroupsInPandas`, analysis can
fail because of conflicting attributes. For example,
```scala
df = spark.createDataFrame([(1, 1)], ("column", "value"))
row = df.groupby("ColUmn").cogroup(
df.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, "column long, value long")
row.join(row).show()
```
error:
```scala
...
Conflicting attributes: column#163321L,value#163322L
;;
’Join Inner
:- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L],
<lambda>(column#163312L, value#163313L, column#163312L, value#163313L),
[column#163321L, value#163322L]
: :- Project [ColUmn#163312L, column#163312L, value#163313L]
: : +- LogicalRDD [column#163312L, value#163313L], false
: +- Project [COLUMN#163312L, column#163312L, value#163313L]
: +- LogicalRDD [column#163312L, value#163313L], false
+- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L],
<lambda>(column#163312L, value#163313L, column#163312L, value#163313L),
[column#163321L, value#163322L]
:- Project [ColUmn#163312L, column#163312L, value#163313L]
: +- LogicalRDD [column#163312L, value#163313L], false
+- Project [COLUMN#163312L, column#163312L, value#163313L]
+- LogicalRDD [column#163312L, value#163313L], false
...
```
### Does this PR introduce _any_ user-facing change?
yes, the query like the above example won't fail.
### How was this patch tested?
Adde unit tests.
Closes #31429 from Ngone51/fix-conflcting-attrs-of-FlatMapCoGroupsInPandas.
Lead-authored-by: yi.wu <[email protected]>
Co-authored-by: wuyi <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit e9362c2571f4a329218ff466fce79eef45e8f992)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../pyspark/sql/tests/test_pandas_cogrouped_map.py | 12 +++++++
python/pyspark/sql/tests/test_pandas_map.py | 8 +++++
.../spark/sql/catalyst/analysis/Analyzer.scala | 8 +++++
.../sql/catalyst/analysis/AnalysisSuite.scala | 42 ++++++++++++++++++++++
4 files changed, 70 insertions(+)
diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index c1cb30c..8025534 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -209,6 +209,18 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
).applyInPandas(lambda r, l: r + l, "column long, value long").first()
self.assertEquals(row.asDict(), Row(column=2, value=2).asDict())
+ def test_self_join(self):
+ # SPARK-34319: self-join with FlatMapCoGroupsInPandas
+ df = self.spark.createDataFrame([(1, 1)], ("column", "value"))
+
+ row = df.groupby("ColUmn").cogroup(
+ df.groupby("COLUMN")
+ ).applyInPandas(lambda r, l: r + l, "column long, value long")
+
+ row = row.join(row).first()
+
+ self.assertEqual(row.asDict(), Row(column=2, value=2).asDict())
+
@staticmethod
def _test_with_key(left, right, isLeft):
diff --git a/python/pyspark/sql/tests/test_pandas_map.py
b/python/pyspark/sql/tests/test_pandas_map.py
index f1956a2..35d6ce4 100644
--- a/python/pyspark/sql/tests/test_pandas_map.py
+++ b/python/pyspark/sql/tests/test_pandas_map.py
@@ -117,6 +117,14 @@ class MapInPandasTests(ReusedSQLTestCase):
expected = df.collect()
self.assertEquals(actual, expected)
+ def test_self_join(self):
+ # SPARK-34319: self-join with MapInPandas
+ df1 = self.spark.range(10)
+ df2 = df1.mapInPandas(lambda iter: iter, 'id long')
+ actual = df2.join(df2).collect()
+ expected = df1.join(df1).collect()
+ self.assertEqual(sorted(actual), sorted(expected))
+
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_map import *
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fbe6041..afa99eb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1198,6 +1198,14 @@ class Analyzer(
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty
=>
Seq((oldVersion, oldVersion.copy(output =
output.map(_.newInstance()))))
+ case oldVersion @ FlatMapCoGroupsInPandas(_, _, _, output, _, _)
+ if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty
=>
+ Seq((oldVersion, oldVersion.copy(output =
output.map(_.newInstance()))))
+
+ case oldVersion @ MapInPandas(_, output, _)
+ if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty
=>
+ Seq((oldVersion, oldVersion.copy(output =
output.map(_.newInstance()))))
+
case oldVersion: Generate
if
oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 63c2779..ca6cb37 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -610,6 +610,48 @@ class AnalysisSuite extends AnalysisTest with Matchers {
Project(Seq(UnresolvedAttribute("temp0.a"),
UnresolvedAttribute("temp1.a")), join))
}
+ test("SPARK-34319: analysis fails on self-join with
FlatMapCoGroupsInPandas") {
+ val pythonUdf = PythonUDF("pyUDF", null,
+ StructType(Seq(StructField("a", LongType))),
+ Seq.empty,
+ PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
+ true)
+ val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+ val project1 = Project(Seq(UnresolvedAttribute("a")), testRelation)
+ val project2 = Project(Seq(UnresolvedAttribute("a")), testRelation2)
+ val flatMapGroupsInPandas = FlatMapCoGroupsInPandas(
+ Seq(UnresolvedAttribute("a")),
+ Seq(UnresolvedAttribute("a")),
+ pythonUdf,
+ output,
+ project1,
+ project2)
+ val left = SubqueryAlias("temp0", flatMapGroupsInPandas)
+ val right = SubqueryAlias("temp1", flatMapGroupsInPandas)
+ val join = Join(left, right, Inner, None, JoinHint.NONE)
+ assertAnalysisSuccess(
+ Project(Seq(UnresolvedAttribute("temp0.a"),
UnresolvedAttribute("temp1.a")), join))
+ }
+
+ test("SPARK-34319: analysis fails on self-join with MapInPandas") {
+ val pythonUdf = PythonUDF("pyUDF", null,
+ StructType(Seq(StructField("a", LongType))),
+ Seq.empty,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+ true)
+ val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+ val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
+ val mapInPandas = MapInPandas(
+ pythonUdf,
+ output,
+ project)
+ val left = SubqueryAlias("temp0", mapInPandas)
+ val right = SubqueryAlias("temp1", mapInPandas)
+ val join = Join(left, right, Inner, None, JoinHint.NONE)
+ assertAnalysisSuccess(
+ Project(Seq(UnresolvedAttribute("temp0.a"),
UnresolvedAttribute("temp1.a")), join))
+ }
+
test("SPARK-24488 Generator with multiple aliases") {
assertAnalysisSuccess(
listRelation.select(Explode($"list").as("first_alias").as("second_alias")))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]