This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 17c206fb71d [SPARK-44602][SQL][CONNECT][PS] Make `WidenSetOperationTypes` retain the `Plan_ID_TAG` 17c206fb71d is described below commit 17c206fb71d03aefa75ecb87ca82772980dab954 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Mon Jul 31 19:37:13 2023 +0900 [SPARK-44602][SQL][CONNECT][PS] Make `WidenSetOperationTypes` retain the `Plan_ID_TAG` ### What changes were proposed in this pull request? Make the Analyzer rule `WidenSetOperationTypes` retain the `Plan_ID_TAG` ### Why are the changes needed? for functionality parity ### Does this PR introduce _any_ user-facing change? yes, APIs enabled: 1. `frame.append` 2. `series.argsort` 3. `ps.concat` ### How was this patch tested? enabled UTs Closes #42230 from zhengruifeng/ps_connect_analyze_union. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../pandas/tests/connect/computation/test_parity_combine.py | 11 +---------- .../pandas/tests/connect/indexes/test_parity_base.py | 6 ------ .../pandas/tests/connect/series/test_parity_arg_ops.py | 6 +----- .../pyspark/pandas/tests/connect/test_parity_namespace.py | 12 ------------ .../apache/spark/sql/catalyst/analysis/TypeCoercion.scala | 13 +++++++++---- 5 files changed, 11 insertions(+), 37 deletions(-) diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py b/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py index 175404ff750..af23600055e 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py @@ -16,22 +16,13 @@ # import unittest -from pyspark import pandas as ps from pyspark.pandas.tests.computation.test_combine import FrameCombineMixin from pyspark.testing.connectutils import ReusedConnectTestCase from pyspark.testing.pandasutils import PandasOnSparkTestUtils class 
FrameParityCombineTests(FrameCombineMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @property - def psdf(self): - return ps.from_pandas(self.pdf) - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_append(self): - super().test_append() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py index d5dec01bde5..3cf4dc9b3d2 100644 --- a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py +++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py @@ -29,12 +29,6 @@ class IndexesParityTests( def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_append(self): - super().test_append() - @unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.") def test_factorize(self): super().test_factorize() diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py b/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py index b3df55cb68e..bd17521dd84 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py @@ -22,11 +22,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils class SeriesParityArgOpsTests(SeriesArgOpsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." 
- ) - def test_argsort(self): - super().test_argsort() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_namespace.py b/python/pyspark/pandas/tests/connect/test_parity_namespace.py index 72f638ca23c..db7f62fdbd5 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_namespace.py +++ b/python/pyspark/pandas/tests/connect/test_parity_namespace.py @@ -22,18 +22,6 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils class NamespaceParityTests(NamespaceTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_concat_index_axis(self): - super().test_concat_index_axis() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_concat_multiindex_sort(self): - super().test_concat_multiindex_sort() - @unittest.skip("TODO(SPARK-43655): Enable NamespaceParityTests.test_get_index_map.") def test_get_index_map(self): super().test_get_index_map() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index bf9e461744e..190e72a8e66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -265,8 +265,10 @@ abstract class TypeCoercionBase { s -> Nil } else { assert(newChildren.length == 2) + val newExcept = Except(newChildren.head, newChildren.last, isAll) + newExcept.copyTagsFrom(s) val attrMapping = left.output.zip(newChildren.head.output) - Except(newChildren.head, newChildren.last, isAll) -> attrMapping + newExcept -> attrMapping } case s @ Intersect(left, right, isAll) if s.childrenResolved && @@ -276,19 +278,22 @@ abstract class TypeCoercionBase { s -> Nil } else { 
assert(newChildren.length == 2) + val newIntersect = Intersect(newChildren.head, newChildren.last, isAll) + newIntersect.copyTagsFrom(s) val attrMapping = left.output.zip(newChildren.head.output) - Intersect(newChildren.head, newChildren.last, isAll) -> attrMapping + newIntersect -> attrMapping } case s: Union if s.childrenResolved && !s.byName && s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children) - if (newChildren.isEmpty) { s -> Nil } else { val attrMapping = s.children.head.output.zip(newChildren.head.output) - s.copy(children = newChildren) -> attrMapping + val newUnion = s.copy(children = newChildren) + newUnion.copyTagsFrom(s) + newUnion -> attrMapping } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org