This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ea7948f35378 [SPARK-55946][PS] Set up __pandas_priority__ so mixed binary ops dispatch correctly to pandas-on-Spark
ea7948f35378 is described below

commit ea7948f35378c663649690d37ac1dbadb1385085
Author: Takuya Ueshin <[email protected]>
AuthorDate: Wed Mar 11 13:39:43 2026 +0900

    [SPARK-55946][PS] Set up __pandas_priority__ so mixed binary ops dispatch correctly to pandas-on-Spark
    
    ### What changes were proposed in this pull request?
    
    This PR sets `__pandas_priority__` on pandas-on-Spark objects so that mixed arithmetic with a pandas object on the left-hand side (for example, `pdf + psdf`) is dispatched to the reflected pandas-on-Spark implementations.
    
    The changes are:
    
    - add `__pandas_priority__ = 3500` to pandas-on-Spark `Series`/`Index` via `IndexOpsMixin`
    - add `__pandas_priority__ = 4500` to pandas-on-Spark `DataFrame`
    - preserve the existing unsupported behavior for datetime reverse subtraction from a pandas `Series` by raising `NotImplementedError` in `DatetimeOps.rsub`
    - add test coverage for mixed pandas `DataFrame` and pandas-on-Spark `DataFrame` reverse ops
    - extend datetime arithmetic exception tests to cover the pandas object variants involved in reverse-op dispatch
    
    ### Why are the changes needed?
    
    pandas 2.1+ uses `__pandas_priority__` to decide whether mixed arithmetic 
should defer to the right-hand operand's reflected implementation.
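
A minimal sketch of this dispatch rule in plain Python, not pandas' actual code: `PandasLike` and `SparkLike` are hypothetical stand-ins, and the priority values mirror the ones this PR implies for pandas `Series` (3000) and pandas-on-Spark `Series` (3500). The left operand returns `NotImplemented` when the right operand advertises a higher priority, so Python falls back to the right operand's reflected method.

```python
class PandasLike:
    """Hypothetical stand-in for a pandas object used as the left operand."""

    __pandas_priority__ = 3000

    def __add__(self, other):
        # Defer when the other operand advertises a higher priority.
        if getattr(other, "__pandas_priority__", -1) > self.__pandas_priority__:
            return NotImplemented
        return "handled by PandasLike.__add__"


class SparkLike:
    """Hypothetical stand-in for a pandas-on-Spark object (right operand)."""

    __pandas_priority__ = 3500

    def __radd__(self, other):
        # Reached only after the left operand returned NotImplemented.
        return "handled by SparkLike.__radd__"


result = PandasLike() + SparkLike()
print(result)
```

If `SparkLike` did not define `__pandas_priority__`, the left operand in this sketch would handle the operation itself, which corresponds to the situation this PR addresses.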
    
    Without this, mixed reverse ops between pandas objects and pandas-on-Spark 
objects may be handled by pandas first, which can surface pandas-side errors 
instead of the existing pandas-on-Spark errors.
    
    For example, datetime reverse ops such as `other + psser` should continue 
to raise pandas-on-Spark errors like:
    
    - `Addition can not be applied to datetimes.`
    
    instead of pandas-side wrapping or broadcasting errors.
    
    The same dispatch issue also applies to mixed `pandas.DataFrame` and 
pandas-on-Spark `DataFrame` operations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    For mixed arithmetic between pandas objects and pandas-on-Spark objects, 
pandas-on-Spark now takes precedence more consistently, so users will see 
pandas-on-Spark errors and behavior from reflected operations instead of 
pandas-side fallback errors in those cases.
    
    For example, mixed DataFrame reverse ops now dispatch to pandas-on-Spark 
code paths:
    
    ```python
    import pandas as pd
    from pyspark import pandas as ps
    
    pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    psdf = ps.from_pandas(pdf)
    
    pdf + psdf
    ```
    
    This now raises a pandas-on-Spark error:
    
    ```text
    TypeError: radd with a sequence is currently not supported; however, got DataFrame.
    ```
    
    ### How was this patch tested?
    
    Added and ran targeted unit tests in both pandas 2 and pandas 3 environments:
    
    - `pyspark.pandas.tests.series.test_datetime.SeriesDateTimeTests.test_arithmetic_op_exceptions`
    - `pyspark.pandas.tests.computation.test_binary_ops.FrameBinaryOpsTests.test_mixed_dataframe_ops_dispatch_to_pandas_on_spark`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: OpenAI Codex (GPT-5)
    
    Closes #54741 from ueshin/issues/SPARK-55946/pandas_priority.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/base.py                      |  5 +-
 .../pyspark/pandas/data_type_ops/datetime_ops.py   |  2 +
 python/pyspark/pandas/frame.py                     |  4 ++
 .../pandas/tests/computation/test_binary_ops.py    | 14 +++++
 .../pyspark/pandas/tests/series/test_datetime.py   | 60 +++++++++++++---------
 5 files changed, 59 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index 5a39b581e986..ef2d91a0e237 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -22,7 +22,7 @@ import warnings
 from abc import ABCMeta, abstractmethod
 from functools import wraps, partial
 from itertools import chain
-from typing import Any, Callable, Optional, Sequence, Tuple, Union, cast, TYPE_CHECKING
+from typing import Any, Callable, ClassVar, Optional, Sequence, Tuple, Union, cast, TYPE_CHECKING
 
 import numpy as np
 import pandas as pd
@@ -284,6 +284,9 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
     Assuming there are following attributes or properties and functions.
     """
 
+    # Keep pandas-on-Spark above pandas Series and Index for reflected ops.
+    __pandas_priority__: ClassVar[int] = pd.Series.__pandas_priority__ + 500  # type: ignore[attr-defined]
+
     @property
     @abstractmethod
     def _internal(self) -> InternalFrame:
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index 07399ecd873e..d407b8512b4e 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -94,6 +94,8 @@ class DatetimeOps(DataTypeOps):
             "The timestamp subtraction returns an integer in seconds, "
             "whereas pandas returns 'timedelta64[ns]'."
         )
+        if isinstance(right, pd.Series):
+            raise NotImplementedError()
         if isinstance(right, datetime.datetime):
             warnings.warn(msg, UserWarning)
             return cast(
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 8377f72cd376..e775efdfaf71 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -32,6 +32,7 @@ from types import TracebackType
 from typing import (
     Any,
     Callable,
+    ClassVar,
     Dict,
     Generic,
     IO,
@@ -543,6 +544,9 @@ class DataFrame(Frame, Generic[T]):
     2    None  NaN
     """
 
+    # Keep pandas-on-Spark above pandas DataFrame for reflected ops.
+    __pandas_priority__: ClassVar[int] = pd.DataFrame.__pandas_priority__ + 500  # type: ignore[attr-defined]
+
     def __init__(  # type: ignore[no-untyped-def]
         self, data=None, index=None, columns=None, dtype=None, copy=False
     ):
diff --git a/python/pyspark/pandas/tests/computation/test_binary_ops.py b/python/pyspark/pandas/tests/computation/test_binary_ops.py
index 919e9b77973a..b0e83c02305c 100644
--- a/python/pyspark/pandas/tests/computation/test_binary_ops.py
+++ b/python/pyspark/pandas/tests/computation/test_binary_ops.py
@@ -69,6 +69,20 @@ class FrameBinaryOpsMixin:
             lambda: psdf.add(psdf_other),
         )
 
+    def test_mixed_dataframe_ops_dispatch_to_pandas_on_spark(self):
+        pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
+        psdf = ps.from_pandas(pdf)
+
+        with self.assertRaisesRegex(
+            TypeError, "add with a sequence is currently not supported; however, got DataFrame."
+        ):
+            psdf + pdf
+
+        with self.assertRaisesRegex(
+            TypeError, "radd with a sequence is currently not supported; however, got DataFrame."
+        ):
+            pdf + psdf
+
     def test_binary_operator_add(self):
         # Positive
         pdf = pd.DataFrame({"a": ["x"], "b": ["y"], "c": [1], "d": [2]})
diff --git a/python/pyspark/pandas/tests/series/test_datetime.py b/python/pyspark/pandas/tests/series/test_datetime.py
index a8374b38e66f..5397f8442cda 100644
--- a/python/pyspark/pandas/tests/series/test_datetime.py
+++ b/python/pyspark/pandas/tests/series/test_datetime.py
@@ -22,6 +22,7 @@ import numpy as np
 import pandas as pd
 
 from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 from pyspark.testing.sqlutils import SQLTestUtils
 
@@ -85,35 +86,44 @@ class SeriesDateTimeTestsMixin:
         py_datetime = self.pd_start_date.dt.to_pydatetime()
         datetime_index = ps.Index(self.pd_start_date)
 
-        for other in [1, 0.1, psser, datetime_index, py_datetime]:
-            expected_err_msg = "Addition can not be applied to datetimes."
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser + other)
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other + psser)
-
-            expected_err_msg = "Multiplication can not be applied to datetimes."
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser * other)
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other * psser)
-
-            expected_err_msg = "True division can not be applied to datetimes."
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser / other)
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other / psser)
-
-            expected_err_msg = "Floor division can not be applied to datetimes."
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser // other)
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other // psser)
-
-            expected_err_msg = "Modulo can not be applied to datetimes."
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser % other)
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other % psser)
+        others = [1, 0.1, psser, datetime_index, py_datetime]
+        if LooseVersion(pd.__version__) < "3.0.0":
+            # py_datetime is np.ndarray, adding pd.Series.
+            others.append(pd.Series(py_datetime, dtype="object"))
+        else:
+            # py_datetime is pd.Series, adding np.ndarray.
+            others.append(py_datetime.to_numpy())
+        for i, other in enumerate(others):
+            with self.subTest(i=i):
+                expected_err_msg = "Addition can not be applied to datetimes."
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser + other)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other + psser)
+
+                expected_err_msg = "Multiplication can not be applied to datetimes."
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser * other)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other * psser)
+
+                expected_err_msg = "True division can not be applied to datetimes."
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser / other)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other / psser)
+
+                expected_err_msg = "Floor division can not be applied to datetimes."
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser // other)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other // psser)
+
+                expected_err_msg = "Modulo can not be applied to datetimes."
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser % other)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other % psser)
 
         expected_err_msg = "Datetime subtraction can only be applied to datetime series."
 
-        for other in [1, 0.1]:
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser - other)
-            self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other - psser)
+        for i, other in enumerate([1, 0.1], len(others)):
+            with self.subTest(i=i):
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser - other)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other - psser)
 
-        self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser - other)
-        self.assertRaises(NotImplementedError, lambda: py_datetime - psser)
+                self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser - other)
+                self.assertRaises(NotImplementedError, lambda: py_datetime - psser)
 
     def test_date_subtraction(self):
         pdf = self.pdf1

