This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 050c3a7b229f [SPARK-54068][PYTHON] Fix `to_feather` to support PyArrow 22.0.0
050c3a7b229f is described below
commit 050c3a7b229f4e88ae099b116e041f9c659d6581
Author: ashrithb <[email protected]>
AuthorDate: Sun Dec 7 21:19:00 2025 -0800
[SPARK-54068][PYTHON] Fix `to_feather` to support PyArrow 22.0.0
### What changes were proposed in this pull request?
This PR fixes the `test_to_feather` test failure with PyArrow 22.0.0 by filtering
non-serializable attrs (`metrics`, `observed_metrics`) before writing to feather format.
**Changes:**
1. Modified `to_feather()` in `pyspark/pandas/frame.py` to filter out
non-serializable
attrs before passing to PyArrow
2. Removed the `unittest.skipIf` workaround from `test_to_feather`
3. Added `to_dict()` methods to `MetricValue`, `PlanMetrics`, and
`PlanObservedMetrics`
for future utility (not used in the fix, but useful additions)
### Why are the changes needed?
PyArrow 22.0.0 changed its behavior to serialize pandas `DataFrame.attrs`
to JSON
metadata when writing Feather files. PySpark Spark Connect stores
`PlanMetrics` and
`PlanObservedMetrics` objects in `pdf.attrs`, which are not JSON
serializable, causing: TypeError: Object of type PlanMetrics is not JSON
serializable
### Does this PR introduce any user-facing change?
No. The fix filters internal Spark metadata (`metrics`, `observed_metrics`)
from attrs
only when writing to feather format. Code that directly accesses
`pdf.attrs["metrics"]`
(like `test_observe`) continues to work with the original objects.
### How was this patch tested?
- Verified that `pdf.attrs["metrics"][0].name` still works (backward
compatibility)
- Verified that feather write succeeds with PyArrow 22.0.0 when attrs are
filtered
- Removed the `unittest.skipIf` workaround so `test_to_feather` now runs on
all versions
- All existing tests pass including `test_observe` which accesses attrs
directly
- Removed the `unittest.skipIf(not has_arrow_21_or_below, "SPARK-54068")`
workaround so the test now runs on all PyArrow versions
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53377 from ashrithb/SPARK-54068-pyarrow-feather-planmetrics-fix.
Authored-by: ashrithb <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 4e1e99590a403442ccb88214a971c86be0ca28e1)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
python/pyspark/pandas/frame.py | 14 ++++++++++++-
python/pyspark/pandas/tests/io/test_feather.py | 12 -----------
python/pyspark/sql/connect/client/core.py | 14 +++++++++++++
python/pyspark/sql/metrics.py | 29 ++++++++++++++++++++++++++
4 files changed, 56 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index af89d18a0ede..05e6de554f7a 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2699,8 +2699,20 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         # Make sure locals() call is at the top of the function so we don't capture local variables.
         args = locals()
+        pdf = self._to_internal_pandas()
+        # SPARK-54068: PyArrow >= 22.0.0 serializes DataFrame.attrs to JSON metadata,
+        # but PlanMetrics/PlanObservedMetrics objects from Spark Connect are not
+        # JSON serializable. We filter these internal attrs only for affected versions.
+        import pyarrow as pa
+        from pyspark.loose_version import LooseVersion
+
+        if LooseVersion(pa.__version__) >= LooseVersion("22.0.0"):
+            pdf.attrs = {
+                k: v for k, v in pdf.attrs.items() if k not in ("metrics", "observed_metrics")
+            }
+
         return validate_arguments_and_invoke_function(
-            self._to_internal_pandas(), self.to_feather, pd.DataFrame.to_feather, args
+            pdf, self.to_feather, pd.DataFrame.to_feather, args
         )
def to_stata(
diff --git a/python/pyspark/pandas/tests/io/test_feather.py b/python/pyspark/pandas/tests/io/test_feather.py
index 10638d915c0e..74fa6bc7d7b6 100644
--- a/python/pyspark/pandas/tests/io/test_feather.py
+++ b/python/pyspark/pandas/tests/io/test_feather.py
@@ -17,10 +17,8 @@
import unittest
import pandas as pd
-import sys
from pyspark import pandas as ps
-from pyspark.loose_version import LooseVersion
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
@@ -36,16 +34,6 @@ class FeatherMixin:
def psdf(self):
return ps.from_pandas(self.pdf)
- has_arrow_21_or_below = False
- try:
- import pyarrow as pa
-
- if LooseVersion(pa.__version__) < LooseVersion("22.0.0"):
- has_arrow_21_or_below = True
- except ImportError:
- pass
-
- @unittest.skipIf(not has_arrow_21_or_below, "SPARK-54068")
def test_to_feather(self):
with self.temp_dir() as dirpath:
path1 = f"{dirpath}/file1.feather"
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index b4da47191d83..80d83c69c45c 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -501,6 +501,20 @@ class PlanObservedMetrics(ObservedMetrics):
def keys(self) -> List[str]:
return self._keys
+    def to_dict(self) -> dict[str, Any]:
+        """Return a JSON-serializable dictionary representation of this observed metrics.
+
+        Returns
+        -------
+        dict
+            A dictionary with keys 'name', 'keys', and 'pairs'.
+        """
+        return {
+            "name": self._name,
+            "keys": self._keys,
+            "pairs": self.pairs,
+        }
+
class AnalyzeResult:
def __init__(
diff --git a/python/pyspark/sql/metrics.py b/python/pyspark/sql/metrics.py
index 4ab9b041e313..50d8ae444c47 100644
--- a/python/pyspark/sql/metrics.py
+++ b/python/pyspark/sql/metrics.py
@@ -68,6 +68,20 @@ class MetricValue:
def metric_type(self) -> str:
return self._type
+    def to_dict(self) -> Dict[str, Any]:
+        """Return a JSON-serializable dictionary representation of this metric value.
+
+        Returns
+        -------
+        dict
+            A dictionary with keys 'name', 'value', and 'type'.
+        """
+        return {
+            "name": self._name,
+            "value": self._value,
+            "type": self._type,
+        }
+
class PlanMetrics:
"""Represents a particular plan node and the associated metrics of this
node."""
@@ -97,6 +111,21 @@ class PlanMetrics:
def metrics(self) -> List[MetricValue]:
return self._metrics
+    def to_dict(self) -> Dict[str, Any]:
+        """Return a JSON-serializable dictionary representation of this plan metrics.
+
+        Returns
+        -------
+        dict
+            A dictionary with keys 'name', 'plan_id', 'parent_plan_id', and 'metrics'.
+        """
+        return {
+            "name": self._name,
+            "plan_id": self._id,
+            "parent_plan_id": self._parent_id,
+            "metrics": [m.to_dict() for m in self._metrics],
+        }
+
class CollectedMetrics:
@dataclasses.dataclass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]