This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 177aee0707c2 [SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging column-projected DataFrames
177aee0707c2 is described below
commit 177aee0707c210beefe067e6b0348a388c69cae7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Sun May 31 20:43:52 2026 +0800
[SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging column-projected DataFrames
### What changes were proposed in this pull request?
Add a new `DataFrame.zip(other)` API that combines the columns of two
DataFrames side-by-side without a join key by reusing the shared base plan
rather than emitting a relational join.
- **Logical plan**: introduces `Zip(left, right)`, always unresolved, with
a `ZIP` tree pattern.
- **Analyzer**: a new `ResolveZip` rule walks both `Project` chains down to
their bases and -- when the two bases produce the same canonicalized result --
rewrites the `Zip` into a chain of `Project` nodes over that shared base. Each
user-written alias is kept in its own `Alias`, grouped into one `Project` layer
per dependency depth, so `CollapseProject`'s existing safety guards
(`canCollapseExpressions`) still apply and nondeterministic/expensive producers
are never duplicated. Alias [...]
- **CheckAnalysis**: any `Zip` that survives `ResolveZip` (the two sides do
not share a base, or a side contains a non-scalar Python UDF) raises a new
`ZIP_PLANS_NOT_MERGEABLE` error (sqlState `42K03`).
- **Scala API**: `Dataset.zip` declared in `sql/api` and implemented in the
classic `Dataset`; Spark Connect throws `UnsupportedOperationException`
(planned for a follow-up).
- **PySpark**: abstract `DataFrame.zip` on the parent class, classic
implementation delegating to the JVM via `_jdf.zip`, Connect raises
`PySparkNotImplementedError`. New entry in the API reference index.
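
The layering step in the `ResolveZip` bullet can be pictured with a plain-Python model over alias names. This is a hypothetical sketch, not the Catalyst code. `layer_aliases`, the dict-of-sets input, and the use of alias names in place of exprIds are illustrative assumptions. Depth follows the rule described above: depth 1 references only base columns, and otherwise a depth is one more than the deepest alias referenced.

```python
from typing import Dict, List, Set


def layer_aliases(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group alias names into Project layers by dependency depth.

    `deps` maps each alias to the names its body references; any referenced
    name that is not itself a key is treated as a base-plan column.
    Assumes the aliases form a DAG (no alias depends on itself).
    """
    depth: Dict[str, int] = {}

    def depth_of(name: str) -> int:
        if name not in depth:
            # Only references to other aliases contribute to depth.
            ref_depths = [depth_of(ref) for ref in deps[name] if ref in deps]
            depth[name] = max(ref_depths) + 1 if ref_depths else 1
        return depth[name]

    for name in deps:
        depth_of(name)
    max_depth = max(depth.values(), default=0)
    # One list per layer, innermost first; order within a layer follows `deps`.
    return [[n for n in deps if depth[n] == d] for d in range(1, max_depth + 1)]


# `doubled` reads `a_plus_1`, so it lands one layer above it.
deps = {"a_plus_1": {"a"}, "doubled": {"a_plus_1"}, "b_times_2": {"b"}}
print(layer_aliases(deps))  # [['a_plus_1', 'b_times_2'], ['doubled']]
```

Each inner list corresponds to one `Project` layer. Keeping `doubled` in its own layer instead of inlining `a_plus_1` into it is what leaves the inlining decision to `CollapseProject` and its `canCollapseExpressions` guard.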
### Why are the changes needed?
`RDD.zip` is a natural way to project two views of the same data and
recombine them row-for-row. There has been no DataFrame equivalent: users
porting that pattern have to fall back to a join on a synthetic row id, or
recompute the source and select both column sets, which adds unnecessary work
to the plan (a shuffle/join, or duplicated source evaluation) when the two
sides are known to be row-aligned by construction.
`DataFrame.zip` lifts the RDD pattern to the DataFrame API. Because the
analyzer rewrites the operator into a `Project` chain over the shared base
(which the optimizer collapses where `CollapseProject`'s guards allow), `zip`
itself adds nothing at runtime: no join, no extra scan, no shuffle. Only the
projections on each side are evaluated.
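
The "no extra scan" point can be pictured with a toy Python model. This is not Spark code, and `CountingSource`, `zip_two_scans`, and `zip_one_scan` are hypothetical names:

- `zip_two_scans` computes each side from its own scan and then pairs rows by position. It stands in for the workaround of evaluating the source once per side.
- `zip_one_scan` stands in for the rewritten `Project` over the shared base.

Both produce the same rows, but only the second reads the source once.

```python
from typing import Callable, Dict, Iterator, List

Row = Dict[str, object]
Projection = Callable[[Row], Row]


class CountingSource:
    """Hypothetical row source that records how many times it is scanned."""

    def __init__(self, rows: List[Row]) -> None:
        self.rows = rows
        self.scans = 0

    def scan(self) -> Iterator[Row]:
        self.scans += 1
        return iter(self.rows)


def zip_two_scans(src: CountingSource, left: Projection, right: Projection) -> List[Row]:
    # Each side is computed from its own scan of the source, then rows are paired by position.
    pairs = zip(map(left, src.scan()), map(right, src.scan()))
    return [{**lhs, **rhs} for lhs, rhs in pairs]


def zip_one_scan(src: CountingSource, left: Projection, right: Projection) -> List[Row]:
    # Shape of the rewritten plan: one projection over the shared base,
    # so each source row feeds both column sets in a single pass.
    return [{**left(row), **right(row)} for row in src.scan()]


def square(row: Row) -> Row:
    return {"square": row["id"] * row["id"]}


def is_even(row: Row) -> Row:
    return {"is_even": row["id"] % 2 == 0}


two = CountingSource([{"id": i} for i in range(10)])
one = CountingSource([{"id": i} for i in range(10)])
print(zip_two_scans(two, square, is_even) == zip_one_scan(one, square, is_even))  # True
print(two.scans, one.scans)  # 2 1
```

The model assumes, as `zip` requires, that both projections map rows 1:1. A side that filtered or expanded rows would break the positional pairing. That is why `ResolveZip` only looks through `Project` chains and rejects non-scalar Python UDFs.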
Side-by-side:
```python
from pyspark.sql import functions as sf
from pyspark.sql.types import BooleanType, LongType

# Assumes an active SparkSession bound to `spark`.
sc = spark.sparkContext

# RDD: rdd.zip lines up two row-aligned projections of the same source.
square = lambda x: x * x
is_even = lambda x: x % 2 == 0
rdd = sc.range(10)
rdd1 = rdd.map(square)
rdd2 = rdd.map(is_even)
zipped_rdd = rdd1.zip(rdd2).collect()
# [(0, True), (1, False), (4, True), (9, False), (16, True),
#  (25, False), (36, True), (49, False), (64, True), (81, False)]

# DataFrame: the same pattern, now expressible directly.
square_udf = sf.udf(square, LongType())
is_even_udf = sf.udf(is_even, BooleanType())
df = spark.range(10)
df1 = df.select(square_udf("id").alias("square"))
df2 = df.select(is_even_udf("id").alias("is_even"))
df1.zip(df2).show()
# +------+-------+
# |square|is_even|
# +------+-------+
# |     0|   true|
# |     1|  false|
# |     4|   true|
# |     9|  false|
# |    16|   true|
# |    25|  false|
# |    36|   true|
# |    49|  false|
# |    64|   true|
# |    81|  false|
# +------+-------+
```
Additional patterns this enables:
- Computing a derived column on one branch and aligning it with a derived
column from the same source.
- Splitting a single transformation into independently named sub-DataFrames
and recombining them.
### Does this PR introduce _any_ user-facing change?
Yes. New public API:
- Scala: `Dataset.zip(other: Dataset[_]): DataFrame`, `since 4.3.0`.
- PySpark: `DataFrame.zip(other)`, `versionadded:: 4.3.0`.
### How was this patch tested?
- New `ResolveZipSuite` (catalyst) covering the analyzer rewrite: matching
bases, mismatched bases, expression projections, partial `Project` on one side,
unresolved children, longer chains of `Project`s, alias composition through
chains, stacked `withColumn`-style projections, different-instance bases with
the same canonicalized plan (exercises the positional attribute remap), and
both `ZIP_PLANS_NOT_MERGEABLE` triggers (mismatched bases and non-scalar Python
UDF).
- New `DataFrameZipSuite` (sql/core) covering end-to-end results, the
resolved-plan shape, `withColumn`/`withColumnRenamed` on both sides, longer
chains, parent-with-chained-child, the two error cases (unrelated DataFrames,
`spark.range` sources), and two guards that a shared producer is evaluated only
once -- a single-side `rand()` referenced twice, and a `rand()` in a parent
shared by both sides (asserting exactly one `Rand` in the optimized plan).
- New `python/pyspark/sql/tests/test_zip.py` mirroring the Scala suite plus
scalar Python UDF and pandas UDF cases, and `test_parity_zip.py` asserting
Spark Connect raises `NOT_IMPLEMENTED`.
- MiMa exclusion added for the new abstract method on `Dataset`.
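
The positional-remap and shared-`rand()` test cases above exercise two steps of the merge:

1. The right base's attributes are remapped positionally onto the left base.
2. Aliases whose remapped child is identical are deduplicated, so a shared producer survives only once.

Both steps can be modelled over a made-up tuple IR. `remap`, `merge_aliases`, and the IR itself are hypothetical, and plain tuple equality stands in for Catalyst's canonicalization.

```python
from typing import Dict, List, Tuple

# Hypothetical mini-IR (not Catalyst): an expression is ("attr", expr_id),
# ("lit", value), or (function_name, *argument_expressions);
# an alias is (name, expr_id, child_expression).
Expr = tuple
Alias = Tuple[str, int, Expr]


def remap(e: Expr, mapping: Dict[int, int]) -> Expr:
    """Rewrite attribute references through `mapping`; everything else is kept."""
    if e[0] == "attr":
        return ("attr", mapping.get(e[1], e[1]))
    if e[0] == "lit":
        return e
    return (e[0],) + tuple(remap(arg, mapping) for arg in e[1:])


def merge_aliases(left_base: List[int], right_base: List[int],
                  left: List[Alias], right: List[Alias]):
    # 1. Positional remap: right base column i becomes left base column i.
    attr_map = dict(zip(right_base, left_base))
    right = [(name, eid, remap(child, attr_map)) for name, eid, child in right]
    # 2. Dedup by (remapped) child: the first alias seen for each child survives.
    kept: Dict[Expr, Alias] = {}
    dropped_to_survivor: Dict[int, int] = {}
    for name, eid, child in left + right:
        if child in kept:
            dropped_to_survivor[eid] = kept[child][1]
        else:
            kept[child] = (name, eid, child)
    # 3. Point surviving bodies at the survivors instead of dropped aliases.
    survivors = [(n, eid, remap(c, dropped_to_survivor)) for n, eid, c in kept.values()]
    return survivors, dropped_to_survivor


# Both sides read a shared parent `r = rand(7)`; the right side was re-instanced,
# so its base columns (11, 12) and its copy of `r` (200) carry new exprIds.
left = [("r", 100, ("rand", ("lit", 7))), ("x", 101, ("add", ("attr", 100), ("attr", 1)))]
right = [("r", 200, ("rand", ("lit", 7))), ("y", 201, ("mul", ("attr", 200), ("attr", 12)))]
survivors, dropped = merge_aliases([1, 2], [11, 12], left, right)
print(dropped)  # {200: 100}
print([s for s in survivors if s[2][0] == "rand"])  # [('r', 100, ('rand', ('lit', 7)))]
```

This sketch leaves out the output-list step, in which a dropped column is re-aliased under its own name and exprId.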
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #54976 from zhengruifeng/df-zip.
Lead-authored-by: Ruifeng Zheng <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +
dev/sparktestsupport/modules.py | 2 +
project/MimaExcludes.scala | 2 +
.../source/reference/pyspark.sql/dataframe.rst | 1 +
python/pyspark/sql/classic/dataframe.py | 3 +
python/pyspark/sql/connect/dataframe.py | 10 +
python/pyspark/sql/dataframe.py | 43 ++++
.../pyspark/sql/tests/connect/test_parity_zip.py | 37 ++++
python/pyspark/sql/tests/test_zip.py | 185 +++++++++++++++++
.../main/scala/org/apache/spark/sql/Dataset.scala | 22 ++
.../spark/sql/catalyst/analysis/Analyzer.scala | 1 +
.../sql/catalyst/analysis/CheckAnalysis.scala | 9 +
.../spark/sql/catalyst/analysis/ResolveZip.scala | 199 ++++++++++++++++++
.../plans/logical/basicLogicalOperators.scala | 24 +++
.../sql/catalyst/rules/RuleIdCollection.scala | 1 +
.../spark/sql/catalyst/trees/TreePatterns.scala | 1 +
.../sql/catalyst/analysis/ResolveZipSuite.scala | 211 +++++++++++++++++++
.../org/apache/spark/sql/connect/Dataset.scala | 5 +
.../org/apache/spark/sql/classic/Dataset.scala | 5 +
.../org/apache/spark/sql/DataFrameZipSuite.scala | 229 +++++++++++++++++++++
20 files changed, 996 insertions(+)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 8d0a7d20eb14..91976d21934d 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8970,6 +8970,12 @@
     ],
     "sqlState" : "42KDF"
   },
+  "ZIP_PLANS_NOT_MERGEABLE" : {
+    "message" : [
+      "The two DataFrames in zip() cannot be merged. They must produce the same canonicalized plan after stripping outer Project chains, and must not contain non-scalar Python UDFs."
+    ],
+    "sqlState" : "42K03"
+  },
   "_LEGACY_ERROR_TEMP_0001" : {
     "message" : [
       "Invalid InsertIntoContext."
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 286a0c35b27e..7a4e50ff4141 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -614,6 +614,7 @@ pyspark_sql = Module(
         "pyspark.sql.tests.test_serde",
         "pyspark.sql.tests.test_session",
         "pyspark.sql.tests.test_nearest_by_join",
+        "pyspark.sql.tests.test_zip",
         "pyspark.sql.tests.test_subquery",
         "pyspark.sql.tests.test_types",
         "pyspark.sql.tests.test_geographytype",
@@ -1178,6 +1179,7 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.test_parity_repartition",
         "pyspark.sql.tests.connect.test_parity_stat",
         "pyspark.sql.tests.connect.test_parity_nearest_by_join",
+        "pyspark.sql.tests.connect.test_parity_zip",
         "pyspark.sql.tests.connect.test_parity_subquery",
         "pyspark.sql.tests.connect.test_parity_types",
         "pyspark.sql.tests.connect.test_parity_column",
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b004e62ae739..422db17e1036 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -50,6 +50,8 @@ object MimaExcludes {
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this"),
     // [SPARK-55228][SQL] Implement Dataset.zipWithIndex in Scala API
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zipWithIndex"),
+    // [SPARK-55886][SQL] Add DataFrame.zip
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zip"),
     // [SPARK-55949][SQL] Add DataFrame API for CDC queries
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.changes"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamReader.changes"),
diff --git a/python/docs/source/reference/pyspark.sql/dataframe.rst b/python/docs/source/reference/pyspark.sql/dataframe.rst
index e61100435664..22e88bb67d5e 100644
--- a/python/docs/source/reference/pyspark.sql/dataframe.rst
+++ b/python/docs/source/reference/pyspark.sql/dataframe.rst
@@ -141,6 +141,7 @@ DataFrame
     DataFrame.writeTo
     DataFrame.mergeInto
     DataFrame.pandas_api
+    DataFrame.zip
     DataFrame.zipWithIndex
     DataFrameNaFunctions.drop
     DataFrameNaFunctions.fill
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index e1c3380416e4..a4288270a5ad 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -776,6 +776,9 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
         jdf = self._jdf.crossJoin(other._jdf)
         return DataFrame(jdf, self.sparkSession)
 
+    def zip(self, other: ParentDataFrame) -> ParentDataFrame:
+        return DataFrame(self._jdf.zip(other._jdf), self.sparkSession)
+
     def join(
         self,
         other: ParentDataFrame,
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index b0a9692f289a..0c8e8d9fe9ba 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -379,6 +379,12 @@ class DataFrame(ParentDataFrame):
             session=self._session,
         )
 
+    def zip(self, other: ParentDataFrame) -> ParentDataFrame:
+        raise PySparkNotImplementedError(
+            errorClass="NOT_IMPLEMENTED",
+            messageParameters={"feature": "zip"},
+        )
+
     def _check_same_session(self, other: ParentDataFrame) -> "DataFrame":
         if (
             not isinstance(other, DataFrame)
@@ -2509,6 +2515,10 @@ def _test() -> None:
     globs = pyspark.sql.dataframe.__dict__.copy()
 
+    # `zip` is not yet supported on Spark Connect; the parent docstring's
+    # example would call into the connect impl and fail with NOT_IMPLEMENTED.
+    del pyspark.sql.dataframe.DataFrame.zip.__doc__
+
     if not is_remote_only():
         del pyspark.sql.dataframe.DataFrame.toJSON.__doc__
         del pyspark.sql.dataframe.DataFrame.rdd.__doc__
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f96b614624b7..6c4d32ea1797 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2586,6 +2586,49 @@ class DataFrame:
         """
         ...
 
+    @dispatch_df_method
+    def zip(self, other: "DataFrame") -> "DataFrame":
+        """Combines the columns of this :class:`DataFrame` with another :class:`DataFrame`
+        side-by-side, preserving row alignment between the two inputs.
+
+        Both DataFrames must produce the same canonicalized plan after stripping outer
+        ``Project`` chains. In practice this means they derive from a common source through
+        chains of projection-only operations (:meth:`select`, :meth:`withColumn`,
+        :meth:`withColumnRenamed`, etc.); the chains may differ between the two sides, but
+        anything below them, including any :meth:`filter`, :meth:`orderBy`, :meth:`join`,
+        or aggregation, must be identical on both sides so the two sides stay row-aligned.
+        Non-scalar Python UDFs (e.g., ``GROUPED_MAP``) are not allowed on either side. An
+        :class:`AnalysisException` is thrown when the two DataFrames cannot be aligned.
+
+        .. versionadded:: 4.3.0
+
+        Parameters
+        ----------
+        other : :class:`DataFrame`
+            The DataFrame to combine with, which must derive from the same source as this
+            DataFrame.
+
+        Returns
+        -------
+        :class:`DataFrame`
+            A new DataFrame containing the columns of this DataFrame followed by the columns
+            of `other`.
+
+        Examples
+        --------
+        >>> df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["a", "b", "c"])
+        >>> left = df.select("a")
+        >>> right = df.select("b")
+        >>> left.zip(right).show()
+        +---+---+
+        |  a|  b|
+        +---+---+
+        |  1|  2|
+        |  4|  5|
+        +---+---+
+        """
+        ...
+
     @dispatch_df_method
     def join(
         self,
diff --git a/python/pyspark/sql/tests/connect/test_parity_zip.py b/python/pyspark/sql/tests/connect/test_parity_zip.py
new file mode 100644
index 000000000000..b0564e63a48b
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_parity_zip.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.errors import PySparkNotImplementedError
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class ZipParityTests(ReusedConnectTestCase):
+    """`DataFrame.zip` is classic-only for now; assert the Connect stub raises a clean
+    NOT_IMPLEMENTED instead of falling through to a generic error or appearing to work."""
+
+    def test_zip_raises_not_implemented(self):
+        df = self.spark.createDataFrame([(1, 2)], ["a", "b"])
+        with self.assertRaises(PySparkNotImplementedError) as ctx:
+            df.select("a").zip(df.select("b"))
+        self.assertEqual(ctx.exception.getCondition(), "NOT_IMPLEMENTED")
+        self.assertIn("zip", str(ctx.exception))
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()
diff --git a/python/pyspark/sql/tests/test_zip.py b/python/pyspark/sql/tests/test_zip.py
new file mode 100644
index 000000000000..33984657904e
--- /dev/null
+++ b/python/pyspark/sql/tests/test_zip.py
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.errors import AnalysisException
+from pyspark.sql import Row
+from pyspark.sql import functions as sf
+from pyspark.sql.types import IntegerType
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+from pyspark.testing.utils import (
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+
+class DataFrameZipTestsMixin:
+    """Tests for DataFrame.zip(). Currently only the classic path is supported;
+    Spark Connect raises ``NOT_IMPLEMENTED``."""
+
+    def test_zip_select_different_columns(self):
+        df = self.spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)], ["a", "b", "c"])
+        zipped = df.select("a").zip(df.select("b"))
+        self.assertEqual(zipped.columns, ["a", "b"])
+        self.assertEqual(
+            sorted(zipped.collect()),
+            [Row(a=1, b=2), Row(a=4, b=5), Row(a=7, b=8)],
+        )
+
+    def test_zip_with_expressions(self):
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", "b"])
+        left = df.select((sf.col("a") + 1).alias("a_plus_1"))
+        right = df.select((sf.col("b") * 2).alias("b_times_2"))
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [
+                Row(a_plus_1=2, b_times_2=20),
+                Row(a_plus_1=3, b_times_2=40),
+                Row(a_plus_1=4, b_times_2=60),
+            ],
+        )
+
+    def test_zip_one_side_is_base(self):
+        df = self.spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])
+        right = df.select((sf.col("a") + sf.col("b")).alias("sum"))
+        self.assertEqual(
+            sorted(df.zip(right).collect()),
+            [Row(a=1, b=2, sum=3), Row(a=3, b=4, sum=7)],
+        )
+
+    def test_zip_with_python_udf(self):
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", "b"])
+        plus_one = sf.udf(lambda x: x + 1, IntegerType())
+        left = df.select(plus_one(sf.col("a")).alias("a_plus_1"))
+        right = df.select(plus_one(sf.col("b")).alias("b_plus_1"))
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [
+                Row(a_plus_1=2, b_plus_1=11),
+                Row(a_plus_1=3, b_plus_1=21),
+                Row(a_plus_1=4, b_plus_1=31),
+            ],
+        )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        pandas_requirement_message or pyarrow_requirement_message,
+    )
+    def test_zip_with_pandas_udf(self):
+        import pandas as pd
+
+        @sf.pandas_udf(IntegerType())
+        def plus_one(s: pd.Series) -> pd.Series:
+            return s + 1
+
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", "b"])
+        left = df.select(plus_one(sf.col("a")).alias("a_plus_1"))
+        right = df.select(plus_one(sf.col("b")).alias("b_plus_1"))
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [
+                Row(a_plus_1=2, b_plus_1=11),
+                Row(a_plus_1=3, b_plus_1=21),
+                Row(a_plus_1=4, b_plus_1=31),
+            ],
+        )
+
+    def test_zip_different_bases_throws(self):
+        df1 = self.spark.createDataFrame([(1, 2)], ["a", "b"])
+        df2 = self.spark.createDataFrame([(3, 4, 5)], ["x", "y", "z"])
+        with self.assertRaises(AnalysisException) as ctx:
+            df1.select("a").zip(df2.select("x")).schema
+        self.assertEqual(ctx.exception.getCondition(), "ZIP_PLANS_NOT_MERGEABLE")
+
+    def test_zip_different_range_bases_throws(self):
+        df1 = self.spark.range(10).toDF("id1")
+        df2 = self.spark.range(20).toDF("id2")
+        with self.assertRaises(AnalysisException) as ctx:
+            df1.zip(df2).schema
+        self.assertEqual(ctx.exception.getCondition(), "ZIP_PLANS_NOT_MERGEABLE")
+
+    def test_zip_with_withColumn(self):
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", "b"])
+        left = df.withColumn("a_plus_1", sf.col("a") + 1)
+        right = df.withColumn("b_times_2", sf.col("b") * 2)
+        zipped = left.zip(right)
+        # Schema has duplicates (a, b appear twice) since withColumn keeps original columns.
+        self.assertEqual(zipped.columns, ["a", "b", "a_plus_1", "a", "b", "b_times_2"])
+        rows = sorted(zipped.collect(), key=lambda r: r[0])
+        self.assertEqual(
+            [tuple(r) for r in rows],
+            [(1, 10, 2, 1, 10, 20), (2, 20, 3, 2, 20, 40), (3, 30, 4, 3, 30, 60)],
+        )
+
+    def test_zip_with_withColumnRenamed(self):
+        df = self.spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])
+        left = df.withColumnRenamed("a", "a1")
+        right = df.withColumnRenamed("b", "b1")
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [Row(a1=1, b=2, a=1, b1=2), Row(a1=3, b=4, a=3, b1=4)],
+        )
+
+    def test_zip_chained_withColumn(self):
+        # Stack two withColumn calls on left (two Project layers) and one on right.
+        df = self.spark.createDataFrame([(1, 10), (2, 20)], ["a", "b"])
+        left = df.withColumn("a_plus_1", sf.col("a") + 1).withColumn("a_plus_2", sf.col("a") + 2)
+        right = df.withColumn("b_times_2", sf.col("b") * 2)
+        zipped = left.zip(right)
+        self.assertEqual(
+            zipped.columns,
+            ["a", "b", "a_plus_1", "a_plus_2", "a", "b", "b_times_2"],
+        )
+        rows = sorted(zipped.collect(), key=lambda r: r[0])
+        self.assertEqual(
+            [tuple(r) for r in rows],
+            [(1, 10, 2, 3, 1, 10, 20), (2, 20, 3, 4, 2, 20, 40)],
+        )
+
+    def test_zip_longer_chain(self):
+        # Left has three nested Projects; right has one.
+        df = self.spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["a", "b", "c"])
+        left = df.select("a", "b", "c").select("a", "b").select("a")
+        right = df.select("c")
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [Row(a=1, c=3), Row(a=4, c=6)],
+        )
+
+    def test_zip_parent_with_chained_child(self):
+        # df.zip(<chained projection of df>) -- the parent has no Project, child has many.
+        df = self.spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])
+        child = df.select((sf.col("a") + 1).alias("a_plus_1")).select(
+            (sf.col("a_plus_1") * 2).alias("doubled")
+        )
+        self.assertEqual(
+            sorted(df.zip(child).collect()),
+            [Row(a=1, b=2, doubled=4), Row(a=3, b=4, doubled=8)],
+        )
+
+
+class DataFrameZipTests(DataFrameZipTestsMixin, ReusedSQLTestCase):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
index 38765262e1fc..0a0b141e9e83 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -819,6 +819,28 @@ abstract class Dataset[T] extends Serializable {
    */
   def crossJoin(right: Dataset[_]): DataFrame
 
+  /**
+   * Combines the columns of this DataFrame with another DataFrame side-by-side, preserving row
+   * alignment between the two inputs.
+   *
+   * Both DataFrames must produce the same canonicalized plan after stripping outer `Project`
+   * chains. In practice this means they derive from a common source through chains of
+   * projection-only operations (`select`, `withColumn`, `withColumnRenamed`, etc.); the chains
+   * may differ between the two sides, but anything below them -- including any `filter`,
+   * `orderBy`, `join`, or aggregation -- must be identical on both sides so the two sides stay
+   * row-aligned. Non-scalar Python UDFs (e.g., `GROUPED_MAP`) are not allowed on either side. An
+   * `AnalysisException` is thrown when the two DataFrames cannot be aligned.
+   *
+   * @param other
+   *   The DataFrame to combine with, which must derive from the same source as this DataFrame.
+   * @return
+   *   A new DataFrame containing the columns of this DataFrame followed by the columns of
+   *   `other`.
+   * @group untypedrel
+   * @since 4.3.0
+   */
+  def zip(other: Dataset[_]): DataFrame
+
   /**
    * Joins this Dataset returning a `Tuple2` for each pair where `condition` evaluates to true.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b47c0031ddd4..b4b25d7c048f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -563,6 +563,7 @@ class Analyzer(
       ResolveBinaryArithmetic ::
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
+      ResolveZip ::
       FlattenSequentialStreamingUnion ::
       ValidateSequentialStreamingUnion ::
       ResolveRowLevelCommandAssignments ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 4e07280f94c9..c63fa6398cbd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -701,6 +701,15 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
                 "expression" -> toSQLExpr(rankingExpression),
                 "type" -> toSQLType(rankingExpression.dataType)))
 
+          case z: Zip =>
+            // ResolveZip succeeded for all valid inputs, so a surviving Zip means the two
+            // sides either don't share a base or contain a non-scalar Python UDF. Either way
+            // we surface ZIP_PLANS_NOT_MERGEABLE -- without this we'd fall through to the
+            // generic unresolved-operator INTERNAL_ERROR catch-all.
+            z.failAnalysis(
+              errorClass = "ZIP_PLANS_NOT_MERGEABLE",
+              messageParameters = Map.empty)
+
           case a: Aggregate =>
             a.groupingExpressions.foreach(
               expression =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala
new file mode 100644
index 000000000000..7b6ec5b909cd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala
@@ -0,0 +1,199 @@
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, ExprId, NamedExpression, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Zip}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP
+
+/**
+ * Resolves a [[Zip]] node by rewriting it into a chain of [[Project]] nodes over the shared
+ * base plan.
+ *
+ * The two children of `Zip` must produce the same canonicalized plan after stripping outer
+ * `Project` chains, and the chains themselves must contain only scalar expressions
+ * (`Project.resolved` already rejects Generator, AggregateExpression, and WindowExpression;
+ * this rule additionally rejects non-scalar Python UDFs that break the 1:1 row mapping).
+ *
+ * The rewrite collects every alias introduced by either chain, deduplicates aliases that
+ * share the same canonicalized child (a shared parent that feeds both sides is re-instanced
+ * by the analyzer, so its producer surfaces twice), groups them by dependency depth (depth 1
+ * = references only base attributes; depth k = references at least one depth-(k-1) alias), and
+ * emits one `Project` layer per depth so each user-written alias stays in its own `Alias`.
+ * `CollapseProject` runs later with its existing safety guards (`canCollapseExpressions`), so
+ * nondeterministic producers (`rand()`, `uuid()`) and expensive producers referenced more than
+ * once stay separate -- avoiding the double evaluation that an unguarded inline would cause.
+ *
+ * If the two sides cannot be merged, the `Zip` node remains unresolved and `CheckAnalysis`
+ * reports a `ZIP_PLANS_NOT_MERGEABLE` error.
+ */
+object ResolveZip extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
+ _.containsPattern(ZIP), ruleId) {
+ case z: Zip if z.childrenResolved => tryMerge(z).getOrElse(z)
+ }
+
+ private def tryMerge(z: Zip): Option[LogicalPlan] = {
+ val (leftAliases, leftTopList, leftBase) = analyzeChain(z.left)
+ val (rightAliases, rightTopList, rightBase) = analyzeChain(z.right)
+
+ if (!leftBase.sameResult(rightBase)) return None
+ if (!allScalar(leftAliases ++ rightAliases)) return None
+
+ // Right base's attributes may have different exprIds than the left base's
even when the
+ // two bases are `sameResult`. Map positionally so right-side references
resolve against
+ // the left base in the merged plan.
+ val attrMapping: AttributeMap[Attribute] =
+ AttributeMap(rightBase.output.zip(leftBase.output))
+ val remappedRightAliases = rightAliases.map(remapAlias(_, attrMapping))
+ val remappedRightTopList = rightTopList.map(remapNamedExpr(_, attrMapping))
+
+ // When both sides walk through a shared parent `Project`, the analyzer
re-instances the
+ // right side, so the shared producer surfaces as two aliases with
different exprIds but the
+ // same child expression (e.g. two `rand(sameSeed)`). Keeping both would
evaluate the producer
+ // twice per row. Deduplicate by canonicalized child: keep the first alias
for each distinct
+ // child, and remap references to the dropped aliases (in surviving alias
bodies and in the
+ // output lists) to the survivor. A freshly written producer gets a
distinct seed, so its
+ // canonical form differs and it is never merged with an unrelated one.
+ val canonToKept = mutable.LinkedHashMap.empty[Expression, Alias]
+ val droppedToSurvivor = mutable.HashMap.empty[ExprId, Attribute]
+ (leftAliases ++ remappedRightAliases).foreach { a =>
+ canonToKept.get(a.child.canonicalized) match {
+ case Some(kept) => droppedToSurvivor(a.exprId) = kept.toAttribute
+ case None => canonToKept(a.child.canonicalized) = a
+ }
+ }
+
+ def remapDropped[E <: Expression](e: E): E = if
(droppedToSurvivor.isEmpty) {
+ e
+ } else {
+ e.transform { case a: Attribute => droppedToSurvivor.getOrElse(a.exprId,
a) }
+ .asInstanceOf[E]
+ }
+
+ // Rewrite surviving alias bodies so references to dropped aliases point
at the survivors.
+ val dedupedAliases = canonToKept.values.toSeq.map { a =>
+ remapAliasChild(a, remapDropped(a.child))
+ }
+ val layered = buildLayeredChain(dedupedAliases, leftBase)
+ // Build the top-level output list. For each top-level expression:
+ // - If it references a dropped alias, use the survivor attribute -- but
re-alias it to the
+ // dropped column's own name, exprId, and metadata so the schema stays
correct. Two sides
+ // may expose the same deterministic producer under different names
(e.g.
+ // df.select($"a".as("x")).zip(df.select($"a".as("y")))); dedup keeps
the survivor's
+ // identity only for internal references, not for user-visible output
columns. Only the
+ // producer (value) is shared; the dropped column keeps its own name
and metadata.
+ // - Otherwise pass the attribute through unchanged.
+ val finalProjectList: Seq[NamedExpression] =
+ (leftTopList ++ remappedRightTopList).map { ne =>
+ val attr = ne.toAttribute
+ droppedToSurvivor.get(attr.exprId) match {
+ case Some(survivorAttr) =>
+ // Force explicitMetadata so the output column keeps the dropped
column's own metadata
+ // even though its value now comes from the survivor attribute.
+ Alias(survivorAttr, attr.name)(
+ exprId = attr.exprId, explicitMetadata = Some(attr.metadata))
+ case None => remapDropped(attr)
+ }
+ }
+ Some(Project(finalProjectList, layered))
+ }
+
+ /**
+ * Walks a chain of `Project` nodes and returns:
+ * - every `Alias` introduced anywhere in the chain (deepest first, then
outward),
+ * - the topmost `Project`'s projection list (or the plan's output if
there is no top
+ * `Project`), used to drive the final output column list, and
+ * - the chain's base plan (first non-`Project` node).
+ */
+ private def analyzeChain(
+ plan: LogicalPlan): (Seq[Alias], Seq[NamedExpression], LogicalPlan) =
plan match {
+ case Project(exprs, child) =>
+ val (childAliases, _, base) = analyzeChain(child)
+ val newAliases = exprs.collect { case a: Alias => a }
+ (childAliases ++ newAliases, exprs, base)
+ case other =>
+ (Seq.empty, other.output, other)
+ }
+
+ /** Rewrites a single `Alias` so its body references the left base's attributes. */
+ private def remapAlias(a: Alias, attrMapping: AttributeMap[Attribute]): Alias = {
+ val newChild = a.child.transform {
+ case attr: Attribute => attrMapping.getOrElse(attr, attr)
+ }
+ remapAliasChild(a, newChild)
+ }
+
+ /** Returns a copy of `a` with `newChild` as its body, preserving name, exprId, and metadata. */
+ private def remapAliasChild(a: Alias, newChild: Expression): Alias = {
+ Alias(newChild, a.name)(
+ exprId = a.exprId,
+ qualifier = a.qualifier,
+ explicitMetadata = a.explicitMetadata,
+ nonInheritableMetadataKeys = a.nonInheritableMetadataKeys)
+ }
+
+ private def remapNamedExpr(
+ ne: NamedExpression, attrMapping: AttributeMap[Attribute]): NamedExpression = ne match {
+ case a: Alias => remapAlias(a, attrMapping)
+ case attr: Attribute => attrMapping.getOrElse(attr, attr)
+ case other =>
+ other.transform { case attr: Attribute => attrMapping.getOrElse(attr, attr) }
+ .asInstanceOf[NamedExpression]
+ }
+
+ /**
+ * Builds a chain of `Project`s over `base`, with one layer per dependency depth so each
+ * user-written alias stays in its own `Alias`. Each layer carries every attribute from the
+ * previous layer (full passthrough) so deeper layers and the top can still reference
+ * earlier columns; `ColumnPruning` removes the unused passthroughs in the optimizer.
+ */
+ private def buildLayeredChain(aliases: Seq[Alias], base: LogicalPlan): LogicalPlan = {
+ if (aliases.isEmpty) return base
+
+ val aliasByExprId: Map[ExprId, Alias] = aliases.map(a => a.exprId -> a).toMap
+ val depthCache = mutable.Map.empty[ExprId, Int]
+ def depthOf(exprId: ExprId): Int = depthCache.getOrElseUpdate(exprId, {
+ val alias = aliasByExprId(exprId)
+ val refDepths = alias.child.collect { case a: Attribute => a }
+ .flatMap(r => aliasByExprId.get(r.exprId).map(_ => depthOf(r.exprId)))
+ if (refDepths.isEmpty) 1 else refDepths.max + 1
+ })
+ aliases.foreach(a => depthOf(a.exprId))
+ val byDepth = aliases.groupBy(a => depthCache(a.exprId)).toSeq.sortBy(_._1)
+
+ byDepth.foldLeft[LogicalPlan](base) { case (acc, (_, depthAliases)) =>
+ Project(acc.output ++ depthAliases, acc)
+ }
+ }
+
+ /**
+ * Returns true if no alias contains a non-scalar Python UDF. `Project.resolved` already
+ * rejects Generator, AggregateExpression, and WindowExpression; this additionally rejects
+ * non-scalar Python UDFs (e.g. `GROUPED_MAP`) that would break the 1:1 row mapping.
+ private def allScalar(aliases: Seq[Alias]): Boolean = {
+ !aliases.exists(_.exists {
+ case udf: PythonUDF => !PythonUDF.isScalarPythonUDF(udf)
+ case _ => false
+ })
+ }
+}
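For readers following `buildLayeredChain` in the hunk above, the depth assignment can be modeled without Catalyst. The sketch below is hypothetical and simplified: `LayeredChainSketch` and `AliasNode` are illustrative names, an alias is reduced to its id plus the ids its body references, and the result is the list of layers (shallowest first) in the order the rule would stack `Project` nodes over the base. It assumes the aliases are acyclic, as they are in an analyzed plan.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of ResolveZip.buildLayeredChain (not Spark code).
object LayeredChainSketch {
  // An alias: its id plus the ids its body references (base columns or other aliases).
  final case class AliasNode(id: String, refs: Seq[String])

  // Returns alias ids grouped by dependency depth, shallowest layer first.
  def layers(aliases: Seq[AliasNode]): Seq[Seq[String]] = {
    val byId: Map[String, AliasNode] = aliases.map(a => a.id -> a).toMap
    val depthCache = mutable.Map.empty[String, Int]

    // Memoized depth: 1 for aliases over base columns only, else 1 + deepest referenced alias.
    def depthOf(id: String): Int = depthCache.get(id) match {
      case Some(d) => d
      case None =>
        // References to base columns are not in byId, so they add no depth.
        val refDepths = byId(id).refs.filter(byId.contains).map(depthOf)
        val d = if (refDepths.isEmpty) 1 else refDepths.max + 1
        depthCache(id) = d
        d
    }

    aliases.foreach(a => depthOf(a.id))
    aliases
      .groupBy(a => depthCache(a.id))
      .toSeq
      .sortBy(_._1)
      .map { case (_, sameDepth) => sameDepth.map(_.id) }
  }
}
```

For the chained-alias case in `ResolveZipSuite` (x = a + 1, y = x * 2, z = b * 3), this model puts `x` and `z` in the first layer and `y` in the second, which is why `y` can still read `x` as a plain attribute instead of an inlined copy of `a + 1`.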
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 9184c5ef412b..2fd41281b0b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -834,6 +834,30 @@ case class Join(
override def isStateful: Boolean = left.isStreaming && right.isStreaming
}
+/**
+ * A logical plan that combines the columns of two DataFrames that derive from the same
+ * base plan through chains of Project nodes. This node is always unresolved and must be
+ * rewritten by [[ResolveZip]] into a chain of Project nodes over the shared base plan
+ * during analysis. If the two children do not share the same base plan (after stripping
+ * outer Projects), or if either side contains a non-scalar Python UDF, analysis will fail
+ * with ZIP_PLANS_NOT_MERGEABLE.
+ */
+case class Zip(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+ override def output: Seq[Attribute] = left.output ++ right.output
+
+ override def maxRows: Option[Long] = left.maxRows
+
+ override def maxRowsPerPartition: Option[Long] = left.maxRowsPerPartition
+
+ final override val nodePatterns: Seq[TreePattern] = Seq(ZIP)
+
+ // Always unresolved -- must be rewritten by ResolveZip during analysis.
+ override lazy val resolved: Boolean = false
+
+ override protected def withNewChildrenInternal(
+ newLeft: LogicalPlan, newRight: LogicalPlan): Zip = copy(left = newLeft, right = newRight)
+}
+
/**
* Insert query result into a directory.
*
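The contract in the `Zip` doc comment above (strip outer `Project`s, merge only when both sides reach the same base, otherwise leave the node unresolved so CheckAnalysis reports ZIP_PLANS_NOT_MERGEABLE) can be illustrated with a toy plan tree. This is a sketch under stated assumptions, not the rule itself: `Rel`, `Proj`, and `ZipNode` are hypothetical stand-ins for a leaf relation, `Project`, and `Zip`; structural equality replaces `sameResult`; and columns are plain strings, so alias remapping, dedup, and layering are left out.

```scala
// Hypothetical toy model of the Zip -> Project rewrite (not Catalyst code).
object ZipSketch {
  sealed trait Plan { def output: Seq[String] }

  final case class Rel(name: String, cols: Seq[String]) extends Plan {
    def output: Seq[String] = cols
  }

  final case class Proj(cols: Seq[String], child: Plan) extends Plan {
    def output: Seq[String] = cols
  }

  final case class ZipNode(left: Plan, right: Plan) extends Plan {
    // Like the real Zip: left columns followed by right columns.
    def output: Seq[String] = left.output ++ right.output
  }

  // Follows Proj nodes down to the first non-Proj node (the base).
  def baseOf(plan: Plan): Plan = plan match {
    case Proj(_, child) => baseOf(child)
    case other => other
  }

  // One Proj over the shared base, or None when the bases differ (Zip stays unresolved).
  def resolveZip(zip: ZipNode): Option[Plan] = {
    val leftBase = baseOf(zip.left)
    if (leftBase != baseOf(zip.right)) None
    else Some(Proj(zip.left.output ++ zip.right.output, leftBase))
  }
}
```

`None` here corresponds to the case where `ResolveZip` leaves `Zip` in place and analysis later fails.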
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index fb6254d82056..36fdd4706016 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -108,6 +108,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveUnresolvedHaving" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUpdateEventTimeWatermarkColumn" ::
"org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
+ "org.apache.spark.sql.catalyst.analysis.ResolveZip" ::
"org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
"org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
"org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 557b01167d88..67016e586a5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -190,6 +190,7 @@ object TreePattern extends Enumeration {
val WINDOW: Value = Value
val WINDOW_GROUP_LIMIT: Value = Value
val WITH_WINDOW_DEFINITION: Value = Value
+ val ZIP: Value = Value
// Unresolved Plan patterns (Alphabetically ordered)
val NAMED_STREAMING_RELATION: Value = Value
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala
new file mode 100644
index 000000000000..d297f29ea8d9
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.IntegerType
+
+class ResolveZipSuite extends AnalysisTest {
+
+ private val base = LocalRelation($"a".int, $"b".int, $"c".int)
+
+ object Resolve extends RuleExecutor[LogicalPlan] {
+ override val batches: Seq[Batch] = Seq(
+ Batch("ResolveZip", Once, ResolveZip))
+ }
+
+ private def reachesBase(plan: LogicalPlan, expectedBase: LogicalPlan): Boolean = plan match {
+ case Project(_, child) => reachesBase(child, expectedBase)
+ case other => other eq expectedBase
+ }
+
+ test("resolve Zip: both sides have Project over same base") {
+ val left = Project(Seq(base.output(0)), base)
+ val right = Project(Seq(base.output(1)), base)
+ val zip = Zip(left, right)
+
+ val resolved = Resolve.execute(zip)
+ val expected = Project(Seq(base.output(0), base.output(1)), base)
+ comparePlans(resolved, expected)
+ }
+
+ test("resolve Zip: left is bare plan, right has Project") {
+ val right = Project(Seq(base.output(0)), base)
+ val zip = Zip(base, right)
+
+ val resolved = Resolve.execute(zip)
+ val expected = Project(base.output ++ Seq(base.output(0)), base)
+ comparePlans(resolved, expected)
+ }
+
+ test("resolve Zip: both sides are bare same plan") {
+ val zip = Zip(base, base)
+
+ val resolved = Resolve.execute(zip)
+ val expected = Project(base.output ++ base.output, base)
+ comparePlans(resolved, expected)
+ }
+
+ test("resolve Zip: both sides have expressions over same base") {
+ val left = base.select(($"a" + 1).as("a_plus_1"))
+ val right = base.select(($"b" * 2).as("b_times_2"))
+ val zip = Zip(left.analyze, right.analyze)
+
+ val resolved = Resolve.execute(zip)
+ assert(!resolved.isInstanceOf[Zip], "Zip should have been resolved to a Project")
+ assert(resolved.isInstanceOf[Project])
+ assert(resolved.output.length == 2)
+ assert(resolved.output(0).name == "a_plus_1")
+ assert(resolved.output(1).name == "b_times_2")
+ }
+
+ test("resolve Zip: different base plans - Zip remains unresolved") {
+ val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int)
+ val left = Project(Seq(base.output(0)), base)
+ val right = Project(Seq(base2.output(0)), base2)
+ val zip = Zip(left, right)
+
+ val resolved = Resolve.execute(zip)
+ // ResolveZip cannot merge, so Zip stays
+ assert(resolved.isInstanceOf[Zip])
+ }
+
+ test("resolve Zip: skipped when children are unresolved") {
+ val unresolvedChild = Project(
+ Seq(UnresolvedAttribute("a")),
+ UnresolvedRelation(Seq("t")))
+ val zip = Zip(unresolvedChild, unresolvedChild)
+
+ val result = Resolve.execute(zip)
+ // Zip should remain unchanged because children are not resolved
+ assert(result.isInstanceOf[Zip])
+ }
+
+ test("CheckAnalysis: different base plans throws ZIP_PLANS_NOT_MERGEABLE") {
+ val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int)
+ val left = Project(Seq(base.output(0)), base)
+ val right = Project(Seq(base2.output(0)), base2)
+ val zip = Zip(left, right)
+
+ assertAnalysisErrorCondition(
+ zip,
+ expectedErrorCondition = "ZIP_PLANS_NOT_MERGEABLE",
+ expectedMessageParameters = Map.empty
+ )
+ }
+
+ test("resolve Zip: longer chain of selects on both sides") {
+ // Left has 3 nested Projects, right has 1 Project. Both reach the same base.
+ val left = Project(Seq(base.output(0)),
+ Project(Seq(base.output(0), base.output(1)),
+ Project(base.output, base)))
+ val right = Project(Seq(base.output(1)), base)
+ val zip = Zip(left, right)
+
+ val resolved = Resolve.execute(zip)
+ assert(resolved.isInstanceOf[Project], "Asymmetric chain should still merge to a Project")
+ assert(resolved.output.map(_.name) == Seq("a", "b"))
+ }
+
+ test("resolve Zip: chained Project with aliases composes substitutions") {
+ // Build df.select(a + 1 AS x).select(x * 2 AS y) -- outer references the inner alias.
+ val inner = base.select(($"a" + 1).as("x"))
+ val outer = inner.select(($"x" * 2).as("y")).analyze
+ val right = base.select(($"b" * 3).as("z")).analyze
+ val zip = Zip(outer, right)
+
+ val resolved = Resolve.execute(zip)
+ assert(resolved.isInstanceOf[Project], "Aliased chain should still merge to a Project")
+ assert(reachesBase(resolved, base),
+ "Resolved plan should be a Project chain rooted at the shared base")
+ assert(resolved.output.map(_.name) == Seq("y", "z"))
+ }
+
+ test("resolve Zip: different-instance bases with same canonical plan") {
+ // Two LocalRelations with the same schema but distinct exprIds. `sameResult` matches
+ // (canonicalized plans are equal), so this is the only path where `attrMapping` actually
+ // remaps right-side references.
+ val baseB = LocalRelation($"a".int, $"b".int, $"c".int)
+ val left = Project(Seq(base.output(0)), base)
+ val right = baseB.select(($"a" + 1).as("a_plus_1")).analyze
+ val zip = Zip(left, right)
+
+ val resolved = Resolve.execute(zip)
+ assert(resolved.isInstanceOf[Project])
+ assert(reachesBase(resolved, base),
+ "Resolved plan should be rooted at the left base, not the right base")
+ assert(!resolved.exists(_ eq baseB), "Right base should be discarded after merge")
+ assert(resolved.output.map(_.name) == Seq("a", "a_plus_1"))
+ }
+
+ test("CheckAnalysis: non-scalar Python UDF throws ZIP_PLANS_NOT_MERGEABLE") {
+ // A GROUPED_MAP Python UDF in either side's projection breaks the 1:1 row mapping, so
+ // ResolveZip refuses to merge and the surviving Zip must surface ZIP_PLANS_NOT_MERGEABLE
+ // (rather than fall through to the generic unresolved-operator INTERNAL_ERROR).
+ val groupedMapUdf = PythonUDF(
+ "pyUDF",
+ null,
+ IntegerType,
+ Seq(base.output(0)),
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+ udfDeterministic = true)
+ val left = base.select(groupedMapUdf.as("x"))
+ val right = base.select($"b".as("y"))
+ val zip = Zip(left.analyze, right.analyze)
+
+ assertAnalysisErrorCondition(
+ zip,
+ expectedErrorCondition = "ZIP_PLANS_NOT_MERGEABLE",
+ expectedMessageParameters = Map.empty
+ )
+ }
+
+ test("resolve Zip: stacked withColumn-style projections (multiple Project layers)") {
+ // Emulate df.withColumn("d", a + 1).withColumn("e", b * 2) on left:
+ // two passthrough-plus-alias Projects stacked, while right has a single layer.
+ val left = base
+ .select($"a", $"b", $"c", ($"a" + 1).as("d"))
+ .select($"a", $"b", $"c", $"d", ($"b" * 2).as("e"))
+ .analyze
+ val right = base.select($"a", $"b", $"c", ($"c" + 100).as("f")).analyze
+ val zip = Zip(left, right)
+
+ val resolved = Resolve.execute(zip)
+ assert(resolved.isInstanceOf[Project], "Stacked withColumn chain should merge to a Project")
+ assert(reachesBase(resolved, base),
+ "Resolved plan should be a Project chain rooted at the shared base")
+ assert(resolved.output.map(_.name) == Seq("a", "b", "c", "d", "e", "a", "b", "c", "f"))
+ }
+
+ test("resolve Zip: shared-producer dedup preserves each side's output column name") {
+ // Both sides project the same deterministic expression over the shared base, but under
+ // different user-given names. The dedup must merge the producer but keep each side's name.
+ val left = base.select($"a".as("x")).analyze
+ val right = base.select($"a".as("y")).analyze
+ val zip = Zip(left, right)
+
+ val resolved = Resolve.execute(zip)
+ assert(resolved.isInstanceOf[Project])
+ assert(resolved.output.map(_.name) == Seq("x", "y"),
+ s"schema should be [x, y] but got ${resolved.output.map(_.name)}")
+ }
+}
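The last test above pins down the shared-producer dedup: the value is computed once, but every output column keeps its own name. A minimal model of that behavior follows. It is hypothetical: `DedupSketch`, `Column`, `Computes`, and `Reuses` are illustrative names, and the string `producer` key stands in for the canonicalized expression that the rule's `canonToKept` map is keyed on. The first column with a given producer survives; later columns become re-aliases of the survivor under their own names, mirroring the `Alias(survivorAttr, attr.name)` branch of `finalProjectList`.

```scala
import scala.collection.mutable

// Hypothetical model of ResolveZip's shared-producer dedup (not Catalyst code).
object DedupSketch {
  // A user-visible column: its name plus a key standing in for its canonicalized producer.
  final case class Column(name: String, producer: String)

  // A merged output column either computes its producer or re-aliases the survivor.
  sealed trait Out extends Product with Serializable { def name: String }
  final case class Computes(name: String, producer: String) extends Out
  final case class Reuses(name: String, survivor: String) extends Out

  def dedup(cols: Seq[Column]): Seq[Out] = {
    // producer key -> name of the first (surviving) column that computes it
    val survivorByProducer = mutable.HashMap.empty[String, String]
    cols.map { c =>
      survivorByProducer.get(c.producer) match {
        case Some(survivorName) => Reuses(c.name, survivorName) // keep own name, share value
        case None =>
          survivorByProducer(c.producer) = c.name
          Computes(c.name, c.producer)
      }
    }
  }
}
```

With `x` and `y` both projecting `a`, the model yields `Computes(x, a)` and `Reuses(y, x)`, so the output names stay `[x, y]` as the suite asserts.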
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
index 34c685213711..3f7ed1a7c287 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
@@ -346,6 +346,11 @@ class Dataset[T] private[sql] (
builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS)
}
+ /** @inheritdoc */
+ def zip(other: sql.Dataset[_]): DataFrame = {
+ throw new UnsupportedOperationException("zip is not supported in Spark Connect")
+ }
+
/** @inheritdoc */
def joinWith[U](other: sql.Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
val joinTypeValue = toJoinType(joinType, skipSemiAnti = true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index 833b3f451273..6596fcbe091c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -706,6 +706,11 @@ class Dataset[T] private[sql](
Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE)
}
+ /** @inheritdoc */
+ def zip(other: sql.Dataset[_]): DataFrame = withPlan {
+ Zip(logicalPlan, other.logicalPlan)
+ }
+
/** @inheritdoc */
def joinWith[U](other: sql.Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
// Creates a Join node and resolve it first, to get join condition resolved, self-join resolved,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala
new file mode 100644
index 000000000000..40720d3ce5c3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.Rand
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.functions.{lit, rand, uniform}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.MetadataBuilder
+
+class DataFrameZipSuite extends QueryTest with SharedSparkSession {
+ import testImplicits._
+
+ test("zip: select different columns from the same DataFrame") {
+ val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("a", "b", "c")
+ val left = df.select("a")
+ val right = df.select("b")
+
+ checkAnswer(
+ left.zip(right),
+ Row(1, 2) :: Row(4, 5) :: Row(7, 8) :: Nil)
+ }
+
+ test("zip: select with expressions over the same DataFrame") {
+ val df = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b")
+ val left = df.select(($"a" + 1).as("a_plus_1"))
+ val right = df.select(($"b" * 2).as("b_times_2"))
+
+ checkAnswer(
+ left.zip(right),
+ Row(2, 20) :: Row(3, 40) :: Row(4, 60) :: Nil)
+ }
+
+ test("zip: one side selects all columns") {
+ val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+ val right = df.select(($"a" + $"b").as("sum"))
+
+ checkAnswer(
+ df.zip(right),
+ Row(1, 2, 3) :: Row(3, 4, 7) :: Nil)
+ }
+
+ test("zip: resolved plan is a Project") {
+ val df = Seq((1, 2)).toDF("a", "b")
+ val left = df.select("a")
+ val right = df.select("b")
+ val zipped = left.zip(right)
+
+ assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+ }
+
+ test("zip: different base plans throws AnalysisException") {
+ val df1 = Seq((1, 2)).toDF("a", "b")
+ val df2 = Seq((3, 4, 5)).toDF("x", "y", "z")
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.select("a").zip(df2.select("x")).queryExecution.assertAnalyzed()
+ },
+ condition = "ZIP_PLANS_NOT_MERGEABLE"
+ )
+ }
+
+ test("zip: different base plans from spark.range throws AnalysisException") {
+ val df1 = spark.range(10).toDF("id1")
+ val df2 = spark.range(20).toDF("id2")
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.zip(df2).queryExecution.assertAnalyzed()
+ },
+ condition = "ZIP_PLANS_NOT_MERGEABLE"
+ )
+ }
+
+ test("zip: withColumn on both sides") {
+ val df = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b")
+ val left = df.withColumn("a_plus_1", $"a" + 1)
+ val right = df.withColumn("b_times_2", $"b" * 2)
+ val zipped = left.zip(right)
+
+ assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+ checkAnswer(
+ zipped,
+ Row(1, 10, 2, 1, 10, 20) ::
+ Row(2, 20, 3, 2, 20, 40) ::
+ Row(3, 30, 4, 3, 30, 60) :: Nil)
+ }
+
+ test("zip: chained withColumn (multiple Project layers on the same side)") {
+ val df = Seq((1, 10), (2, 20)).toDF("a", "b")
+ val left = df
+ .withColumn("a_plus_1", $"a" + 1)
+ .withColumn("a_plus_2", $"a" + 2)
+ val right = df.withColumn("b_times_2", $"b" * 2)
+ val zipped = left.zip(right)
+
+ assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+ checkAnswer(
+ zipped,
+ Row(1, 10, 2, 3, 1, 10, 20) ::
+ Row(2, 20, 3, 4, 2, 20, 40) :: Nil)
+ }
+
+ test("zip: longer chain of selects on both sides") {
+ val df = Seq((1, 2, 3), (4, 5, 6)).toDF("a", "b", "c")
+ val left = df.select("a", "b", "c").select("a", "b").select("a")
+ val right = df.select("c")
+ val zipped = left.zip(right)
+
+ assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+ checkAnswer(zipped, Row(1, 3) :: Row(4, 6) :: Nil)
+ }
+
+ test("zip: parent and child with chain") {
+ val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+ val child = df.select(($"a" + 1).as("a_plus_1")).select(($"a_plus_1" * 2).as("doubled"))
+ val zipped = df.zip(child)
+
+ assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+ checkAnswer(zipped, Row(1, 2, 4) :: Row(3, 4, 8) :: Nil)
+ }
+
+ test("zip: withColumnRenamed on both sides") {
+ val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+ val left = df.withColumnRenamed("a", "a1")
+ val right = df.withColumnRenamed("b", "b1")
+
+ checkAnswer(
+ left.zip(right),
+ Row(1, 2, 1, 2) :: Row(3, 4, 3, 4) :: Nil)
+ }
+
+ test("zip: nondeterministic alias used multiple times stays a single expression") {
+ // df.withColumn("r", rand()).withColumn("x", r + r) -- if the rewrite naively inlined
+ // r into r+r, rand() would be evaluated twice per row and x would no longer equal 2*r.
+ // The depth-layered chain keeps each user alias in its own Alias entry, and
+ // CollapseProject's canCollapseExpressions guard refuses to inline a non-deterministic
+ // producer consumed more than once -- so the optimized plan must contain rand() exactly
+ // once.
+ val df = spark.range(10).toDF("id")
+ val left = df.withColumn("r", rand()).withColumn("x", $"r" + $"r").select("x")
+ val right = df.select("id")
+
+ val optimized = left.zip(right).queryExecution.optimizedPlan
+ val randCount = optimized.flatMap { p =>
+ p.expressions.flatMap(_.collect { case _: Rand => 1 })
+ }.sum
+ assert(randCount == 1,
+ s"rand() must appear exactly once after the rewrite, got $randCount; plan:\n$optimized")
+ }
+
+ test("zip: shared parent alias is not double-evaluated") {
+ // Both sides derive from the same `parent` whose alias `r` is a nondeterministic random int.
+ // Each side's chain walk collects `r`, so without de-duplication the layered chain would emit
+ // `r` twice and evaluate the producer once per copy. Assert the optimized plan keeps a single
+ // Rand.
+ val df = spark.range(10).toDF("id")
+ // `uniform(0, 1000000)` (no explicit seed) is a nondeterministic random int producer.
+ val parent = df.withColumn("r", uniform(lit(0), lit(1000000)))
+ val left = parent.select("r")
+ val right = parent.withColumn("x", $"r" + $"r").select("x")
+ val zipped = left.zip(right)
+
+ // Structural guard against duplication: the producer (a Rand inside uniform's replacement)
+ // must appear exactly once in the optimized plan.
+ val optimized = zipped.queryExecution.optimizedPlan
+ val randCount = optimized.flatMap { p =>
+ p.expressions.flatMap(_.collect { case _: Rand => 1 })
+ }.sum
+ assert(randCount == 1,
+ s"the random producer must appear exactly once after the rewrite, got $randCount; " +
+ s"plan:\n$optimized")
+
+ // Result correctness: the emitted `x` must equal `r + r` for the exact `r` emitted in the
+ // output, i.e. the merge wired `x` to the same producer instance that feeds the `r` column.
+ // `r` is an int in [0, 1000000) so `r + r` is exact and cannot overflow Int. Reduce to a
+ // single boolean via distinct.
+ checkAnswer(zipped.select(($"x" === $"r" + $"r").as("ok")).distinct(), Row(true))
+ }
+
+ test("zip: shared-producer dedup preserves output column names") {
+ // When both sides project the same deterministic expression under different names, the
+ // dedup merges the producers but must keep each side's user-given output name intact.
+ val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+ // Same column `a` aliased differently on each side.
+ val zipped = df.select($"a".as("x")).zip(df.select($"a".as("y")))
+ assert(zipped.columns === Array("x", "y"),
+ s"expected schema [x, y] but got ${zipped.columns.mkString("[", ", ", "]")}")
+ checkAnswer(zipped, Row(1, 1) :: Row(3, 3) :: Nil)
+
+ // Same expression over shared base, different output names.
+ val zipped2 = df.select(($"a" + 1).as("x")).zip(df.select(($"a" + 1).as("y")))
+ assert(zipped2.columns === Array("x", "y"),
+ s"expected schema [x, y] but got ${zipped2.columns.mkString("[", ", ", "]")}")
+ checkAnswer(zipped2, Row(2, 2) :: Row(4, 4) :: Nil)
+ }
+
+ test("zip: shared-producer dedup preserves each side's output column metadata") {
+ // Both sides project the same producer under different names AND different metadata. The
+ // dedup shares the producer (value), but each output column must keep its own metadata --
+ // the dropped column must not inherit the survivor's.
+ val mx = new MetadataBuilder().putString("desc", "left").build()
+ val my = new MetadataBuilder().putString("desc", "right").build()
+ val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+ val zipped = df.select($"a".as("x", mx)).zip(df.select($"a".as("y", my)))
+ assert(zipped.columns === Array("x", "y"))
+ assert(zipped.schema("x").metadata === mx,
+ s"column x lost its metadata: ${zipped.schema("x").metadata}")
+ assert(zipped.schema("y").metadata === my,
+ s"column y inherited the survivor's metadata: ${zipped.schema("y").metadata}")
+ checkAnswer(zipped, Row(1, 1) :: Row(3, 3) :: Nil)
+ }
+}
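The two nondeterminism tests above check that a random producer is evaluated once even when it is read twice. The plain-Scala sketch below shows what would go wrong otherwise; `InlineSketch` and `CountingRand` are hypothetical names, and the counting generator stands in for `rand()`. Substituting the producer into `r + r` draws two values per row, while computing `r` in its own step and reading it twice draws one, so `x == 2 * r` is guaranteed only in the layered form.

```scala
import scala.util.Random

// Hypothetical illustration of why a nondeterministic alias must not be inlined.
object InlineSketch {
  // Stand-in for rand(): counts how many times a value is drawn.
  final class CountingRand(seed: Long) {
    private val rng = new Random(seed)
    var calls: Int = 0
    def next(): Int = { calls += 1; rng.nextInt(1000000) }
  }

  // Inlined: x = rand() + rand(); each use of r draws a fresh value.
  // Returns (x, number of draws).
  def inlined(gen: CountingRand): (Int, Int) = {
    val x = gen.next() + gen.next()
    (x, gen.calls)
  }

  // Layered: r is computed in one step, then x = r + r reads it twice.
  // Returns (r, x, number of draws).
  def layered(gen: CountingRand): (Int, Int, Int) = {
    val r = gen.next()
    val x = r + r
    (r, x, gen.calls)
  }
}
```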