This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ea7948f35378 [SPARK-55946][PS] Set up __pandas_priority__ so mixed
binary ops dispatch correctly to pandas-on-Spark
ea7948f35378 is described below
commit ea7948f35378c663649690d37ac1dbadb1385085
Author: Takuya Ueshin <[email protected]>
AuthorDate: Wed Mar 11 13:39:43 2026 +0900
[SPARK-55946][PS] Set up __pandas_priority__ so mixed binary ops dispatch
correctly to pandas-on-Spark
### What changes were proposed in this pull request?
This PR sets `__pandas_priority__` on pandas-on-Spark objects so mixed
reverse arithmetic with pandas objects is dispatched to pandas-on-Spark
implementations.
The changes are:
- add `__pandas_priority__ = 3500` (pandas `Series.__pandas_priority__` + 500)
to pandas-on-Spark `Series`/`Index` via `IndexOpsMixin`
- add `__pandas_priority__ = 4500` (pandas `DataFrame.__pandas_priority__` +
500) to pandas-on-Spark `DataFrame`
- preserve the existing unsupported behavior for datetime reverse
subtraction from a pandas `Series` (e.g. `pd_series - psser`, which now
reaches pandas-on-Spark's reflected implementation) by raising
`NotImplementedError` in `DatetimeOps.rsub`
- add test coverage for mixed pandas `DataFrame` and pandas-on-Spark
`DataFrame` reverse ops
- extend datetime arithmetic exception tests to cover the pandas object
variants involved in reverse-op dispatch
### Why are the changes needed?
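pandas 2.1+ uses `__pandas_priority__` to decide whether mixed arithmetic
should defer to the right-hand operand's reflected implementation: when the
other operand has a higher `__pandas_priority__`, pandas returns
`NotImplemented`, so Python falls back to the other operand's reflected
method (e.g. `__radd__`).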
Without this, mixed reverse ops between pandas objects and pandas-on-Spark
objects may be handled by pandas first, which can surface pandas-side errors
instead of the existing pandas-on-Spark errors.
For example, datetime reverse ops such as `other + psser` should continue
to raise pandas-on-Spark errors like:
- `Addition can not be applied to datetimes.`
instead of pandas-side wrapping or broadcasting errors.
The same dispatch issue also applies to mixed `pandas.DataFrame` and
pandas-on-Spark `DataFrame` operations.
### Does this PR introduce _any_ user-facing change?
Yes.
For mixed arithmetic between pandas objects and pandas-on-Spark objects,
pandas-on-Spark now takes precedence more consistently, so users will see
pandas-on-Spark errors and behavior from reflected operations instead of
pandas-side fallback errors in those cases.
For example, mixed DataFrame reverse ops now dispatch to pandas-on-Spark
code paths:
```python
import pandas as pd
from pyspark import pandas as ps
pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
psdf = ps.from_pandas(pdf)
pdf + psdf
```
This now raises a pandas-on-Spark error:
```text
TypeError: radd with a sequence is currently not supported; however, got
DataFrame.
```
### How was this patch tested?
Added and ran targeted unit tests in both pandas 2 and pandas 3 environments:
- `pyspark.pandas.tests.series.test_datetime.SeriesDateTimeTests.test_arithmetic_op_exceptions`
- `pyspark.pandas.tests.computation.test_binary_ops.FrameBinaryOpsTests.test_mixed_dataframe_ops_dispatch_to_pandas_on_spark`
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: OpenAI Codex (GPT-5)
Closes #54741 from ueshin/issues/SPARK-55946/pandas_priority.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/base.py | 5 +-
.../pyspark/pandas/data_type_ops/datetime_ops.py | 2 +
python/pyspark/pandas/frame.py | 4 ++
.../pandas/tests/computation/test_binary_ops.py | 14 +++++
.../pyspark/pandas/tests/series/test_datetime.py | 60 +++++++++++++---------
5 files changed, 59 insertions(+), 26 deletions(-)
diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index 5a39b581e986..ef2d91a0e237 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -22,7 +22,7 @@ import warnings
from abc import ABCMeta, abstractmethod
from functools import wraps, partial
from itertools import chain
-from typing import Any, Callable, Optional, Sequence, Tuple, Union, cast,
TYPE_CHECKING
+from typing import Any, Callable, ClassVar, Optional, Sequence, Tuple, Union,
cast, TYPE_CHECKING
import numpy as np
import pandas as pd
@@ -284,6 +284,9 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
Assuming there are following attributes or properties and functions.
"""
+ # Keep pandas-on-Spark above pandas Series and Index for reflected ops.
+ __pandas_priority__: ClassVar[int] = pd.Series.__pandas_priority__ + 500
# type: ignore[attr-defined]
+
@property
@abstractmethod
def _internal(self) -> InternalFrame:
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py
b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index 07399ecd873e..d407b8512b4e 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -94,6 +94,8 @@ class DatetimeOps(DataTypeOps):
"The timestamp subtraction returns an integer in seconds, "
"whereas pandas returns 'timedelta64[ns]'."
)
+ if isinstance(right, pd.Series):
+ raise NotImplementedError()
if isinstance(right, datetime.datetime):
warnings.warn(msg, UserWarning)
return cast(
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 8377f72cd376..e775efdfaf71 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -32,6 +32,7 @@ from types import TracebackType
from typing import (
Any,
Callable,
+ ClassVar,
Dict,
Generic,
IO,
@@ -543,6 +544,9 @@ class DataFrame(Frame, Generic[T]):
2 None NaN
"""
+ # Keep pandas-on-Spark above pandas DataFrame for reflected ops.
+ __pandas_priority__: ClassVar[int] = pd.DataFrame.__pandas_priority__ +
500 # type: ignore[attr-defined]
+
def __init__( # type: ignore[no-untyped-def]
self, data=None, index=None, columns=None, dtype=None, copy=False
):
diff --git a/python/pyspark/pandas/tests/computation/test_binary_ops.py
b/python/pyspark/pandas/tests/computation/test_binary_ops.py
index 919e9b77973a..b0e83c02305c 100644
--- a/python/pyspark/pandas/tests/computation/test_binary_ops.py
+++ b/python/pyspark/pandas/tests/computation/test_binary_ops.py
@@ -69,6 +69,20 @@ class FrameBinaryOpsMixin:
lambda: psdf.add(psdf_other),
)
+ def test_mixed_dataframe_ops_dispatch_to_pandas_on_spark(self):
+ pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
+ psdf = ps.from_pandas(pdf)
+
+ with self.assertRaisesRegex(
+ TypeError, "add with a sequence is currently not supported;
however, got DataFrame."
+ ):
+ psdf + pdf
+
+ with self.assertRaisesRegex(
+ TypeError, "radd with a sequence is currently not supported;
however, got DataFrame."
+ ):
+ pdf + psdf
+
def test_binary_operator_add(self):
# Positive
pdf = pd.DataFrame({"a": ["x"], "b": ["y"], "c": [1], "d": [2]})
diff --git a/python/pyspark/pandas/tests/series/test_datetime.py
b/python/pyspark/pandas/tests/series/test_datetime.py
index a8374b38e66f..5397f8442cda 100644
--- a/python/pyspark/pandas/tests/series/test_datetime.py
+++ b/python/pyspark/pandas/tests/series/test_datetime.py
@@ -22,6 +22,7 @@ import numpy as np
import pandas as pd
from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
@@ -85,35 +86,44 @@ class SeriesDateTimeTestsMixin:
py_datetime = self.pd_start_date.dt.to_pydatetime()
datetime_index = ps.Index(self.pd_start_date)
- for other in [1, 0.1, psser, datetime_index, py_datetime]:
- expected_err_msg = "Addition can not be applied to datetimes."
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser
+ other)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other
+ psser)
-
- expected_err_msg = "Multiplication can not be applied to
datetimes."
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser
* other)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other
* psser)
-
- expected_err_msg = "True division can not be applied to datetimes."
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser
/ other)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other
/ psser)
-
- expected_err_msg = "Floor division can not be applied to
datetimes."
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser
// other)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other
// psser)
-
- expected_err_msg = "Modulo can not be applied to datetimes."
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser
% other)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other
% psser)
+ others = [1, 0.1, psser, datetime_index, py_datetime]
+ if LooseVersion(pd.__version__) < "3.0.0":
+ # py_datetime is np.ndarray, adding pd.Series.
+ others.append(pd.Series(py_datetime, dtype="object"))
+ else:
+ # py_datetime is pd.Series, adding np.ndarray.
+ others.append(py_datetime.to_numpy())
+ for i, other in enumerate(others):
+ with self.subTest(i=i):
+ expected_err_msg = "Addition can not be applied to datetimes."
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser + other)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
other + psser)
+
+ expected_err_msg = "Multiplication can not be applied to
datetimes."
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser * other)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
other * psser)
+
+ expected_err_msg = "True division can not be applied to
datetimes."
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser / other)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
other / psser)
+
+ expected_err_msg = "Floor division can not be applied to
datetimes."
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser // other)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
other // psser)
+
+ expected_err_msg = "Modulo can not be applied to datetimes."
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser % other)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
other % psser)
expected_err_msg = "Datetime subtraction can only be applied to
datetime series."
- for other in [1, 0.1]:
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser
- other)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other
- psser)
+ for i, other in enumerate([1, 0.1], len(others)):
+ with self.subTest(i=i):
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser - other)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
other - psser)
- self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psser -
other)
- self.assertRaises(NotImplementedError, lambda: py_datetime - psser)
+ self.assertRaisesRegex(TypeError, expected_err_msg, lambda:
psser - other)
+ self.assertRaises(NotImplementedError, lambda: py_datetime -
psser)
def test_date_subtraction(self):
pdf = self.pdf1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]