This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f758c235a5e [SPARK-55296][PS] Support CoW mode with pandas 3
1f758c235a5e is described below

commit 1f758c235a5ea8581e20e199a781f23bc4fcf12e
Author: Takuya Ueshin <[email protected]>
AuthorDate: Thu Feb 19 11:33:49 2026 -0800

    [SPARK-55296][PS] Support CoW mode with pandas 3
    
    ### What changes were proposed in this pull request?
    
    Support CoW (Copy-on-Write) mode with pandas 3.
    
    ### Why are the changes needed?
    
    Pandas 3 enables Copy-on-Write (CoW) behavior for everything by default.
    
    For example:
    
    ```py
    >>> pdf = pd.DataFrame(
    ...     [[1, 2], [4, 5], [7, 8]],
    ...     index=["cobra", "viper", "sidewinder"],
    ...     columns=["max_speed", "shield"],
    ... )
    >>>
    >>> pser1 = pdf.max_speed
    >>> pser2 = pdf.shield
    >>>
    >>> pdf.loc[["viper", "sidewinder"], ["max_speed", "shield"]] = 10
    ```
    
    - pandas 2
    
    ```py
    >>> pdf
                max_speed  shield
    cobra               1       2
    viper              10      10
    sidewinder         10      10
    >>> pser1
    cobra          1
    viper         10
    sidewinder    10
    Name: max_speed, dtype: int64
    >>> pser2
    cobra          2
    viper         10
    sidewinder    10
    Name: shield, dtype: int64
    ```
    
    - pandas 3
    
    ```py
    >>> pdf
                max_speed  shield
    cobra               1       2
    viper              10      10
    sidewinder         10      10
    >>> pser1
    cobra         1
    viper         4
    sidewinder    7
    Name: max_speed, dtype: int64
    >>> pser2
    cobra         2
    viper         5
    sidewinder    8
    Name: shield, dtype: int64
    ```
    
    Or for `Series`:
    
    ```py
    >>> pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}, index=["cobra", 
"viper", "sidewinder"])
    >>>
    >>> pser = pdf.x
    >>> psery = pdf.y
    >>>
    >>> pser.loc[pser % 2 == 1] = -pser
    ```
    
    - pandas 2
    
    ```py
    >>> pdf
                x  y
    cobra      -1  4
    viper       2  5
    sidewinder -3  6
    >>> pser
    cobra        -1
    viper         2
    sidewinder   -3
    Name: x, dtype: int64
    >>> psery
    cobra         4
    viper         5
    sidewinder    6
    Name: y, dtype: int64
    ```
    
    - pandas 3
    
    ```py
    >>> pdf
                x  y
    cobra       1  4
    viper       2  5
    sidewinder  3  6
    >>> pser
    cobra        -1
    viper         2
    sidewinder   -3
    Name: x, dtype: int64
    >>> psery
    cobra         4
    viper         5
    sidewinder    6
    Name: y, dtype: int64
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it will behave more like pandas 3.
    
    ### How was this patch tested?
    
    Updated the related tests to make the CoW behavior explicit; otherwise the
existing tests should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Codex (GPT-5.3-Codex)
    
    Closes #54375 from ueshin/issues/SPARK-55296/cow.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 python/pyspark/pandas/indexing.py                  |  33 ++++-
 python/pyspark/pandas/series.py                    |   5 +-
 .../pandas/tests/indexes/test_indexing_iloc.py     |   8 +-
 .../pandas/tests/indexes/test_indexing_loc.py      | 159 +++++++++++++--------
 4 files changed, 140 insertions(+), 65 deletions(-)

diff --git a/python/pyspark/pandas/indexing.py 
b/python/pyspark/pandas/indexing.py
index 9212dc96ec3d..fea5e3f55a56 100644
--- a/python/pyspark/pandas/indexing.py
+++ b/python/pyspark/pandas/indexing.py
@@ -27,6 +27,7 @@ import pandas as pd
 from pandas.api.types import is_list_like
 import numpy as np
 
+from pyspark.loose_version import LooseVersion
 from pyspark.sql import functions as F, Column as PySparkColumn
 from pyspark.sql.types import BooleanType, LongType, DataType
 from pyspark.sql.utils import is_remote
@@ -720,7 +721,13 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
 
             cond, limit, remaining_index = self._select_rows(rows_sel)
             missing_keys: List[Name] = []
-            _, data_spark_columns, _, _, _ = self._select_cols(cols_sel, 
missing_keys=missing_keys)
+            (
+                selected_column_labels,
+                data_spark_columns,
+                _,
+                _,
+                _,
+            ) = self._select_cols(cols_sel, missing_keys=missing_keys)
 
             if cond is None:
                 cond = F.lit(True)
@@ -737,6 +744,30 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
                 if isinstance(value, Series):
                     value = value.spark.column
             else:
+                if (
+                    # Only apply this behavior for pandas 3+, where CoW 
semantics changed.
+                    LooseVersion(pd.__version__) >= "3.0.0"
+                    # Only for multi-column assignment (single-column 
assignment is unaffected).
+                    and len(selected_column_labels) > 1
+                    # Column selector must be list-like (e.g. ["shield", 
"max_speed"]), not scalar label access.
+                    and is_list_like(cols_sel)
+                    # Excludes string/bytes (single label), tuple (e.g. 
MultiIndex label),
+                    # and slice selectors; keeps this narrowly on explicit 
column lists.
+                    and not isinstance(cols_sel, (str, bytes, tuple, slice))
+                    # Only trigger when cached/anchored Series exist on the 
frame,
+                    # matching the problematic case where views were 
materialized before assignment.
+                    and hasattr(self._psdf_or_psser, "_psseries")
+                ):
+                    selected_column_labels_set = set(selected_column_labels)
+                    selected_labels_in_internal_order = [
+                        label
+                        for label in self._internal.column_labels
+                        if label in selected_column_labels_set
+                    ]
+                    if selected_column_labels != 
selected_labels_in_internal_order:
+                        # If requested columns are in different order than the 
DataFrame’s internal order,
+                        # it returns early (no-op), matching pandas 3 behavior 
for that edge case.
+                        return
                 value = F.lit(value)
 
             new_data_spark_columns = []
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 72d49574423b..882d44880b47 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -430,7 +430,10 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             assert not copy
             assert fastpath is no_default
 
-            self._anchor = data
+            if LooseVersion(pd.__version__) < "3.0.0":
+                self._anchor = data
+            else:
+                self._anchor = DataFrame(data)
             self._col_label = index
 
         elif isinstance(data, Series):
diff --git a/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py 
b/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
index a5b24b9783de..ce3fc7d30070 100644
--- a/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
+++ b/python/pyspark/pandas/tests/indexes/test_indexing_iloc.py
@@ -19,6 +19,7 @@ import numpy as np
 import pandas as pd
 
 from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
 from pyspark.pandas.exceptions import SparkPandasIndexingError, 
SparkPandasNotImplementedError
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 from pyspark.testing.sqlutils import SQLTestUtils
@@ -180,9 +181,10 @@ class IndexingILocMixin:
         )
         psdf = ps.from_pandas(pdf)
 
-        pdf.iloc[:, 0] = pdf
-        psdf.iloc[:, 0] = psdf
-        self.assert_eq(psdf, pdf)
+        if LooseVersion(pd.__version__) < "3.0.0":
+            pdf.iloc[:, 0] = pdf
+            psdf.iloc[:, 0] = psdf
+            self.assert_eq(psdf, pdf)
 
     def test_series_iloc_setitem(self):
         pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}, index=["cobra", 
"viper", "sidewinder"])
diff --git a/python/pyspark/pandas/tests/indexes/test_indexing_loc.py 
b/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
index 916863b61b86..f01362d66860 100644
--- a/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
+++ b/python/pyspark/pandas/tests/indexes/test_indexing_loc.py
@@ -20,6 +20,7 @@ import numpy as np
 import pandas as pd
 
 from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 from pyspark.testing.sqlutils import SQLTestUtils
 
@@ -240,6 +241,58 @@ class IndexingLocMixin:
         self.assert_eq(pdf.B.loc["2011":"2015"], psdf.B.loc["2011":"2015"])
 
     def test_frame_loc_setitem(self):
+        def check(op, check_ser, almost):
+            pdf = pd.DataFrame(
+                [[1, 2], [4, 5], [7, 8]],
+                index=["cobra", "viper", "sidewinder"],
+                columns=["max_speed", "shield"],
+            )
+            psdf = ps.from_pandas(pdf)
+
+            if check_ser:
+                pser1 = pdf.max_speed
+                pser2 = pdf.shield
+                psser1 = psdf.max_speed
+                psser2 = psdf.shield
+
+            op(pdf)
+            op(psdf)
+
+            self.assert_eq(psdf, pdf, almost=almost)
+            if check_ser:
+                self.assert_eq(psser1, pser1)
+                self.assert_eq(psser2, pser2)
+
+        def op0(df):
+            df.loc[["viper", "sidewinder"], ["max_speed", "shield"]] = 10
+
+        def op1(df):
+            df.loc[["viper", "sidewinder"], ["shield", "max_speed"]] = 10
+
+        def op2(df):
+            df.loc[["viper", "sidewinder"], "shield"] = 50
+
+        def op3(df):
+            df.loc["cobra", "max_speed"] = 30
+
+        def op4(df):
+            df.loc[df.max_speed < 5, "max_speed"] = -df.max_speed
+
+        def op5(df):
+            df.loc[df.max_speed < 2, "max_speed"] = -df.max_speed
+
+        def op6(df):
+            df.loc[:, "min_speed"] = 0
+
+        for check_ser in [True, False]:
+            for op in [op0, op1, op2, op3, op4, op5, (op6, True)]:
+                if isinstance(op, tuple):
+                    op, almost = op
+                else:
+                    op, almost = op, False
+                with self.subTest(check_ser=check_ser, op=op.__name__):
+                    check(op, check_ser=check_ser, almost=almost)
+
         pdf = pd.DataFrame(
             [[1, 2], [4, 5], [7, 8]],
             index=["cobra", "viper", "sidewinder"],
@@ -247,47 +300,6 @@ class IndexingLocMixin:
         )
         psdf = ps.from_pandas(pdf)
 
-        pser1 = pdf.max_speed
-        pser2 = pdf.shield
-        psser1 = psdf.max_speed
-        psser2 = psdf.shield
-
-        pdf.loc[["viper", "sidewinder"], ["shield", "max_speed"]] = 10
-        psdf.loc[["viper", "sidewinder"], ["shield", "max_speed"]] = 10
-        self.assert_eq(psdf, pdf)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
-        pdf.loc[["viper", "sidewinder"], "shield"] = 50
-        psdf.loc[["viper", "sidewinder"], "shield"] = 50
-        self.assert_eq(psdf, pdf)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
-        pdf.loc["cobra", "max_speed"] = 30
-        psdf.loc["cobra", "max_speed"] = 30
-        self.assert_eq(psdf, pdf)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
-        pdf.loc[pdf.max_speed < 5, "max_speed"] = -pdf.max_speed
-        psdf.loc[psdf.max_speed < 5, "max_speed"] = -psdf.max_speed
-        self.assert_eq(psdf, pdf)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
-        pdf.loc[pdf.max_speed < 2, "max_speed"] = -pdf.max_speed
-        psdf.loc[psdf.max_speed < 2, "max_speed"] = -psdf.max_speed
-        self.assert_eq(psdf, pdf)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
-        pdf.loc[:, "min_speed"] = 0
-        psdf.loc[:, "min_speed"] = 0
-        self.assert_eq(psdf, pdf, almost=True)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
         with self.assertRaisesRegex(ValueError, "Incompatible indexer with 
Series"):
             psdf.loc["cobra", "max_speed"] = -psdf.max_speed
         with self.assertRaisesRegex(ValueError, "shape mismatch"):
@@ -296,23 +308,49 @@ class IndexingLocMixin:
             psdf.loc[:, "max_speed"] = psdf
 
         # multi-index columns
-        columns = pd.MultiIndex.from_tuples(
-            [("x", "max_speed"), ("x", "shield"), ("y", "min_speed")]
-        )
-        pdf.columns = columns
-        psdf.columns = columns
+        def check(op, check_ser):
+            pdf = pd.DataFrame(
+                [[1, 2, 0], [4, 5, 0], [7, 8, 0]],
+                index=["cobra", "viper", "sidewinder"],
+                columns=pd.MultiIndex.from_tuples(
+                    [("x", "max_speed"), ("x", "shield"), ("y", "min_speed")]
+                ),
+            )
+            psdf = ps.from_pandas(pdf)
+
+            if check_ser:
+                pser1 = pdf[("x", "max_speed")]
+                pser2 = pdf[("x", "shield")]
+                psser1 = psdf[("x", "max_speed")]
+                psser2 = psdf[("x", "shield")]
+
+            op(pdf)
+            op(psdf)
+
+            self.assert_eq(psdf, pdf, almost=True)
+            if check_ser:
+                self.assert_eq(psser1, pser1)
+                self.assert_eq(psser2, pser2)
+
+        def mop0(df):
+            df.loc[:, ("y", "shield")] = -df[("x", "shield")]
+
+        def mop1(df):
+            df.loc[:, "z"] = 100
+
+        for check_ser in [True, False]:
+            for op in [mop0, mop1]:
+                with self.subTest(check_ser=check_ser, op=op.__name__):
+                    check(op, check_ser=check_ser)
 
-        pdf.loc[:, ("y", "shield")] = -pdf[("x", "shield")]
-        psdf.loc[:, ("y", "shield")] = -psdf[("x", "shield")]
-        self.assert_eq(psdf, pdf, almost=True)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
-
-        pdf.loc[:, "z"] = 100
-        psdf.loc[:, "z"] = 100
-        self.assert_eq(psdf, pdf, almost=True)
-        self.assert_eq(psser1, pser1)
-        self.assert_eq(psser2, pser2)
+        pdf = pd.DataFrame(
+            [[1, 2, 0], [4, 5, 0], [7, 8, 0]],
+            index=["cobra", "viper", "sidewinder"],
+            columns=pd.MultiIndex.from_tuples(
+                [("x", "max_speed"), ("x", "shield"), ("y", "min_speed")]
+            ),
+        )
+        psdf = ps.from_pandas(pdf)
 
         with self.assertRaisesRegex(KeyError, "Key length \\(3\\) exceeds 
index depth \\(2\\)"):
             psdf.loc[:, [("x", "max_speed", "foo")]] = -psdf[("x", "shield")]
@@ -322,9 +360,10 @@ class IndexingLocMixin:
         )
         psdf = ps.from_pandas(pdf)
 
-        pdf.loc[:, "max_speed"] = pdf
-        psdf.loc[:, "max_speed"] = psdf
-        self.assert_eq(psdf, pdf)
+        if LooseVersion(pd.__version__) < "3.0.0":
+            pdf.loc[:, "max_speed"] = pdf
+            psdf.loc[:, "max_speed"] = psdf
+            self.assert_eq(psdf, pdf)
 
     def test_series_loc_setitem(self):
         pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}, index=["cobra", 
"viper", "sidewinder"])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to