This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1f758c235a5e [SPARK-55296][PS] Support CoW mode with pandas 3
1f758c235a5e is described below
commit 1f758c235a5ea8581e20e199a781f23bc4fcf12e
Author: Takuya Ueshin <[email protected]>
AuthorDate: Thu Feb 19 11:33:49 2026 -0800
[SPARK-55296][PS] Support CoW mode with pandas 3
### What changes were proposed in this pull request?
Support CoW (Copy-on-Write) mode with pandas 3.
### Why are the changes needed?
pandas 3 always uses Copy-on-Write (CoW): a Series obtained from a DataFrame
is no longer updated when the DataFrame is modified, and vice versa.
For example:
```py
>>> pdf = pd.DataFrame(
... [[1, 2], [4, 5], [7, 8]],
... index=["cobra", "viper", "sidewinder"],
... columns=["max_speed", "shield"],
... )
>>>
>>> pser1 = pdf.max_speed
>>> pser2 = pdf.shield
>>>
>>> pdf.loc[["viper", "sidewinder"], ["max_speed", "shield"]] = 10
```
- pandas 2
```py
>>> pdf
max_speed shield
cobra 1 2
viper 10 10
sidewinder 10 10
>>> pser1
cobra 1
viper 10
sidewinder 10
Name: max_speed, dtype: int64
>>> pser2
cobra 2
viper 10
sidewinder 10
Name: shield, dtype: int64
```
- pandas 3
```py
>>> pdf
max_speed shield
cobra 1 2
viper 10 10
sidewinder 10 10
>>> pser1
cobra 1
viper 4
sidewinder 7
Name: max_speed, dtype: int64
>>> pser2
cobra 2
viper 5
sidewinder 8
Name: shield, dtype: int64
```
Or for `Series`:
```py
>>> pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}, index=["cobra",
"viper", "sidewinder"])
>>>
>>> pser = pdf.x
>>> psery = pdf.y
>>>
>>> pser.loc[pser % 2 == 1] = -pser
```
- pandas 2
```py
>>> pdf
x y
cobra -1 4
viper 2 5
sidewinder -3 6
>>> pser
cobra -1
viper 2
sidewinder -3
Name: x, dtype: int64
>>> psery
cobra 4
viper 5
sidewinder 6
Name: y, dtype: int64
```
- pandas 3
```py
>>> pdf
x y
cobra 1 4
viper 2 5
sidewinder 3 6
>>> pser
cobra -1
viper 2
sidewinder -3
Name: x, dtype: int64
>>> psery
cobra 4
viper 5
sidewinder 6
Name: y, dtype: int64
```
### Does this PR introduce _any_ user-facing change?
Yes. When running with pandas 3, pandas API on Spark now follows pandas 3's
Copy-on-Write behavior more closely.
### How was this patch tested?
Updated the related tests to make the expected behavior explicit; the
existing tests should still pass.
### Was this patch authored or co-authored using generative AI tooling?
Codex (GPT-5.3-Codex)
Closes #54375 from ueshin/issues/SPARK-55296/cow.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
---
python/pyspark/pandas/indexing.py | 33 ++++-
python/pyspark/pandas/series.py | 5 +-
.../pandas/tests/indexes/test_indexing_iloc.py | 8 +-
.../pandas/tests/indexes/test_indexing_loc.py | 159 +++++++++++++--------
4 files changed, 140 insertions(+), 65 deletions(-)
diff --git a/python/pyspark/pandas/indexing.py
b/python/pyspark/pandas/indexing.py
index 9212dc96ec3d..fea5e3f55a56 100644
--- a/python/pyspark/pandas/indexing.py
+++ b/python/pyspark/pandas/indexing.py
@@ -27,6 +27,7 @@ import pandas as pd
from pandas.api.types import is_list_like
import numpy as np
+from pyspark.loose_version import LooseVersion
from pyspark.sql import functions as F, Column as PySparkColumn
from pyspark.sql.types import BooleanType, LongType, DataType
from pyspark.sql.utils import is_remote
@@ -720,7 +721,13 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
cond, limit, remaining_index = self._select_rows(rows_sel)
missing_keys: List[Name] = []
- _, data_spark_columns, _, _, _ = self._select_cols(cols_sel,
missing_keys=missing_keys)
+ (
+ selected_column_labels,
+ data_spark_columns,
+ _,
+ _,
+ _,
+ ) = self._select_cols(cols_sel, missing_keys=missing_keys)
if cond is None:
cond = F.lit(True)
@@ -737,6 +744,30 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
if isinstance(value, Series):
value = value.spark.column
else:
+ if (
+ # Only apply this behavior for pandas 3+, where CoW
semantics changed.
+ LooseVersion(pd.__version__) >= "3.0.0"
+ # Only for multi-column assignment (single-column
assignment is unaffected).
+ and len(selected_column_labels) > 1
+ # Column selector must be list-like (e.g. ["shield",
"max_speed"]), not scalar label access.
+ and is_list_like(cols_sel)
+ # Excludes string/bytes (single label), tuple (e.g.
MultiIndex label),
+ # and slice selectors; keeps this narrowly on explicit
column lists.
+ and not isinstance(cols_sel, (str, bytes, tuple, slice))
+ # Only trigger when cached/anchored Series exist on the
frame,
+ # matching the problematic case where views were
materialized before assignment.
+ and hasattr(self._psdf_or_psser, "_psseries")
+ ):
+ selected_column_labels_set = set(selected_column_labels)
+ selected_labels_in_internal_order = [
+ label
+ for label in self._internal.column_labels
+ if label in selected_column_labels_set
+ ]
+ if selected_column_labels !=
selected_labels_in_internal_order:
+ # If requested columns are in different order than the
DataFrame’s internal order,
+ # it returns early (no-op), matching pandas 3 behavior
for that edge case.
+ return
value = F.lit(value)
new_data_spark_columns = []
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 72d49574423b..882d44880b47 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -430,7 +430,10 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
assert not copy
assert fastpath is no_default
- self._anchor = data
+ if LooseVersion(pd.__version__) < "3.0.0":
+ self._anchor = data
+ else:
+ self._anchor = DataFrame(data)
self._col_label = index
elif isinstance(data, Series):
diff --git a/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
b/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
index a5b24b9783de..ce3fc7d30070 100644
--- a/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
+++ b/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
@@ -19,6 +19,7 @@ import numpy as np
import pandas as pd
from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
from pyspark.pandas.exceptions import SparkPandasIndexingError,
SparkPandasNotImplementedError
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
@@ -180,9 +181,10 @@ class IndexingILocMixin:
)
psdf = ps.from_pandas(pdf)
- pdf.iloc[:, 0] = pdf
- psdf.iloc[:, 0] = psdf
- self.assert_eq(psdf, pdf)
+ if LooseVersion(pd.__version__) < "3.0.0":
+ pdf.iloc[:, 0] = pdf
+ psdf.iloc[:, 0] = psdf
+ self.assert_eq(psdf, pdf)
def test_series_iloc_setitem(self):
pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}, index=["cobra",
"viper", "sidewinder"])
diff --git a/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
b/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
index 916863b61b86..f01362d66860 100644
--- a/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
+++ b/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
@@ -20,6 +20,7 @@ import numpy as np
import pandas as pd
from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
@@ -240,6 +241,58 @@ class IndexingLocMixin:
self.assert_eq(pdf.B.loc["2011":"2015"], psdf.B.loc["2011":"2015"])
def test_frame_loc_setitem(self):
+ def check(op, check_ser, almost):
+ pdf = pd.DataFrame(
+ [[1, 2], [4, 5], [7, 8]],
+ index=["cobra", "viper", "sidewinder"],
+ columns=["max_speed", "shield"],
+ )
+ psdf = ps.from_pandas(pdf)
+
+ if check_ser:
+ pser1 = pdf.max_speed
+ pser2 = pdf.shield
+ psser1 = psdf.max_speed
+ psser2 = psdf.shield
+
+ op(pdf)
+ op(psdf)
+
+ self.assert_eq(psdf, pdf, almost=almost)
+ if check_ser:
+ self.assert_eq(psser1, pser1)
+ self.assert_eq(psser2, pser2)
+
+ def op0(df):
+ df.loc[["viper", "sidewinder"], ["max_speed", "shield"]] = 10
+
+ def op1(df):
+ df.loc[["viper", "sidewinder"], ["shield", "max_speed"]] = 10
+
+ def op2(df):
+ df.loc[["viper", "sidewinder"], "shield"] = 50
+
+ def op3(df):
+ df.loc["cobra", "max_speed"] = 30
+
+ def op4(df):
+ df.loc[df.max_speed < 5, "max_speed"] = -df.max_speed
+
+ def op5(df):
+ df.loc[df.max_speed < 2, "max_speed"] = -df.max_speed
+
+ def op6(df):
+ df.loc[:, "min_speed"] = 0
+
+ for check_ser in [True, False]:
+ for op in [op0, op1, op2, op3, op4, op5, (op6, True)]:
+ if isinstance(op, tuple):
+ op, almost = op
+ else:
+ op, almost = op, False
+ with self.subTest(check_ser=check_ser, op=op.__name__):
+ check(op, check_ser=check_ser, almost=almost)
+
pdf = pd.DataFrame(
[[1, 2], [4, 5], [7, 8]],
index=["cobra", "viper", "sidewinder"],
@@ -247,47 +300,6 @@ class IndexingLocMixin:
)
psdf = ps.from_pandas(pdf)
- pser1 = pdf.max_speed
- pser2 = pdf.shield
- psser1 = psdf.max_speed
- psser2 = psdf.shield
-
- pdf.loc[["viper", "sidewinder"], ["shield", "max_speed"]] = 10
- psdf.loc[["viper", "sidewinder"], ["shield", "max_speed"]] = 10
- self.assert_eq(psdf, pdf)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
- pdf.loc[["viper", "sidewinder"], "shield"] = 50
- psdf.loc[["viper", "sidewinder"], "shield"] = 50
- self.assert_eq(psdf, pdf)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
- pdf.loc["cobra", "max_speed"] = 30
- psdf.loc["cobra", "max_speed"] = 30
- self.assert_eq(psdf, pdf)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
- pdf.loc[pdf.max_speed < 5, "max_speed"] = -pdf.max_speed
- psdf.loc[psdf.max_speed < 5, "max_speed"] = -psdf.max_speed
- self.assert_eq(psdf, pdf)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
- pdf.loc[pdf.max_speed < 2, "max_speed"] = -pdf.max_speed
- psdf.loc[psdf.max_speed < 2, "max_speed"] = -psdf.max_speed
- self.assert_eq(psdf, pdf)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
- pdf.loc[:, "min_speed"] = 0
- psdf.loc[:, "min_speed"] = 0
- self.assert_eq(psdf, pdf, almost=True)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
with self.assertRaisesRegex(ValueError, "Incompatible indexer with
Series"):
psdf.loc["cobra", "max_speed"] = -psdf.max_speed
with self.assertRaisesRegex(ValueError, "shape mismatch"):
@@ -296,23 +308,49 @@ class IndexingLocMixin:
psdf.loc[:, "max_speed"] = psdf
# multi-index columns
- columns = pd.MultiIndex.from_tuples(
- [("x", "max_speed"), ("x", "shield"), ("y", "min_speed")]
- )
- pdf.columns = columns
- psdf.columns = columns
+ def check(op, check_ser):
+ pdf = pd.DataFrame(
+ [[1, 2, 0], [4, 5, 0], [7, 8, 0]],
+ index=["cobra", "viper", "sidewinder"],
+ columns=pd.MultiIndex.from_tuples(
+ [("x", "max_speed"), ("x", "shield"), ("y", "min_speed")]
+ ),
+ )
+ psdf = ps.from_pandas(pdf)
+
+ if check_ser:
+ pser1 = pdf[("x", "max_speed")]
+ pser2 = pdf[("x", "shield")]
+ psser1 = psdf[("x", "max_speed")]
+ psser2 = psdf[("x", "shield")]
+
+ op(pdf)
+ op(psdf)
+
+ self.assert_eq(psdf, pdf, almost=True)
+ if check_ser:
+ self.assert_eq(psser1, pser1)
+ self.assert_eq(psser2, pser2)
+
+ def mop0(df):
+ df.loc[:, ("y", "shield")] = -df[("x", "shield")]
+
+ def mop1(df):
+ df.loc[:, "z"] = 100
+
+ for check_ser in [True, False]:
+ for op in [mop0, mop1]:
+ with self.subTest(check_ser=check_ser, op=op.__name__):
+ check(op, check_ser=check_ser)
- pdf.loc[:, ("y", "shield")] = -pdf[("x", "shield")]
- psdf.loc[:, ("y", "shield")] = -psdf[("x", "shield")]
- self.assert_eq(psdf, pdf, almost=True)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
-
- pdf.loc[:, "z"] = 100
- psdf.loc[:, "z"] = 100
- self.assert_eq(psdf, pdf, almost=True)
- self.assert_eq(psser1, pser1)
- self.assert_eq(psser2, pser2)
+ pdf = pd.DataFrame(
+ [[1, 2, 0], [4, 5, 0], [7, 8, 0]],
+ index=["cobra", "viper", "sidewinder"],
+ columns=pd.MultiIndex.from_tuples(
+ [("x", "max_speed"), ("x", "shield"), ("y", "min_speed")]
+ ),
+ )
+ psdf = ps.from_pandas(pdf)
with self.assertRaisesRegex(KeyError, "Key length \\(3\\) exceeds
index depth \\(2\\)"):
psdf.loc[:, [("x", "max_speed", "foo")]] = -psdf[("x", "shield")]
@@ -322,9 +360,10 @@ class IndexingLocMixin:
)
psdf = ps.from_pandas(pdf)
- pdf.loc[:, "max_speed"] = pdf
- psdf.loc[:, "max_speed"] = psdf
- self.assert_eq(psdf, pdf)
+ if LooseVersion(pd.__version__) < "3.0.0":
+ pdf.loc[:, "max_speed"] = pdf
+ psdf.loc[:, "max_speed"] = psdf
+ self.assert_eq(psdf, pdf)
def test_series_loc_setitem(self):
pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}, index=["cobra",
"viper", "sidewinder"])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]