This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a3e3da960cc2 [SPARK-55176][PYTHON] Extract `arrow_to_pandas` converter into ArrowArrayToPandasConversion
a3e3da960cc2 is described below
commit a3e3da960cc282ade7867a0c4b706886f3461a47
Author: Yicong-Huang <[email protected]>
AuthorDate: Mon Feb 2 06:49:45 2026 +0900
[SPARK-55176][PYTHON] Extract `arrow_to_pandas` converter into ArrowArrayToPandasConversion
### What changes were proposed in this pull request?
Moved the Arrow-to-pandas conversion logic from the serializers into the `ArrowArrayToPandasConversion` class:
**Changes in `serializers.py`**:
- Removed `ArrowStreamPandasSerializer.arrow_to_pandas()` method
- Removed `ArrowStreamPandasUDFSerializer.arrow_to_pandas()` method
- Serializers now call `ArrowArrayToPandasConversion.convert()` directly (see the sketch below)
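For illustration, a minimal sketch of the new per-column call shape. This is runnable only against a Spark build containing this change; the sample array, the `print` usage, and the import path of `from_arrow_type` are assumptions for this sketch, not something added by this commit:

```python
import pyarrow as pa

from pyspark.sql.conversion import ArrowArrayToPandasConversion
from pyspark.sql.pandas.types import from_arrow_type  # assumed import path

# Hypothetical input: one Arrow column, as a serializer would pull it
# from a record batch.
arr = pa.array([1, 2, None])

# A single column converted the way the serializers now do it, instead
# of each serializer carrying its own arrow_to_pandas() method:
ser = ArrowArrayToPandasConversion.convert(
    arr,
    from_arrow_type(arr.type),  # target Spark type, inferred when no input schema is given
    timezone=None,              # only needed for timestamp columns
    struct_in_pandas="dict",    # how struct values are represented in pandas
    ndarray_as_list=False,
    df_for_struct=False,
)
print(ser)  # a pandas Series holding 1, 2, None
```

Because `convert()` centralizes these options, the serializer subclasses differ only in the Spark type and flags they pass, as the diff below shows.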
### Why are the changes needed?
Separation of concerns: conversion logic moved out of serializer classes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests pass.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53957 from Yicong-Huang/SPARK-55176/refactor/extract-arrow-to-pandas-converter.
Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/conversion.py | 108 ++++++++++++++--
python/pyspark/sql/pandas/serializers.py | 213 ++++++++++++++++++++++---------
2 files changed, 252 insertions(+), 69 deletions(-)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index fdcb29b54412..105c06c7a4a1 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -1002,6 +1002,16 @@ class ArrowTimestampConversion:
class ArrowArrayToPandasConversion:
+ """
+ Conversion utilities for converting PyArrow Arrays and ChunkedArrays to pandas.
+
+ This class provides methods to convert PyArrow columnar data structures to pandas
+ Series or DataFrames, with support for Spark-specific type handling and conversions.
+
+ The class is primarily used by PySpark's Arrow-based serializers for UDF execution,
+ where Arrow data needs to be converted to pandas for Python UDF processing.
+ """
+
@classmethod
def convert_legacy(
cls,
@@ -1014,14 +1024,39 @@ class ArrowArrayToPandasConversion:
df_for_struct: bool = False,
) -> Union["pd.Series", "pd.DataFrame"]:
"""
+ Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+
+ This is the lower-level conversion method that requires explicit Spark type
+ specification. For a more convenient API, see :meth:`convert`.
+
Parameters
----------
- arr : :class:`pyarrow.Array`.
- spark_type: target spark type, should always be specified.
- timezone : The timezone to convert from. If there is a timestamp type, it's required.
- struct_in_pandas : How to handle struct type. If there is a struct type, it's required.
- ndarray_as_list : Whether `np.ndarray` is converted to a list or not.
- df_for_struct: when true, and spark type is a StructType, return a DataFrame.
+ arr : pa.Array or pa.ChunkedArray
+ The arrow column to convert.
+ spark_type : DataType
+ Target Spark type. Must be specified and should match the Arrow array type.
+ timezone : str, optional
+ The timezone to use for timestamp conversion. Required if the data contains
+ timestamp types.
+ struct_in_pandas : str, optional
+ How to handle struct types in pandas. Valid values are "dict", "row", or "legacy".
+ Required if the data contains struct types.
+ ndarray_as_list : bool, optional
+ Whether to convert numpy ndarrays to Python lists. Default is False.
+ df_for_struct : bool, optional
+ If True and spark_type is a StructType, return a DataFrame with columns
+ corresponding to struct fields instead of a Series. Default is False.
+
+ Returns
+ -------
+ pd.Series or pd.DataFrame
+ Converted pandas Series. If df_for_struct is True and spark_type is StructType,
+ returns a DataFrame with columns corresponding to struct fields.
+
+ Notes
+ -----
+ This method handles date type columns specially to avoid overflow issues with
+ datetime64[ns] intermediate representations.
"""
import pyarrow as pa
import pandas as pd
@@ -1032,7 +1067,11 @@ class ArrowArrayToPandasConversion:
import pyarrow.types as types
assert types.is_struct(arr.type)
- assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
+ assert len(spark_type.names) == len(arr.type.names), (
+ f"Schema mismatch: spark_type has {len(spark_type.names)} fields, "
+ f"but arrow type has {len(arr.type.names)} fields. "
+ f"spark_type={spark_type}, arrow_type={arr.type}"
+ )
series = [
cls.convert_legacy(
@@ -1049,10 +1088,11 @@ class ArrowArrayToPandasConversion:
pdf.columns = spark_type.names # type: ignore[assignment]
return pdf
- # If the given column is a date type column, creates a series of datetime.date directly
- # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
- # datetime64[ns] type handling.
- # Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
+ # Convert Arrow array to pandas Series with specific options:
+ # - date_as_object: Convert date types to Python datetime.date objects directly
+ #   instead of datetime64[ns] to avoid overflow issues
+ # - coerce_temporal_nanoseconds: Handle nanosecond precision timestamps correctly
+ # - integer_object_nulls: Use object dtype for integer arrays with nulls
pandas_options = {
"date_as_object": True,
"coerce_temporal_nanoseconds": True,
@@ -1070,3 +1110,49 @@ class ArrowArrayToPandasConversion:
integer_object_nulls=True,
)
return converter(ser)
+
+ @classmethod
+ def convert(
+ cls,
+ arrow_column: Union["pa.Array", "pa.ChunkedArray"],
+ target_type: DataType,
+ *,
+ timezone: Optional[str] = None,
+ struct_in_pandas: str = "dict",
+ ndarray_as_list: bool = False,
+ df_for_struct: bool = False,
+ ) -> Union["pd.Series", "pd.DataFrame"]:
+ """
+ Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+
+ Parameters
+ ----------
+ arrow_column : pa.Array or pa.ChunkedArray
+ The Arrow column to convert.
+ target_type : DataType
+ The target Spark type to convert the column to.
+ timezone : str, optional
+ Timezone for timestamp conversion. Required if the data contains timestamp types.
+ struct_in_pandas : str, optional
+ How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
+ Default is "dict".
+ ndarray_as_list : bool, optional
+ Whether to convert numpy ndarrays to Python lists. Default is False.
+ df_for_struct : bool, optional
+ If True, convert struct columns to a DataFrame with columns corresponding
+ to struct fields instead of a Series. Default is False.
+
+ Returns
+ -------
+ pd.Series or pd.DataFrame
+ Converted pandas Series. If df_for_struct is True and the type is StructType,
+ returns a DataFrame with columns corresponding to struct fields.
+ """
+ return cls.convert_legacy(
+ arrow_column,
+ target_type,
+ timezone=timezone,
+ struct_in_pandas=struct_in_pandas,
+ ndarray_as_list=ndarray_as_list,
+ df_for_struct=df_for_struct,
+ )
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 74599869548d..6e5facc79aa6 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -404,30 +404,35 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
A timezone to respect when handling timestamp values
safecheck : bool
If True, conversion from Arrow to Pandas checks for overflow/truncation
- assign_cols_by_name : bool
- If True, then Pandas DataFrames will get columns by name
int_to_decimal_coercion_enabled : bool
If True, applies additional coercions in Python before converting to Arrow.
This has performance penalties.
+ struct_in_pandas : str, optional
+ How to represent struct in pandas ("dict", "row", etc.). Default is "dict".
+ ndarray_as_list : bool, optional
+ Whether to convert ndarrays to lists. Default is False.
+ df_for_struct : bool, optional
+ If True, convert struct columns to DataFrame instead of Series. Default is False.
+ input_type : StructType, optional
+ Spark types for each column. Default is None.
"""
- def __init__(self, timezone, safecheck, int_to_decimal_coercion_enabled):
+ def __init__(
+ self,
+ timezone,
+ safecheck,
+ int_to_decimal_coercion_enabled: bool = False,
+ struct_in_pandas: str = "dict",
+ ndarray_as_list: bool = False,
+ df_for_struct: bool = False,
+ ):
super().__init__()
self._timezone = timezone
self._safecheck = safecheck
self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled
-
- def arrow_to_pandas(
- self, arrow_column, idx, struct_in_pandas="dict", ndarray_as_list=False, spark_type=None
- ):
- return ArrowArrayToPandasConversion.convert_legacy(
- arrow_column,
- spark_type or from_arrow_type(arrow_column.type),
- timezone=self._timezone,
- struct_in_pandas=struct_in_pandas,
- ndarray_as_list=ndarray_as_list,
- df_for_struct=False,
- )
+ self._struct_in_pandas = struct_in_pandas
+ self._ndarray_as_list = ndarray_as_list
+ self._df_for_struct = df_for_struct
def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
"""
@@ -550,16 +555,27 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
"""
Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
"""
- batches = super().load_stream(stream)
import pandas as pd
+ import pyspark
+ batches = super().load_stream(stream)
for batch in batches:
- pandas_batches = [
- self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
- ]
- if len(pandas_batches) == 0:
+ if batch.num_columns == 0:
yield [pd.Series([pyspark._NoValue] * batch.num_rows)]
else:
+ pandas_batches = [
+ ArrowArrayToPandasConversion.convert(
+ batch.column(i),
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(batch.column(i).type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
+ for i in range(batch.num_columns)
+ ]
yield pandas_batches
def __repr__(self):
@@ -583,31 +599,20 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
input_type: Optional[StructType] = None,
int_to_decimal_coercion_enabled: bool = False,
):
- super().__init__(timezone, safecheck, int_to_decimal_coercion_enabled)
+ super().__init__(
+ timezone,
+ safecheck,
+ int_to_decimal_coercion_enabled,
+ struct_in_pandas,
+ ndarray_as_list,
+ df_for_struct,
+ )
self._assign_cols_by_name = assign_cols_by_name
- self._df_for_struct = df_for_struct
- self._struct_in_pandas = struct_in_pandas
- self._ndarray_as_list = ndarray_as_list
self._arrow_cast = arrow_cast
if input_type is not None:
assert isinstance(input_type, StructType)
self._input_type = input_type
- def arrow_to_pandas(self, arrow_column, idx):
- if self._input_type is not None:
- spark_type = self._input_type[idx].dataType
- else:
- spark_type = from_arrow_type(arrow_column.type)
-
- return ArrowArrayToPandasConversion.convert_legacy(
- arr=arrow_column,
- spark_type=spark_type,
- timezone=self._timezone,
- struct_in_pandas=self._struct_in_pandas,
- ndarray_as_list=self._ndarray_as_list,
- df_for_struct=self._df_for_struct,
- )
-
def _create_struct_array(
self,
df: "pd.DataFrame",
@@ -1104,7 +1109,19 @@ class ArrowStreamAggPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
# Lazily read and convert Arrow batches to pandas Series one at a time
# from the stream. This avoids loading all batches into memory for the group
series_iter = (
- tuple(self.arrow_to_pandas(c, i) for i, c in enumerate(batch.columns))
+ tuple(
+ ArrowArrayToPandasConversion.convert(
+ c,
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
+ for i, c in enumerate(batch.columns)
+ )
for batch in batches
)
yield series_iter
@@ -1147,9 +1164,18 @@ class GroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
def process_group(batches: "Iterator[pa.RecordBatch]"):
# Convert each Arrow batch to pandas Series list on-demand, yielding one list per batch
for batch in batches:
- # The batch from ArrowStreamSerializer is already flattened (no struct wrapper)
series = [
- self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
+ ArrowArrayToPandasConversion.convert(
+ batch.column(i),
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(batch.column(i).type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
+ for i in range(batch.num_columns)
]
yield series
@@ -1208,11 +1234,29 @@ class CogroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
for left_batches, right_batches in self._load_group_dataframes(stream, num_dfs=2):
yield (
[
- self.arrow_to_pandas(c, i)
+ ArrowArrayToPandasConversion.convert(
+ c,
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
for i, c in enumerate(pa.Table.from_batches(left_batches).itercolumns())
],
[
- self.arrow_to_pandas(c, i)
+ ArrowArrayToPandasConversion.convert(
+ c,
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
for i, c in enumerate(pa.Table.from_batches(right_batches).itercolumns())
],
)
@@ -1247,11 +1291,15 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
int_to_decimal_coercion_enabled,
):
super().__init__(
- timezone,
- safecheck,
- assign_cols_by_name,
- int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ timezone=timezone,
+ safecheck=safecheck,
+ assign_cols_by_name=assign_cols_by_name,
+ df_for_struct=False,
+ struct_in_pandas="dict",
+ ndarray_as_list=False,
arrow_cast=True,
+ input_type=None,
+ int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
)
self.pickleSer = CPickleSerializer()
self.utf8_deserializer = UTF8Deserializer()
@@ -1375,7 +1423,17 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
)
state_arrow = pa.Table.from_batches([state_batch]).itercolumns()
- state_pandas = [self.arrow_to_pandas(c, i) for i, c in enumerate(state_arrow)][0]
+ state_pandas = [
+ ArrowArrayToPandasConversion.convert(
+ c,
+ from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
+ for c in state_arrow
+ ][0]
for state_idx in range(0, len(state_pandas)):
state_info_col = state_pandas.iloc[state_idx]
@@ -1407,7 +1465,17 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
data_batch_for_group = data_batch.slice(data_start_offset, num_data_rows)
data_arrow = pa.Table.from_batches([data_batch_for_group]).itercolumns()
- data_pandas = [self.arrow_to_pandas(c, i) for i, c in enumerate(data_arrow)]
+ data_pandas = [
+ ArrowArrayToPandasConversion.convert(
+ c,
+ from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
+ for c in data_arrow
+ ]
# state info
yield (
@@ -1613,11 +1681,15 @@ class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
int_to_decimal_coercion_enabled,
):
super().__init__(
- timezone,
- safecheck,
- assign_cols_by_name,
- int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ timezone=timezone,
+ safecheck=safecheck,
+ assign_cols_by_name=assign_cols_by_name,
+ df_for_struct=False,
+ struct_in_pandas="dict",
+ ndarray_as_list=False,
arrow_cast=True,
+ input_type=None,
+ int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
)
self.arrow_max_records_per_batch = (
arrow_max_records_per_batch if arrow_max_records_per_batch > 0 else 2**31 - 1
@@ -1671,8 +1743,15 @@ class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
for batch in batches:
self._update_batch_size_stats(batch)
data_pandas = [
- self.arrow_to_pandas(c, i)
- for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+ ArrowArrayToPandasConversion.convert(
+ c,
+ from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
+ for c in pa.Table.from_batches([batch]).itercolumns()
]
for row in pd.concat(data_pandas, axis=1).itertuples(index=False):
batch_key = tuple(row[s] for s in self.key_offsets)
@@ -1801,13 +1880,31 @@ class TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSerializer):
flatten_state_table = flatten_columns(batch, "inputData")
data_pandas = [
- self.arrow_to_pandas(c, i)
+ ArrowArrayToPandasConversion.convert(
+ c,
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
for i, c in enumerate(flatten_state_table.itercolumns())
]
flatten_init_table = flatten_columns(batch, "initState")
init_data_pandas = [
- self.arrow_to_pandas(c, i)
+ ArrowArrayToPandasConversion.convert(
+ c,
+ self._input_type[i].dataType
+ if self._input_type is not None
+ else from_arrow_type(c.type),
+ timezone=self._timezone,
+ struct_in_pandas=self._struct_in_pandas,
+ ndarray_as_list=self._ndarray_as_list,
+ df_for_struct=self._df_for_struct,
+ )
for i, c in enumerate(flatten_init_table.itercolumns())
]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]