This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3f6d2317b69b [SPARK-47996][PS] support cross merge in pandas API
3f6d2317b69b is described below
commit 3f6d2317b69b3790d403f1d515040e3c8396cdc6
Author: Fangchen Li <[email protected]>
AuthorDate: Thu Jan 29 11:21:39 2026 +0800
[SPARK-47996][PS] support cross merge in pandas API
### What changes were proposed in this pull request?
Implement "cross" merge in pandas API.
### Why are the changes needed?
"cross" merge was not supported.
### Does this PR introduce _any_ user-facing change?
Yes, users can use "cross" for the how parameter in merge.
### How was this patch tested?
New unit tests added.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.5
Closes #54000 from fangchenli/SPARK-47996-cross-merge.
Authored-by: Fangchen Li <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/frame.py | 29 ++++++---
python/pyspark/pandas/namespace.py | 4 +-
.../pandas/tests/computation/test_combine.py | 74 +++++++++++++++++++++-
python/pyspark/pandas/utils.py | 6 +-
4 files changed, 100 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 1c66bbec37b7..15ffee9fe2dd 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -8546,7 +8546,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
----------
right: Object to merge with.
how: Type of merge to be performed.
- {'left', 'right', 'outer', 'inner'}, default 'inner'
+ {'left', 'right', 'outer', 'inner', 'cross'}, default 'inner'
left: use only keys from left frame, like a SQL left outer join;
not preserve
key order unlike pandas.
@@ -8556,6 +8556,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
lexicographically.
inner: use intersection of keys from both frames, like a SQL inner
join;
not preserve the order of the left keys unlike pandas.
+ cross: creates the cartesian product from both frames, preserves
the order
+ of the left keys.
on: Column or index level names to join on. These must be found in
both DataFrames. If on
is None and not merging on indexes then this defaults to the
intersection of the
columns in both DataFrames.
@@ -8661,7 +8663,16 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
if isinstance(right, ps.Series):
right = right.to_frame()
- if on:
+ if how == "cross":
+ if on or left_on or right_on:
+ raise ValueError("Can not pass on, left_on, or right_on to
merge with how='cross'.")
+ if left_index or right_index:
+ raise ValueError(
+ "Can not pass left_index=True or right_index=True to merge
with how='cross'."
+ )
+ left_key_names: List[str] = []
+ right_key_names: List[str] = []
+ elif on:
if left_on or right_on:
raise ValueError(
'Can only pass argument "on" OR "left_on" and "right_on", '
@@ -8741,12 +8752,14 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
left_key_columns = [scol_for(left_table, label) for label in
left_key_names]
right_key_columns = [scol_for(right_table, label) for label in
right_key_names]
- join_condition = reduce(
- lambda x, y: x & y,
- [lkey == rkey for lkey, rkey in zip(left_key_columns,
right_key_columns)],
- )
-
- joined_table = left_table.join(right_table, join_condition, how=how)
+ if how == "cross":
+ joined_table = left_table.crossJoin(right_table)
+ else:
+ join_condition = reduce(
+ lambda x, y: x & y,
+ [lkey == rkey for lkey, rkey in zip(left_key_columns,
right_key_columns)],
+ )
+ joined_table = left_table.join(right_table, join_condition,
how=how)
# Unpack suffixes tuple for convenience
left_suffix = suffixes[0]
diff --git a/python/pyspark/pandas/namespace.py
b/python/pyspark/pandas/namespace.py
index 0170e4533424..32362fdb46a2 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -2958,7 +2958,7 @@ def merge(
----------
right: Object to merge with.
how: Type of merge to be performed.
- {'left', 'right', 'outer', 'inner'}, default 'inner'
+ {'left', 'right', 'outer', 'inner', 'cross'}, default 'inner'
left: use only keys from left frame, like a SQL left outer join;
preserve key
order.
@@ -2968,6 +2968,8 @@ def merge(
lexicographically.
inner: use intersection of keys from both frames, like a SQL inner
join;
preserve the order of the left keys.
+ cross: creates the cartesian product from both frames, preserves the
order
+ of the left keys.
on: Column or index level names to join on. These must be found in both
DataFrames. If on
is None and not merging on indexes then this defaults to the
intersection of the
columns in both DataFrames.
diff --git a/python/pyspark/pandas/tests/computation/test_combine.py
b/python/pyspark/pandas/tests/computation/test_combine.py
index 82ba0d8f4c89..f9a09c94b4fc 100644
--- a/python/pyspark/pandas/tests/computation/test_combine.py
+++ b/python/pyspark/pandas/tests/computation/test_combine.py
@@ -352,6 +352,78 @@ class FrameCombineMixin:
pdf.sort_values(by=list(pdf.columns)).reset_index(drop=True),
)
+ def test_merge_cross(self):
+ # Test basic cross merge
+ left_pdf = pd.DataFrame({"A": [1, 2], "B": ["a", "b"]})
+ right_pdf = pd.DataFrame({"C": [3, 4], "D": ["c", "d"]})
+ left_psdf = ps.from_pandas(left_pdf)
+ right_psdf = ps.from_pandas(right_pdf)
+
+ psdf = left_psdf.merge(right_psdf, how="cross")
+ pdf = left_pdf.merge(right_pdf, how="cross")
+ self.assert_eq(
+ psdf.sort_values(by=list(psdf.columns)).reset_index(drop=True),
+ pdf.sort_values(by=list(pdf.columns)).reset_index(drop=True),
+ )
+
+ # Test cross merge with duplicate column names (should add suffixes)
+ left_pdf = pd.DataFrame({"A": [1, 2], "B": ["a", "b"]})
+ right_pdf = pd.DataFrame({"A": [3, 4], "C": ["c", "d"]})
+ left_psdf = ps.from_pandas(left_pdf)
+ right_psdf = ps.from_pandas(right_pdf)
+
+ psdf = left_psdf.merge(right_psdf, how="cross")
+ pdf = left_pdf.merge(right_pdf, how="cross")
+ self.assert_eq(
+ psdf.sort_values(by=list(psdf.columns)).reset_index(drop=True),
+ pdf.sort_values(by=list(pdf.columns)).reset_index(drop=True),
+ )
+
+ # Test cross merge with custom suffixes
+ psdf = left_psdf.merge(right_psdf, how="cross", suffixes=("_left",
"_right"))
+ pdf = left_pdf.merge(right_pdf, how="cross", suffixes=("_left",
"_right"))
+ self.assert_eq(
+ psdf.sort_values(by=list(psdf.columns)).reset_index(drop=True),
+ pdf.sort_values(by=list(pdf.columns)).reset_index(drop=True),
+ )
+
+ # Test cross merge with ps.merge function
+ psdf = ps.merge(left_psdf, right_psdf, how="cross")
+ pdf = pd.merge(left_pdf, right_pdf, how="cross")
+ self.assert_eq(
+ psdf.sort_values(by=list(psdf.columns)).reset_index(drop=True),
+ pdf.sort_values(by=list(pdf.columns)).reset_index(drop=True),
+ )
+
+ def test_merge_cross_raises(self):
+ left = ps.DataFrame({"A": [1, 2], "B": ["a", "b"]})
+ right = ps.DataFrame({"C": [3, 4], "D": ["c", "d"]})
+
+ with self.assertRaisesRegex(
+ ValueError, "Can not pass on, left_on, or right_on to merge with
how='cross'"
+ ):
+ left.merge(right, how="cross", on="A")
+
+ with self.assertRaisesRegex(
+ ValueError, "Can not pass on, left_on, or right_on to merge with
how='cross'"
+ ):
+ left.merge(right, how="cross", left_on="A", right_on="C")
+
+ with self.assertRaisesRegex(
+ ValueError, "Can not pass on, left_on, or right_on to merge with
how='cross'"
+ ):
+ left.merge(right, how="cross", right_on="C")
+
+ with self.assertRaisesRegex(
+ ValueError, "Can not pass left_index=True or right_index=True to
merge with how='cross'"
+ ):
+ left.merge(right, how="cross", left_index=True)
+
+ with self.assertRaisesRegex(
+ ValueError, "Can not pass left_index=True or right_index=True to
merge with how='cross'"
+ ):
+ left.merge(right, how="cross", right_index=True)
+
def test_merge_raises(self):
left = ps.DataFrame(
{"value": [1, 2, 3, 5, 6], "x": list("abcde")},
@@ -392,7 +464,7 @@ class FrameCombineMixin:
):
left.merge(right, left_on=["value", "x"], right_on="value")
- with self.assertRaisesRegex(ValueError, "['inner', 'left', 'right',
'full', 'outer']"):
+ with self.assertRaisesRegex(ValueError, r"\['inner', 'left', 'right',
'outer', 'cross'\]"):
left.merge(right, left_index=True, right_index=True, how="foo")
with self.assertRaisesRegex(KeyError, "id"):
diff --git a/python/pyspark/pandas/utils.py b/python/pyspark/pandas/utils.py
index 6cd531043bbc..58c1cce985fc 100644
--- a/python/pyspark/pandas/utils.py
+++ b/python/pyspark/pandas/utils.py
@@ -779,10 +779,10 @@ def validate_how(how: str) -> str:
if how == "outer":
# 'outer' in pandas equals 'full' in Spark
how = "full"
- if how not in ("inner", "left", "right", "full"):
+ if how not in ("inner", "left", "right", "full", "cross"):
raise ValueError(
- "The 'how' parameter has to be amongst the following values: ",
- "['inner', 'left', 'right', 'outer']",
+ "The 'how' parameter has to be amongst the following values: "
+ "['inner', 'left', 'right', 'outer', 'cross']"
)
return how
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]