This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 050c3a7b229f [SPARK-54068][PYTHON] Fix `to_feather` to support PyArrow 
22.0.0
050c3a7b229f is described below

commit 050c3a7b229f4e88ae099b116e041f9c659d6581
Author: ashrithb <[email protected]>
AuthorDate: Sun Dec 7 21:19:00 2025 -0800

    [SPARK-54068][PYTHON] Fix `to_feather` to support PyArrow 22.0.0
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the `test_to_feather` test failure with PyArrow 22.0.0 by 
filtering
    non-serializable attrs (`metrics`, `observed_metrics`) before writing to 
feather format.
    
    **Changes:**
    1. Modified `to_feather()` in `pyspark/pandas/frame.py` to filter out 
non-serializable
       attrs before passing to PyArrow
    2. Removed the `unittest.skipIf` workaround from `test_to_feather`
    3. Added `to_dict()` methods to `MetricValue`, `PlanMetrics`, and 
`PlanObservedMetrics`
       for future utility (not used in the fix, but useful additions)
    
    ### Why are the changes needed?
    
    PyArrow 22.0.0 changed its behavior to serialize pandas `DataFrame.attrs` 
to JSON
    metadata when writing Feather files. PySpark Spark Connect stores 
`PlanMetrics` and
    `PlanObservedMetrics` objects in `pdf.attrs`, which are not JSON 
serializable, causing: TypeError: Object of type PlanMetrics is not JSON 
serializable
    
    ### Does this PR introduce any user-facing change?
    
    No. The fix filters internal Spark metadata (`metrics`, `observed_metrics`) 
from attrs
    only when writing to feather format. Code that directly accesses 
`pdf.attrs["metrics"]`
    (like `test_observe`) continues to work with the original objects.
    
    ### How was this patch tested?
    
    - Verified that `pdf.attrs["metrics"][0].name` still works (backward 
compatibility)
    - Verified that feather write succeeds with PyArrow 22.0.0 when attrs are 
filtered
    - Removed the `unittest.skipIf` workaround so `test_to_feather` now runs on 
all versions
    - All existing tests pass including `test_observe` which accesses attrs 
directly
    - Removed the `unittest.skipIf(not has_arrow_21_or_below, "SPARK-54068")` 
workaround so the test now runs on all PyArrow versions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53377 from ashrithb/SPARK-54068-pyarrow-feather-planmetrics-fix.
    
    Authored-by: ashrithb <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 4e1e99590a403442ccb88214a971c86be0ca28e1)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 python/pyspark/pandas/frame.py                 | 14 ++++++++++++-
 python/pyspark/pandas/tests/io/test_feather.py | 12 -----------
 python/pyspark/sql/connect/client/core.py      | 14 +++++++++++++
 python/pyspark/sql/metrics.py                  | 29 ++++++++++++++++++++++++++
 4 files changed, 56 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index af89d18a0ede..05e6de554f7a 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2699,8 +2699,20 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         # Make sure locals() call is at the top of the function so we don't 
capture local variables.
         args = locals()
 
+        pdf = self._to_internal_pandas()
+        # SPARK-54068: PyArrow >= 22.0.0 serializes DataFrame.attrs to JSON 
metadata,
+        # but PlanMetrics/PlanObservedMetrics objects from Spark Connect are 
not
+        # JSON serializable. We filter these internal attrs only for affected 
versions.
+        import pyarrow as pa
+        from pyspark.loose_version import LooseVersion
+
+        if LooseVersion(pa.__version__) >= LooseVersion("22.0.0"):
+            pdf.attrs = {
+                k: v for k, v in pdf.attrs.items() if k not in ("metrics", 
"observed_metrics")
+            }
+
         return validate_arguments_and_invoke_function(
-            self._to_internal_pandas(), self.to_feather, 
pd.DataFrame.to_feather, args
+            pdf, self.to_feather, pd.DataFrame.to_feather, args
         )
 
     def to_stata(
diff --git a/python/pyspark/pandas/tests/io/test_feather.py 
b/python/pyspark/pandas/tests/io/test_feather.py
index 10638d915c0e..74fa6bc7d7b6 100644
--- a/python/pyspark/pandas/tests/io/test_feather.py
+++ b/python/pyspark/pandas/tests/io/test_feather.py
@@ -17,10 +17,8 @@
 import unittest
 
 import pandas as pd
-import sys
 
 from pyspark import pandas as ps
-from pyspark.loose_version import LooseVersion
 from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
 
 
@@ -36,16 +34,6 @@ class FeatherMixin:
     def psdf(self):
         return ps.from_pandas(self.pdf)
 
-    has_arrow_21_or_below = False
-    try:
-        import pyarrow as pa
-
-        if LooseVersion(pa.__version__) < LooseVersion("22.0.0"):
-            has_arrow_21_or_below = True
-    except ImportError:
-        pass
-
-    @unittest.skipIf(not has_arrow_21_or_below, "SPARK-54068")
     def test_to_feather(self):
         with self.temp_dir() as dirpath:
             path1 = f"{dirpath}/file1.feather"
diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index b4da47191d83..80d83c69c45c 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -501,6 +501,20 @@ class PlanObservedMetrics(ObservedMetrics):
     def keys(self) -> List[str]:
         return self._keys
 
+    def to_dict(self) -> dict[str, Any]:
+        """Return a JSON-serializable dictionary representation of this 
observed metrics.
+
+        Returns
+        -------
+        dict
+            A dictionary with keys 'name', 'keys', and 'pairs'.
+        """
+        return {
+            "name": self._name,
+            "keys": self._keys,
+            "pairs": self.pairs,
+        }
+
 
 class AnalyzeResult:
     def __init__(
diff --git a/python/pyspark/sql/metrics.py b/python/pyspark/sql/metrics.py
index 4ab9b041e313..50d8ae444c47 100644
--- a/python/pyspark/sql/metrics.py
+++ b/python/pyspark/sql/metrics.py
@@ -68,6 +68,20 @@ class MetricValue:
     def metric_type(self) -> str:
         return self._type
 
+    def to_dict(self) -> Dict[str, Any]:
+        """Return a JSON-serializable dictionary representation of this metric 
value.
+
+        Returns
+        -------
+        dict
+            A dictionary with keys 'name', 'value', and 'type'.
+        """
+        return {
+            "name": self._name,
+            "value": self._value,
+            "type": self._type,
+        }
+
 
 class PlanMetrics:
     """Represents a particular plan node and the associated metrics of this 
node."""
@@ -97,6 +111,21 @@ class PlanMetrics:
     def metrics(self) -> List[MetricValue]:
         return self._metrics
 
+    def to_dict(self) -> Dict[str, Any]:
+        """Return a JSON-serializable dictionary representation of this plan 
metrics.
+
+        Returns
+        -------
+        dict
+            A dictionary with keys 'name', 'plan_id', 'parent_plan_id', and 
'metrics'.
+        """
+        return {
+            "name": self._name,
+            "plan_id": self._id,
+            "parent_plan_id": self._parent_id,
+            "metrics": [m.to_dict() for m in self._metrics],
+        }
+
 
 class CollectedMetrics:
     @dataclasses.dataclass


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to