This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 10722044f42 [SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python client
10722044f42 is described below

commit 10722044f429b1a825018673ca139d698559f6fa
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Thu Nov 3 13:53:23 2022 +0900

    [SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python client
    
    ### What changes were proposed in this pull request?
    
    1. Improve testing coverage for `Union` and `UnionAll` (they are actually both `UnionAll`)
    2. Add the API which does `UnionByName`.
    
    ### Why are the changes needed?
    
    Improve API Coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT
    
    Closes #38453 from amaliujia/python_union.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/dataframe.py            | 27 ++++++++++++++++++++++
 python/pyspark/sql/connect/plan.py                 |  6 ++++-
 .../sql/tests/connect/test_connect_plan_only.py    | 10 ++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index b9ddb0db300..b9ba4b99ba0 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -293,6 +293,33 @@ class DataFrame(object):
             raise ValueError("Argument to Union does not contain a valid 
plan.")
         return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), 
session=self._session)
 
+    def unionByName(self, other: "DataFrame", allowMissingColumns: bool = 
False) -> "DataFrame":
+        """Returns a new :class:`DataFrame` containing union of rows in this 
and another
+        :class:`DataFrame`.
+
+        This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. 
To do a SQL-style set
+        union (that does deduplication of elements), use this function 
followed by :func:`distinct`.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        other : :class:`DataFrame`
+            Another :class:`DataFrame` that needs to be combined.
+        allowMissingColumns : bool, optional, default False
+           Specify whether to allow missing columns.
+
+        Returns
+        -------
+        :class:`DataFrame`
+            Combined DataFrame.
+        """
+        if other._plan is None:
+            raise ValueError("Argument to UnionByName does not contain a valid 
plan.")
+        return DataFrame.withPlan(
+            plan.UnionAll(self._plan, other._plan, allowMissingColumns), 
session=self._session
+        )
+
     def where(self, condition: Expression) -> "DataFrame":
         return self.filter(condition)
 
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 2f1f70ec1a9..cc59a493d5a 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -606,9 +606,12 @@ class Join(LogicalPlan):
 
 
 class UnionAll(LogicalPlan):
-    def __init__(self, child: Optional["LogicalPlan"], other: "LogicalPlan") 
-> None:
+    def __init__(
+        self, child: Optional["LogicalPlan"], other: "LogicalPlan", by_name: 
bool = False
+    ) -> None:
         super().__init__(child)
         self.other = other
+        self.by_name = by_name
 
     def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
         assert self._child is not None
@@ -617,6 +620,7 @@ class UnionAll(LogicalPlan):
         rel.set_op.right_input.CopyFrom(self.other.plan(session))
         rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION
         rel.set_op.is_all = True
+        rel.set_op.by_name = self.by_name
         return rel
 
     def print(self, indent: int = 0) -> str:
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py 
b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index e40a54b7d0c..8a9b98e73fd 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -190,6 +190,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
         self.assertIsNotNone(plan.root, "Root relation must be set")
         self.assertIsNotNone(plan.root.read)
 
+    def test_union(self):
+        df1 = self.connect.readTable(table_name=self.tbl_name)
+        df2 = self.connect.readTable(table_name=self.tbl_name)
+        plan1 = df1.union(df2)._plan.to_proto(self.connect)
+        self.assertTrue(plan1.root.set_op.is_all)
+        plan2 = df1.union(df2)._plan.to_proto(self.connect)
+        self.assertTrue(plan2.root.set_op.is_all)
+        plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect)
+        self.assertTrue(plan3.root.set_op.by_name)
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.connect.test_connect_plan_only import *  # noqa: 
F401


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to