This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a3e3da960cc2 [SPARK-55176][PYTHON] Extract `arrow_to_pandas` converter into ArrowArrayToPandasConversion
a3e3da960cc2 is described below

commit a3e3da960cc282ade7867a0c4b706886f3461a47
Author: Yicong-Huang <[email protected]>
AuthorDate: Mon Feb 2 06:49:45 2026 +0900

    [SPARK-55176][PYTHON] Extract `arrow_to_pandas` converter into ArrowArrayToPandasConversion
    
    ### What changes were proposed in this pull request?
    
    Moved the Arrow-to-pandas conversion logic from the serializers into the `ArrowArrayToPandasConversion` class:
    
    **Changes in `serializers.py`**:
    - Removed `ArrowStreamPandasSerializer.arrow_to_pandas()` method
    - Removed `ArrowStreamPandasUDFSerializer.arrow_to_pandas()` method
    - Serializers now call `ArrowArrayToPandasConversion.convert()` directly (see the sketch below)
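    
    A minimal sketch of the new call pattern for a single column (illustrative only: the sample array, the `timezone` value, and the variable names are placeholders, not taken from the diff below):
    
    ```python
    import pyarrow as pa
    
    from pyspark.sql.conversion import ArrowArrayToPandasConversion
    from pyspark.sql.pandas.types import from_arrow_type
    
    arrow_column = pa.array([1, 2, None], type=pa.int64())
    
    # Previously: serializer.arrow_to_pandas(arrow_column, idx)
    # Now the serializers delegate to the shared converter:
    series = ArrowArrayToPandasConversion.convert(
        arrow_column,                        # pa.Array or pa.ChunkedArray
        from_arrow_type(arrow_column.type),  # target Spark DataType
        timezone="UTC",                      # needed only when timestamps are present
        struct_in_pandas="dict",
        ndarray_as_list=False,
        df_for_struct=False,
    )
    ```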
    
    ### Why are the changes needed?
    
    Separation of concerns: the conversion logic is moved out of the serializer classes into a single shared utility, so all Arrow-based serializers use the same conversion path.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53957 from Yicong-Huang/SPARK-55176/refactor/extract-arrow-to-pandas-converter.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/conversion.py         | 108 ++++++++++++++--
 python/pyspark/sql/pandas/serializers.py | 213 ++++++++++++++++++++++---------
 2 files changed, 252 insertions(+), 69 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index fdcb29b54412..105c06c7a4a1 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -1002,6 +1002,16 @@ class ArrowTimestampConversion:
 
 
 class ArrowArrayToPandasConversion:
+    """
+    Conversion utilities for converting PyArrow Arrays and ChunkedArrays to pandas.
+
+    This class provides methods to convert PyArrow columnar data structures to pandas
+    Series or DataFrames, with support for Spark-specific type handling and conversions.
+
+    The class is primarily used by PySpark's Arrow-based serializers for UDF execution,
+    where Arrow data needs to be converted to pandas for Python UDF processing.
+    """
+
     @classmethod
     def convert_legacy(
         cls,
@@ -1014,14 +1024,39 @@ class ArrowArrayToPandasConversion:
         df_for_struct: bool = False,
     ) -> Union["pd.Series", "pd.DataFrame"]:
         """
+        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+
+        This is the lower-level conversion method that requires explicit Spark type
+        specification. For a more convenient API, see :meth:`convert`.
+
         Parameters
         ----------
-        arr : :class:`pyarrow.Array`.
-        spark_type: target spark type, should always be specified.
-        timezone : The timezone to convert from. If there is a timestamp type, it's required.
-        struct_in_pandas : How to handle struct type. If there is a struct type, it's required.
-        ndarray_as_list : Whether `np.ndarray` is converted to a list or not.
-        df_for_struct: when true, and spark type is a StructType, return a DataFrame.
+        arr : pa.Array or pa.ChunkedArray
+            The Arrow column to convert.
+        spark_type : DataType
+            Target Spark type. Must be specified and should match the Arrow array type.
+        timezone : str, optional
+            The timezone to use for timestamp conversion. Required if the data contains
+            timestamp types.
+        struct_in_pandas : str, optional
+            How to handle struct types in pandas. Valid values are "dict", "row", or "legacy".
+            Required if the data contains struct types.
+        ndarray_as_list : bool, optional
+            Whether to convert numpy ndarrays to Python lists. Default is False.
+        df_for_struct : bool, optional
+            If True and spark_type is a StructType, return a DataFrame with columns
+            corresponding to struct fields instead of a Series. Default is False.
+
+        Returns
+        -------
+        pd.Series or pd.DataFrame
+            Converted pandas Series. If df_for_struct is True and spark_type is StructType,
+            returns a DataFrame with columns corresponding to struct fields.
+
+        Notes
+        -----
+        This method handles date type columns specially to avoid overflow issues with
+        datetime64[ns] intermediate representations.
         """
         import pyarrow as pa
         import pandas as pd
@@ -1032,7 +1067,11 @@ class ArrowArrayToPandasConversion:
             import pyarrow.types as types
 
             assert types.is_struct(arr.type)
-            assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
+            assert len(spark_type.names) == len(arr.type.names), (
+                f"Schema mismatch: spark_type has {len(spark_type.names)} fields, "
+                f"but arrow type has {len(arr.type.names)} fields. "
+                f"spark_type={spark_type}, arrow_type={arr.type}"
+            )
 
             series = [
                 cls.convert_legacy(
@@ -1049,10 +1088,11 @@ class ArrowArrayToPandasConversion:
             pdf.columns = spark_type.names  # type: ignore[assignment]
             return pdf
 
-        # If the given column is a date type column, creates a series of datetime.date directly
-        # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
-        # datetime64[ns] type handling.
-        # Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
+        # Convert Arrow array to pandas Series with specific options:
+        # - date_as_object: Convert date types to Python datetime.date objects directly
+        #   instead of datetime64[ns] to avoid overflow issues
+        # - coerce_temporal_nanoseconds: Handle nanosecond precision timestamps correctly
+        # - integer_object_nulls: Use object dtype for integer arrays with nulls
         pandas_options = {
             "date_as_object": True,
             "coerce_temporal_nanoseconds": True,
@@ -1070,3 +1110,49 @@ class ArrowArrayToPandasConversion:
             integer_object_nulls=True,
         )
         return converter(ser)
+
+    @classmethod
+    def convert(
+        cls,
+        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
+        target_type: DataType,
+        *,
+        timezone: Optional[str] = None,
+        struct_in_pandas: str = "dict",
+        ndarray_as_list: bool = False,
+        df_for_struct: bool = False,
+    ) -> Union["pd.Series", "pd.DataFrame"]:
+        """
+        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+
+        Parameters
+        ----------
+        arrow_column : pa.Array or pa.ChunkedArray
+            The Arrow column to convert.
+        target_type : DataType
+            The target Spark type for the column to be converted to.
+        timezone : str, optional
+            Timezone for timestamp conversion. Required if the data contains timestamp types.
+        struct_in_pandas : str, optional
+            How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
+            Default is "dict".
+        ndarray_as_list : bool, optional
+            Whether to convert numpy ndarrays to Python lists. Default is False.
+        df_for_struct : bool, optional
+            If True, convert struct columns to a DataFrame with columns corresponding
+            to struct fields instead of a Series. Default is False.
+
+        Returns
+        -------
+        pd.Series or pd.DataFrame
+            Converted pandas Series. If df_for_struct is True and the type is StructType,
+            returns a DataFrame with columns corresponding to struct fields.
+        """
+        return cls.convert_legacy(
+            arrow_column,
+            target_type,
+            timezone=timezone,
+            struct_in_pandas=struct_in_pandas,
+            ndarray_as_list=ndarray_as_list,
+            df_for_struct=df_for_struct,
+        )
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 74599869548d..6e5facc79aa6 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -404,30 +404,35 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         A timezone to respect when handling timestamp values
     safecheck : bool
         If True, conversion from Arrow to Pandas checks for overflow/truncation
-    assign_cols_by_name : bool
-        If True, then Pandas DataFrames will get columns by name
     int_to_decimal_coercion_enabled : bool
         If True, applies additional coercions in Python before converting to Arrow
         This has performance penalties.
+    struct_in_pandas : str, optional
+        How to represent struct in pandas ("dict", "row", etc.). Default is "dict".
+    ndarray_as_list : bool, optional
+        Whether to convert ndarray as list. Default is False.
+    df_for_struct : bool, optional
+        If True, convert struct columns to DataFrame instead of Series. Default is False.
+    input_type : StructType, optional
+        Spark types for each column. Default is None.
     """
 
-    def __init__(self, timezone, safecheck, int_to_decimal_coercion_enabled):
+    def __init__(
+        self,
+        timezone,
+        safecheck,
+        int_to_decimal_coercion_enabled: bool = False,
+        struct_in_pandas: str = "dict",
+        ndarray_as_list: bool = False,
+        df_for_struct: bool = False,
+    ):
         super().__init__()
         self._timezone = timezone
         self._safecheck = safecheck
         self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled
-
-    def arrow_to_pandas(
-        self, arrow_column, idx, struct_in_pandas="dict", ndarray_as_list=False, spark_type=None
-    ):
-        return ArrowArrayToPandasConversion.convert_legacy(
-            arrow_column,
-            spark_type or from_arrow_type(arrow_column.type),
-            timezone=self._timezone,
-            struct_in_pandas=struct_in_pandas,
-            ndarray_as_list=ndarray_as_list,
-            df_for_struct=False,
-        )
+        self._struct_in_pandas = struct_in_pandas
+        self._ndarray_as_list = ndarray_as_list
+        self._df_for_struct = df_for_struct
 
     def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
         """
@@ -550,16 +555,27 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         """
         Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
         """
-        batches = super().load_stream(stream)
         import pandas as pd
+        import pyspark
 
+        batches = super().load_stream(stream)
         for batch in batches:
-            pandas_batches = [
-                self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
-            ]
-            if len(pandas_batches) == 0:
+            if batch.num_columns == 0:
                 yield [pd.Series([pyspark._NoValue] * batch.num_rows)]
             else:
+                pandas_batches = [
+                    ArrowArrayToPandasConversion.convert(
+                        batch.column(i),
+                        self._input_type[i].dataType
+                        if self._input_type is not None
+                        else from_arrow_type(batch.column(i).type),
+                        timezone=self._timezone,
+                        struct_in_pandas=self._struct_in_pandas,
+                        ndarray_as_list=self._ndarray_as_list,
+                        df_for_struct=self._df_for_struct,
+                    )
+                    for i in range(batch.num_columns)
+                ]
                 yield pandas_batches
 
     def __repr__(self):
@@ -583,31 +599,20 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
         input_type: Optional[StructType] = None,
         int_to_decimal_coercion_enabled: bool = False,
     ):
-        super().__init__(timezone, safecheck, int_to_decimal_coercion_enabled)
+        super().__init__(
+            timezone,
+            safecheck,
+            int_to_decimal_coercion_enabled,
+            struct_in_pandas,
+            ndarray_as_list,
+            df_for_struct,
+        )
         self._assign_cols_by_name = assign_cols_by_name
-        self._df_for_struct = df_for_struct
-        self._struct_in_pandas = struct_in_pandas
-        self._ndarray_as_list = ndarray_as_list
         self._arrow_cast = arrow_cast
         if input_type is not None:
             assert isinstance(input_type, StructType)
         self._input_type = input_type
 
-    def arrow_to_pandas(self, arrow_column, idx):
-        if self._input_type is not None:
-            spark_type = self._input_type[idx].dataType
-        else:
-            spark_type = from_arrow_type(arrow_column.type)
-
-        return ArrowArrayToPandasConversion.convert_legacy(
-            arr=arrow_column,
-            spark_type=spark_type,
-            timezone=self._timezone,
-            struct_in_pandas=self._struct_in_pandas,
-            ndarray_as_list=self._ndarray_as_list,
-            df_for_struct=self._df_for_struct,
-        )
-
     def _create_struct_array(
         self,
         df: "pd.DataFrame",
@@ -1104,7 +1109,19 @@ class ArrowStreamAggPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
             # Lazily read and convert Arrow batches to pandas Series one at a time
             # from the stream. This avoids loading all batches into memory for the group
             series_iter = (
-                tuple(self.arrow_to_pandas(c, i) for i, c in enumerate(batch.columns))
+                tuple(
+                    ArrowArrayToPandasConversion.convert(
+                        c,
+                        self._input_type[i].dataType
+                        if self._input_type is not None
+                        else from_arrow_type(c.type),
+                        timezone=self._timezone,
+                        struct_in_pandas=self._struct_in_pandas,
+                        ndarray_as_list=self._ndarray_as_list,
+                        df_for_struct=self._df_for_struct,
+                    )
+                    for i, c in enumerate(batch.columns)
+                )
                 for batch in batches
             )
             yield series_iter
@@ -1147,9 +1164,18 @@ class GroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
         def process_group(batches: "Iterator[pa.RecordBatch]"):
             # Convert each Arrow batch to pandas Series list on-demand, yielding one list per batch
             for batch in batches:
-                # The batch from ArrowStreamSerializer is already flattened (no struct wrapper)
                 series = [
-                    self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
+                    ArrowArrayToPandasConversion.convert(
+                        batch.column(i),
+                        self._input_type[i].dataType
+                        if self._input_type is not None
+                        else from_arrow_type(batch.column(i).type),
+                        timezone=self._timezone,
+                        struct_in_pandas=self._struct_in_pandas,
+                        ndarray_as_list=self._ndarray_as_list,
+                        df_for_struct=self._df_for_struct,
+                    )
+                    for i in range(batch.num_columns)
                 ]
                 yield series
 
@@ -1208,11 +1234,29 @@ class CogroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
         for left_batches, right_batches in self._load_group_dataframes(stream, num_dfs=2):
             yield (
                 [
-                    self.arrow_to_pandas(c, i)
+                    ArrowArrayToPandasConversion.convert(
+                        c,
+                        self._input_type[i].dataType
+                        if self._input_type is not None
+                        else from_arrow_type(c.type),
+                        timezone=self._timezone,
+                        struct_in_pandas=self._struct_in_pandas,
+                        ndarray_as_list=self._ndarray_as_list,
+                        df_for_struct=self._df_for_struct,
+                    )
                     for i, c in enumerate(pa.Table.from_batches(left_batches).itercolumns())
                 ],
                 [
-                    self.arrow_to_pandas(c, i)
+                    ArrowArrayToPandasConversion.convert(
+                        c,
+                        self._input_type[i].dataType
+                        if self._input_type is not None
+                        else from_arrow_type(c.type),
+                        timezone=self._timezone,
+                        struct_in_pandas=self._struct_in_pandas,
+                        ndarray_as_list=self._ndarray_as_list,
+                        df_for_struct=self._df_for_struct,
+                    )
                     for i, c in enumerate(pa.Table.from_batches(right_batches).itercolumns())
                 ],
             )
@@ -1247,11 +1291,15 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
         int_to_decimal_coercion_enabled,
     ):
         super().__init__(
-            timezone,
-            safecheck,
-            assign_cols_by_name,
-            int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+            timezone=timezone,
+            safecheck=safecheck,
+            assign_cols_by_name=assign_cols_by_name,
+            df_for_struct=False,
+            struct_in_pandas="dict",
+            ndarray_as_list=False,
             arrow_cast=True,
+            input_type=None,
+            int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
         )
         self.pickleSer = CPickleSerializer()
         self.utf8_deserializer = UTF8Deserializer()
@@ -1375,7 +1423,17 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
                 )
 
                 state_arrow = pa.Table.from_batches([state_batch]).itercolumns()
-                state_pandas = [self.arrow_to_pandas(c, i) for i, c in enumerate(state_arrow)][0]
+                state_pandas = [
+                    ArrowArrayToPandasConversion.convert(
+                        c,
+                        from_arrow_type(c.type),
+                        timezone=self._timezone,
+                        struct_in_pandas=self._struct_in_pandas,
+                        ndarray_as_list=self._ndarray_as_list,
+                        df_for_struct=self._df_for_struct,
+                    )
+                    for c in state_arrow
+                ][0]
 
                 for state_idx in range(0, len(state_pandas)):
                     state_info_col = state_pandas.iloc[state_idx]
@@ -1407,7 +1465,17 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
                     data_batch_for_group = data_batch.slice(data_start_offset, num_data_rows)
                     data_arrow = pa.Table.from_batches([data_batch_for_group]).itercolumns()

-                    data_pandas = [self.arrow_to_pandas(c, i) for i, c in enumerate(data_arrow)]
+                    data_pandas = [
+                        ArrowArrayToPandasConversion.convert(
+                            c,
+                            from_arrow_type(c.type),
+                            timezone=self._timezone,
+                            struct_in_pandas=self._struct_in_pandas,
+                            ndarray_as_list=self._ndarray_as_list,
+                            df_for_struct=self._df_for_struct,
+                        )
+                        for c in data_arrow
+                    ]
 
                     # state info
                     yield (
@@ -1613,11 +1681,15 @@ class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
         int_to_decimal_coercion_enabled,
     ):
         super().__init__(
-            timezone,
-            safecheck,
-            assign_cols_by_name,
-            int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+            timezone=timezone,
+            safecheck=safecheck,
+            assign_cols_by_name=assign_cols_by_name,
+            df_for_struct=False,
+            struct_in_pandas="dict",
+            ndarray_as_list=False,
             arrow_cast=True,
+            input_type=None,
+            int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
         )
         self.arrow_max_records_per_batch = (
             arrow_max_records_per_batch if arrow_max_records_per_batch > 0 else 2**31 - 1
@@ -1671,8 +1743,15 @@ class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
                 for batch in batches:
                     self._update_batch_size_stats(batch)
                     data_pandas = [
-                        self.arrow_to_pandas(c, i)
-                        for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                        ArrowArrayToPandasConversion.convert(
+                            c,
+                            from_arrow_type(c.type),
+                            timezone=self._timezone,
+                            struct_in_pandas=self._struct_in_pandas,
+                            ndarray_as_list=self._ndarray_as_list,
+                            df_for_struct=self._df_for_struct,
+                        )
+                        for c in pa.Table.from_batches([batch]).itercolumns()
                     ]
                     for row in pd.concat(data_pandas, axis=1).itertuples(index=False):
                         batch_key = tuple(row[s] for s in self.key_offsets)
@@ -1801,13 +1880,31 @@ class TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSe
 
                     flatten_state_table = flatten_columns(batch, "inputData")
                     data_pandas = [
-                        self.arrow_to_pandas(c, i)
+                        ArrowArrayToPandasConversion.convert(
+                            c,
+                            self._input_type[i].dataType
+                            if self._input_type is not None
+                            else from_arrow_type(c.type),
+                            timezone=self._timezone,
+                            struct_in_pandas=self._struct_in_pandas,
+                            ndarray_as_list=self._ndarray_as_list,
+                            df_for_struct=self._df_for_struct,
+                        )
                         for i, c in enumerate(flatten_state_table.itercolumns())
                     ]
 
                     flatten_init_table = flatten_columns(batch, "initState")
                     init_data_pandas = [
-                        self.arrow_to_pandas(c, i)
+                        ArrowArrayToPandasConversion.convert(
+                            c,
+                            self._input_type[i].dataType
+                            if self._input_type is not None
+                            else from_arrow_type(c.type),
+                            timezone=self._timezone,
+                            struct_in_pandas=self._struct_in_pandas,
+                            ndarray_as_list=self._ndarray_as_list,
+                            df_for_struct=self._df_for_struct,
+                        )
                         for i, c in enumerate(flatten_init_table.itercolumns())
                     ]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
