This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 33a3cda0b46 [SPARK-41832][CONNECT][PYTHON] Fix `DataFrame.unionByName`, add allow_missing_columns
33a3cda0b46 is described below

commit 33a3cda0b460029542e526ba72d3e2e529bd76f4
Author: Sandeep Singh <[email protected]>
AuthorDate: Sat Jan 14 17:16:24 2023 +0800

    [SPARK-41832][CONNECT][PYTHON] Fix `DataFrame.unionByName`, add allow_missing_columns
    
    ### What changes were proposed in this pull request?
    Fix the implementation of `DataFrame.unionByName`: it wasn't assigning `by_name` correctly, and `allow_missing_columns` wasn't implemented.
    
    ### Why are the changes needed?
    Fixes the implementation and adds the `allow_missing_columns` parameter.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
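
    For example, `DataFrame.unionByName` on Connect now honors `allowMissingColumns`,
    matching the documented PySpark behavior (a sketch of the expected semantics,
    not output captured from this patch):

        df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
        df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col3"])
        df1.unionByName(df2, allowMissingColumns=True).show()
        # +----+----+----+----+
        # |col0|col1|col2|col3|
        # +----+----+----+----+
        # |   1|   2|   3|null|
        # |null|   4|   5|   6|
        # +----+----+----+----+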
    
    ### How was this patch tested?
    Enabled doctests and added unit tests.
    
    Closes #39451 from techaddict/SPARK-41832.
    
    Authored-by: Sandeep Singh <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../main/protobuf/spark/connect/relations.proto    |   5 +
 .../org/apache/spark/sql/connect/dsl/package.scala |  13 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |   7 +-
 .../connect/planner/SparkConnectPlannerSuite.scala |  19 +++
 python/pyspark/sql/connect/dataframe.py            |   9 +-
 python/pyspark/sql/connect/plan.py                 |   3 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 152 ++++++++++-----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  20 +++
 .../sql/tests/connect/test_connect_basic.py        |  22 +++
 9 files changed, 166 insertions(+), 84 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index e283c522aa7..9e9233cab59 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -216,6 +216,11 @@ message SetOperation {
   // Only UNION supports this option.
   optional bool by_name = 5;
 
+  // (Optional) Whether to perform the Set operation and allow missing columns.
+  //
+  // Only UNION supports this option.
+  optional bool allow_missing_columns = 6;
+
   enum SetOpType {
     SET_OP_TYPE_UNSPECIFIED = 0;
     SET_OP_TYPE_INTERSECT = 1;
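
For reference, the new field is reachable from the generated Python bindings roughly as follows (a minimal sketch; the construction style is assumed, the field names and tag match the schema above):

    from pyspark.sql.connect.proto import relations_pb2

    op = relations_pb2.SetOperation(
        set_op_type=relations_pb2.SetOperation.SET_OP_TYPE_UNION,
        by_name=True,
        allow_missing_columns=True,  # new optional field, tag 6
    )
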
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index e6560ab14dc..d720900300e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -752,7 +752,11 @@ package object dsl {
            createSetOperation(logicalPlan, otherPlan, SetOpType.SET_OP_TYPE_INTERSECT, isAll))
           .build()
 
-      def union(otherPlan: Relation, isAll: Boolean = true, byName: Boolean = false): Relation =
+      def union(
+          otherPlan: Relation,
+          isAll: Boolean = true,
+          byName: Boolean = false,
+          allowMissingColumns: Boolean = false): Relation =
         Relation
           .newBuilder()
           .setSetOp(
@@ -761,7 +765,8 @@ package object dsl {
               otherPlan,
               SetOpType.SET_OP_TYPE_UNION,
               isAll,
-              byName))
+              byName,
+              allowMissingColumns))
           .build()
 
       def coalesce(num: Integer): Relation =
@@ -1020,7 +1025,8 @@ package object dsl {
           right: Relation,
           t: SetOpType,
           isAll: Boolean = true,
-          byName: Boolean = false): SetOperation.Builder = {
+          byName: Boolean = false,
+          allowMissingColumns: Boolean = false): SetOperation.Builder = {
         val setOp = SetOperation
           .newBuilder()
           .setLeftInput(left)
@@ -1028,6 +1034,7 @@ package object dsl {
           .setSetOpType(t)
           .setIsAll(isAll)
           .setByName(byName)
+          .setAllowMissingColumns(allowMissingColumns)
         setOp
       }
     }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index a888c55769b..c4480233209 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1146,10 +1146,15 @@ class SparkConnectPlanner(session: SparkSession) {
           transformRelation(u.getRightInput),
           u.getIsAll)
       case proto.SetOperation.SetOpType.SET_OP_TYPE_UNION =>
+        if (!u.getByName && u.getAllowMissingColumns) {
+          throw InvalidPlanInput(
+            "UnionByName `allowMissingCol` can be true only if `byName` is 
true.")
+        }
         val combinedUnion = CombineUnions(
           Union(
-            Seq(transformRelation(u.getLeftInput), transformRelation(u.getRightInput)),
-            byName = u.getByName))
+            byName = u.getByName,
+            allowMissingCol = u.getAllowMissingColumns))
         if (u.getIsAll) {
           combinedUnion
         } else {
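
The guard above is exercised by the planner test added below; from the Python client the invalid combination can only be produced by hand-building a plan, roughly like this (a sketch, assuming a live Spark Connect session `spark`):

    from pyspark.sql.connect import plan
    from pyspark.sql.connect.dataframe import DataFrame

    df1, df2 = spark.range(3), spark.range(3)
    # allow_missing_columns=True without by_name=True violates the check above.
    bad = DataFrame.withPlan(
        plan.SetOperation(
            df1._plan, df2._plan, "union", by_name=False, allow_missing_columns=True
        ),
        session=spark,
    )
    bad.collect()  # expected to fail: "UnionByName `allowMissingCol` can be true only if `byName` is true."
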
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index b6605d67736..63e5415b44f 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -214,6 +214,25 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
     assert(res.nodeName == "Union")
   }
 
+  test("Union By Name") {
+    val union = proto.Relation.newBuilder
+      .setSetOp(
+        proto.SetOperation.newBuilder
+          .setLeftInput(readRel)
+          .setRightInput(readRel)
+          .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_UNION)
+          .setByName(false)
+          .setAllowMissingColumns(true)
+          .build())
+      .build()
+    val msg = intercept[InvalidPlanInput] {
+      transform(union)
+    }
+    assert(
+      msg.getMessage.contains(
+        "UnionByName `allowMissingCol` can be true only if `byName` is true."))
+  }
+
   test("Simple Join") {
    val incompleteJoin =
      proto.Relation.newBuilder.setJoin(proto.Join.newBuilder.setLeft(readRel)).build()
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 6f19e9a4d60..0c0e217f025 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -771,7 +771,11 @@ class DataFrame:
            raise ValueError("Argument to UnionByName does not contain a valid plan.")
         return DataFrame.withPlan(
             plan.SetOperation(
-                self._plan, other._plan, "union", is_all=True, by_name=allowMissingColumns
+                self._plan,
+                other._plan,
+                "union",
+                by_name=True,
+                allow_missing_columns=allowMissingColumns,
             ),
             session=self._session,
         )
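
Before this change, the call above passed `by_name=allowMissingColumns` and never set `allow_missing_columns`, so a plain `df1.unionByName(df2)` degenerated into a positional union. A small illustration of where that matters (a sketch of standard `unionByName` semantics, assuming a session `spark`):

    df1 = spark.createDataFrame([(1, 2)], ["a", "b"])
    df2 = spark.createDataFrame([(3, 4)], ["b", "a"])
    # By-name resolution yields the second row as (4, 3) under columns (a, b);
    # the old wiring resolved positionally and returned (3, 4) instead.
    df1.unionByName(df2).show()
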
@@ -1673,9 +1677,6 @@ def _test() -> None:
         # TODO(SPARK-41625): Support Structured Streaming
         del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__
 
-        # TODO(SPARK-41832): fix unionByName
-        del pyspark.sql.connect.dataframe.DataFrame.unionByName.__doc__
-
         # TODO(SPARK-41818): Support saveAsTable
         del pyspark.sql.connect.dataframe.DataFrame.write.__doc__
 
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 4a2e76a8954..307093c889c 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -787,12 +787,14 @@ class SetOperation(LogicalPlan):
         set_op: str,
         is_all: bool = True,
         by_name: bool = False,
+        allow_missing_columns: bool = False,
     ) -> None:
         super().__init__(child)
         self.other = other
         self.by_name = by_name
         self.is_all = is_all
         self.set_op = set_op
+        self.allow_missing_columns = allow_missing_columns
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None
@@ -817,6 +819,7 @@ class SetOperation(LogicalPlan):
 
         rel.set_op.is_all = self.is_all
         rel.set_op.by_name = self.by_name
+        rel.set_op.allow_missing_columns = self.allow_missing_columns
         return rel
 
     def print(self, indent: int = 0) -> str:
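
As a quick check of the mapping above (a sketch; `client` stands in for any connected `SparkConnectClient`, and `df1`/`df2` for Connect DataFrames):

    rel = plan.SetOperation(
        df1._plan, df2._plan, "union", by_name=True, allow_missing_columns=True
    ).plan(client)
    assert rel.set_op.by_name and rel.set_op.allow_missing_columns
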
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 6180adc894d..a620f3f754c 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xed\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xed\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
 
@@ -635,79 +635,79 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _JOIN_JOINTYPE._serialized_start = 3619
     _JOIN_JOINTYPE._serialized_end = 3827
     _SETOPERATION._serialized_start = 3830
-    _SETOPERATION._serialized_end = 4226
-    _SETOPERATION_SETOPTYPE._serialized_start = 4089
-    _SETOPERATION_SETOPTYPE._serialized_end = 4203
-    _LIMIT._serialized_start = 4228
-    _LIMIT._serialized_end = 4304
-    _OFFSET._serialized_start = 4306
-    _OFFSET._serialized_end = 4385
-    _TAIL._serialized_start = 4387
-    _TAIL._serialized_end = 4462
-    _AGGREGATE._serialized_start = 4465
-    _AGGREGATE._serialized_end = 5047
-    _AGGREGATE_PIVOT._serialized_start = 4804
-    _AGGREGATE_PIVOT._serialized_end = 4915
-    _AGGREGATE_GROUPTYPE._serialized_start = 4918
-    _AGGREGATE_GROUPTYPE._serialized_end = 5047
-    _SORT._serialized_start = 5050
-    _SORT._serialized_end = 5210
-    _DROP._serialized_start = 5212
-    _DROP._serialized_end = 5312
-    _DEDUPLICATE._serialized_start = 5315
-    _DEDUPLICATE._serialized_end = 5486
-    _LOCALRELATION._serialized_start = 5488
-    _LOCALRELATION._serialized_end = 5577
-    _SAMPLE._serialized_start = 5580
-    _SAMPLE._serialized_end = 5853
-    _RANGE._serialized_start = 5856
-    _RANGE._serialized_end = 6001
-    _SUBQUERYALIAS._serialized_start = 6003
-    _SUBQUERYALIAS._serialized_end = 6117
-    _REPARTITION._serialized_start = 6120
-    _REPARTITION._serialized_end = 6262
-    _SHOWSTRING._serialized_start = 6265
-    _SHOWSTRING._serialized_end = 6407
-    _STATSUMMARY._serialized_start = 6409
-    _STATSUMMARY._serialized_end = 6501
-    _STATDESCRIBE._serialized_start = 6503
-    _STATDESCRIBE._serialized_end = 6584
-    _STATCROSSTAB._serialized_start = 6586
-    _STATCROSSTAB._serialized_end = 6687
-    _STATCOV._serialized_start = 6689
-    _STATCOV._serialized_end = 6785
-    _STATCORR._serialized_start = 6788
-    _STATCORR._serialized_end = 6925
-    _STATAPPROXQUANTILE._serialized_start = 6928
-    _STATAPPROXQUANTILE._serialized_end = 7092
-    _STATFREQITEMS._serialized_start = 7094
-    _STATFREQITEMS._serialized_end = 7219
-    _STATSAMPLEBY._serialized_start = 7222
-    _STATSAMPLEBY._serialized_end = 7531
-    _STATSAMPLEBY_FRACTION._serialized_start = 7423
-    _STATSAMPLEBY_FRACTION._serialized_end = 7522
-    _NAFILL._serialized_start = 7534
-    _NAFILL._serialized_end = 7668
-    _NADROP._serialized_start = 7671
-    _NADROP._serialized_end = 7805
-    _NAREPLACE._serialized_start = 7808
-    _NAREPLACE._serialized_end = 8104
-    _NAREPLACE_REPLACEMENT._serialized_start = 7963
-    _NAREPLACE_REPLACEMENT._serialized_end = 8104
-    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 8106
-    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 8220
-    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 8223
-    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 8482
-    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 8415
-    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 8482
-    _WITHCOLUMNS._serialized_start = 8484
-    _WITHCOLUMNS._serialized_end = 8603
-    _HINT._serialized_start = 8606
-    _HINT._serialized_end = 8738
-    _UNPIVOT._serialized_start = 8741
-    _UNPIVOT._serialized_end = 8987
-    _TOSCHEMA._serialized_start = 8989
-    _TOSCHEMA._serialized_end = 9095
-    _REPARTITIONBYEXPRESSION._serialized_start = 9098
-    _REPARTITIONBYEXPRESSION._serialized_end = 9301
+    _SETOPERATION._serialized_end = 4309
+    _SETOPERATION_SETOPTYPE._serialized_start = 4146
+    _SETOPERATION_SETOPTYPE._serialized_end = 4260
+    _LIMIT._serialized_start = 4311
+    _LIMIT._serialized_end = 4387
+    _OFFSET._serialized_start = 4389
+    _OFFSET._serialized_end = 4468
+    _TAIL._serialized_start = 4470
+    _TAIL._serialized_end = 4545
+    _AGGREGATE._serialized_start = 4548
+    _AGGREGATE._serialized_end = 5130
+    _AGGREGATE_PIVOT._serialized_start = 4887
+    _AGGREGATE_PIVOT._serialized_end = 4998
+    _AGGREGATE_GROUPTYPE._serialized_start = 5001
+    _AGGREGATE_GROUPTYPE._serialized_end = 5130
+    _SORT._serialized_start = 5133
+    _SORT._serialized_end = 5293
+    _DROP._serialized_start = 5295
+    _DROP._serialized_end = 5395
+    _DEDUPLICATE._serialized_start = 5398
+    _DEDUPLICATE._serialized_end = 5569
+    _LOCALRELATION._serialized_start = 5571
+    _LOCALRELATION._serialized_end = 5660
+    _SAMPLE._serialized_start = 5663
+    _SAMPLE._serialized_end = 5936
+    _RANGE._serialized_start = 5939
+    _RANGE._serialized_end = 6084
+    _SUBQUERYALIAS._serialized_start = 6086
+    _SUBQUERYALIAS._serialized_end = 6200
+    _REPARTITION._serialized_start = 6203
+    _REPARTITION._serialized_end = 6345
+    _SHOWSTRING._serialized_start = 6348
+    _SHOWSTRING._serialized_end = 6490
+    _STATSUMMARY._serialized_start = 6492
+    _STATSUMMARY._serialized_end = 6584
+    _STATDESCRIBE._serialized_start = 6586
+    _STATDESCRIBE._serialized_end = 6667
+    _STATCROSSTAB._serialized_start = 6669
+    _STATCROSSTAB._serialized_end = 6770
+    _STATCOV._serialized_start = 6772
+    _STATCOV._serialized_end = 6868
+    _STATCORR._serialized_start = 6871
+    _STATCORR._serialized_end = 7008
+    _STATAPPROXQUANTILE._serialized_start = 7011
+    _STATAPPROXQUANTILE._serialized_end = 7175
+    _STATFREQITEMS._serialized_start = 7177
+    _STATFREQITEMS._serialized_end = 7302
+    _STATSAMPLEBY._serialized_start = 7305
+    _STATSAMPLEBY._serialized_end = 7614
+    _STATSAMPLEBY_FRACTION._serialized_start = 7506
+    _STATSAMPLEBY_FRACTION._serialized_end = 7605
+    _NAFILL._serialized_start = 7617
+    _NAFILL._serialized_end = 7751
+    _NADROP._serialized_start = 7754
+    _NADROP._serialized_end = 7888
+    _NAREPLACE._serialized_start = 7891
+    _NAREPLACE._serialized_end = 8187
+    _NAREPLACE_REPLACEMENT._serialized_start = 8046
+    _NAREPLACE_REPLACEMENT._serialized_end = 8187
+    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 8189
+    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 8303
+    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 8306
+    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 8565
+    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 8498
+    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 8565
+    _WITHCOLUMNS._serialized_start = 8567
+    _WITHCOLUMNS._serialized_end = 8686
+    _HINT._serialized_start = 8689
+    _HINT._serialized_end = 8821
+    _UNPIVOT._serialized_start = 8824
+    _UNPIVOT._serialized_end = 9070
+    _TOSCHEMA._serialized_start = 9072
+    _TOSCHEMA._serialized_end = 9178
+    _REPARTITIONBYEXPRESSION._serialized_start = 9181
+    _REPARTITIONBYEXPRESSION._serialized_end = 9384
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index b80a8a5b1bb..395e4d4bfb7 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -836,6 +836,7 @@ class SetOperation(google.protobuf.message.Message):
     SET_OP_TYPE_FIELD_NUMBER: builtins.int
     IS_ALL_FIELD_NUMBER: builtins.int
     BY_NAME_FIELD_NUMBER: builtins.int
+    ALLOW_MISSING_COLUMNS_FIELD_NUMBER: builtins.int
     @property
     def left_input(self) -> global___Relation:
         """(Required) Left input relation for a Set operation."""
@@ -853,6 +854,11 @@ class SetOperation(google.protobuf.message.Message):
     by_name: builtins.bool
     """(Optional) If to perform the Set operation based on name resolution.
 
+    Only UNION supports this option.
+    """
+    allow_missing_columns: builtins.bool
+    """(Optional) If to perform the Set operation and allow missing columns.
+
     Only UNION supports this option.
     """
     def __init__(
@@ -863,14 +869,19 @@ class SetOperation(google.protobuf.message.Message):
         set_op_type: global___SetOperation.SetOpType.ValueType = ...,
         is_all: builtins.bool | None = ...,
         by_name: builtins.bool | None = ...,
+        allow_missing_columns: builtins.bool | None = ...,
     ) -> None: ...
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_allow_missing_columns",
+            b"_allow_missing_columns",
             "_by_name",
             b"_by_name",
             "_is_all",
             b"_is_all",
+            "allow_missing_columns",
+            b"allow_missing_columns",
             "by_name",
             b"by_name",
             "is_all",
@@ -884,10 +895,14 @@ class SetOperation(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_allow_missing_columns",
+            b"_allow_missing_columns",
             "_by_name",
             b"_by_name",
             "_is_all",
             b"_is_all",
+            "allow_missing_columns",
+            b"allow_missing_columns",
             "by_name",
             b"by_name",
             "is_all",
@@ -901,6 +916,11 @@ class SetOperation(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal["_allow_missing_columns", 
b"_allow_missing_columns"],
+    ) -> typing_extensions.Literal["allow_missing_columns"] | None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_by_name", b"_by_name"]
     ) -> typing_extensions.Literal["by_name"] | None: ...
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 2ea986f3540..1e0288786e6 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -1421,6 +1421,28 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
             .toPandas(),
         )
 
+    def test_union_by_name(self):
+        # SPARK-41832: Test unionByName
+        data1 = [(1, 2, 3)]
+        data2 = [(6, 2, 5)]
+        df1_connect = self.connect.createDataFrame(data1, ["a", "b", "c"])
+        df2_connect = self.connect.createDataFrame(data2, ["a", "b", "c"])
+        union_df_connect = df1_connect.unionByName(df2_connect)
+
+        df1_spark = self.spark.createDataFrame(data1, ["a", "b", "c"])
+        df2_spark = self.spark.createDataFrame(data2, ["a", "b", "c"])
+        union_df_spark = df1_spark.unionByName(df2_spark)
+
+        self.assert_eq(union_df_connect.toPandas(), union_df_spark.toPandas())
+
+        df2_connect = self.connect.createDataFrame(data2, ["a", "B", "C"])
+        union_df_connect = df1_connect.unionByName(df2_connect, allowMissingColumns=True)
+
+        df2_spark = self.spark.createDataFrame(data2, ["a", "B", "C"])
+        union_df_spark = df1_spark.unionByName(df2_spark, allowMissingColumns=True)
+
+        self.assert_eq(union_df_connect.toPandas(), union_df_spark.toPandas())
+
     def test_random_split(self):
         # SPARK-41440: test randomSplit(weights, seed).
         relations = (


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
