This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3f5fd6561a09 [SPARK-54969][PYTHON] Implement new arrow->pandas conversion
3f5fd6561a09 is described below
commit 3f5fd6561a09a134d55cc80ba2f1f6364d17e368
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Feb 3 15:44:33 2026 +0800
[SPARK-54969][PYTHON] Implement new arrow->pandas conversion
### What changes were proposed in this pull request?
Implement new arrow->pandas conversion
### Why are the changes needed?
To replace the existing conversion. Based on my local test, the conversion of integers with nulls is 3x+ faster.

Conversion used in `convert_legacy`:
```
# %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
# 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
New conversion:
```
# %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
# 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
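
For reference, a minimal sketch of the benchmark setup (matching the array `a` used in the inline comments of the patch below):

```
import pyarrow as pa
import pandas as pd

# Nullable int64 column: 10M sequential values, one value near the
# int64 upper bound, and one null.
a = pa.array(
    list(range(10000000)) + [9223372036854775707, None],
    type=pa.int64(),
)
```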
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ci
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #54105 from zhengruifeng/convert_np.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/conversion.py | 246 +++++++++++++++++++++++++++++++++------
1 file changed, 210 insertions(+), 36 deletions(-)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index 105c06c7a4a1..f65e6cb814bf 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -31,7 +31,14 @@ from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
 from pyspark.sql.types import (
     ArrayType,
     BinaryType,
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    LongType,
     DataType,
+    FloatType,
+    DoubleType,
     DecimalType,
     GeographyType,
     Geography,
@@ -43,8 +50,12 @@ from pyspark.sql.types import (
     StringType,
     StructField,
     StructType,
+    DateType,
+    TimeType,
     TimestampNTZType,
     TimestampType,
+    DayTimeIntervalType,
+    YearMonthIntervalType,
     UserDefinedType,
     VariantType,
     VariantVal,
@@ -1012,6 +1023,62 @@ class ArrowArrayToPandasConversion:
     where Arrow data needs to be converted to pandas for Python UDF processing.
     """

+    @classmethod
+    def convert(
+        cls,
+        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
+        target_type: DataType,
+        *,
+        timezone: Optional[str] = None,
+        struct_in_pandas: str = "dict",
+        ndarray_as_list: bool = False,
+        df_for_struct: bool = False,
+    ) -> Union["pd.Series", "pd.DataFrame"]:
+        """
+        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+
+        Parameters
+        ----------
+        arrow_column : pa.Array or pa.ChunkedArray
+            The Arrow column to convert.
+        target_type : DataType
+            The target Spark type for the column to be converted to.
+        timezone : str, optional
+            Timezone for timestamp conversion. Required if the data contains timestamp types.
+        struct_in_pandas : str, optional
+            How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
+            Default is "dict".
+        ndarray_as_list : bool, optional
+            Whether to convert numpy ndarrays to Python lists. Default is False.
+        df_for_struct : bool, optional
+            If True, convert struct columns to a DataFrame with columns corresponding
+            to struct fields instead of a Series. Default is False.
+
+        Returns
+        -------
+        pd.Series or pd.DataFrame
+            Converted pandas Series. If df_for_struct is True and the type is StructType,
+            returns a DataFrame with columns corresponding to struct fields.
+        """
+        if cls._prefer_convert_numpy(target_type, df_for_struct):
+            return cls.convert_numpy(
+                arrow_column,
+                target_type,
+                timezone=timezone,
+                struct_in_pandas=struct_in_pandas,
+                ndarray_as_list=ndarray_as_list,
+                df_for_struct=df_for_struct,
+            )
+
+        return cls.convert_legacy(
+            arrow_column,
+            target_type,
+            timezone=timezone,
+            struct_in_pandas=struct_in_pandas,
+            ndarray_as_list=ndarray_as_list,
+            df_for_struct=df_for_struct,
+        )
+
     @classmethod
     def convert_legacy(
         cls,
@@ -1112,47 +1179,154 @@ class ArrowArrayToPandasConversion:
         return converter(ser)

     @classmethod
-    def convert(
+    def _prefer_convert_numpy(
         cls,
-        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
-        target_type: DataType,
+        spark_type: DataType,
+        df_for_struct: bool,
+    ) -> bool:
+        supported_types = (
+            NullType,
+            BinaryType,
+            BooleanType,
+            FloatType,
+            DoubleType,
+            ByteType,
+            ShortType,
+            IntegerType,
+            LongType,
+        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            return all(isinstance(f.dataType, supported_types) for f in spark_type.fields)
+        else:
+            return isinstance(spark_type, supported_types)
+
+    @classmethod
+    def convert_numpy(
+        cls,
+        arr: Union["pa.Array", "pa.ChunkedArray"],
+        spark_type: DataType,
         *,
         timezone: Optional[str] = None,
-        struct_in_pandas: str = "dict",
+        struct_in_pandas: Optional[str] = None,
         ndarray_as_list: bool = False,
         df_for_struct: bool = False,
     ) -> Union["pd.Series", "pd.DataFrame"]:
-        """
-        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+        import pyarrow as pa
+        import pandas as pd

-        Parameters
-        ----------
-        arrow_column : pa.Array or pa.ChunkedArray
-            The Arrow column to convert.
-        target_type : DataType
-            The target Spark type for the column to be converted to.
-        timezone : str, optional
-            Timezone for timestamp conversion. Required if the data contains timestamp types.
-        struct_in_pandas : str, optional
-            How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
-            Default is "dict".
-        ndarray_as_list : bool, optional
-            Whether to convert numpy ndarrays to Python lists. Default is False.
-        df_for_struct : bool, optional
-            If True, convert struct columns to a DataFrame with columns corresponding
-            to struct fields instead of a Series. Default is False.
+        assert isinstance(arr, (pa.Array, pa.ChunkedArray))

-        Returns
-        -------
-        pd.Series or pd.DataFrame
-            Converted pandas Series. If df_for_struct is True and the type is StructType,
-            returns a DataFrame with columns corresponding to struct fields.
-        """
-        return cls.convert_legacy(
-            arrow_column,
-            target_type,
-            timezone=timezone,
-            struct_in_pandas=struct_in_pandas,
-            ndarray_as_list=ndarray_as_list,
-            df_for_struct=df_for_struct,
-        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            import pyarrow.types as types
+
+            assert types.is_struct(arr.type)
+            assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
+
+            series = [
+                cls.convert_numpy(
+                    field_arr,
+                    spark_type=field.dataType,
+                    timezone=timezone,
+                    struct_in_pandas=struct_in_pandas,
+                    ndarray_as_list=ndarray_as_list,
+                    df_for_struct=False,  # always False for child fields
+                )
+                for field_arr, field in zip(arr.flatten(), spark_type)
+            ]
+            pdf = pd.concat(series, axis=1)
+            pdf.columns = spark_type.names  # type: ignore[assignment]
+            return pdf
+
+        arr = ArrowTimestampConversion.localize_tz(arr)
+
+        # TODO(SPARK-55332): Create benchmark for pa.array -> pd.Series integer conversion
+        # 1. benchmark a nullable integral array
+        # a = pa.array(list(range(10000000)) + [9223372036854775707, None], type=pa.int64())
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype)
+        # 11.9 μs ± 407 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+        # 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit pd.Series(a.to_pylist(), dtype=pd.Int64Dtype())
+        # 2.94 s ± 19.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
+        # 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # pd.Series(a, dtype=pd.Int64Dtype())
+        # fails due to internal np.float64 coercion:
+        # OverflowError: Python int too large to convert to C long
+        #
+        # 2. benchmark a non-nullable integral array
+        # b = pa.array(list(range(10000000)) + [9223372036854775707, 1], type=pa.int64())
+        # %timeit b.to_pandas(types_mapper=pd.ArrowDtype).astype(np.int64)
+        # 30.2 μs ± 831 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b.to_pandas(types_mapper=pd.ArrowDtype), dtype=np.int64)
+        # 33.3 μs ± 928 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b, dtype=np.int64) <- loses the Series name
+        # 11.9 μs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas()
+        # 7.56 μs ± 96.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas().astype(np.int64) <- astype is non-trivial
+        # 19.1 μs ± 242 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        if isinstance(spark_type, ByteType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, ShortType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, IntegerType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, LongType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(
+            spark_type,
+            (
+                NullType,
+                BinaryType,
+                BooleanType,
+                FloatType,
+                DoubleType,
+                DecimalType,
+                StringType,
+                DateType,
+                TimeType,
+                TimestampType,
+                TimestampNTZType,
+                DayTimeIntervalType,
+                YearMonthIntervalType,
+            ),
+        ):
+            # TODO(SPARK-55333): Revisit date_as_object in arrow->pandas conversion
+            # TODO(SPARK-55334): Implement coerce_temporal_nanoseconds
+            # If the given column is a date type column, create a series of datetime.date
+            # directly instead of datetime64[ns] as intermediate data, to avoid the
+            # overflow caused by datetime64[ns] type handling; i.e. cast dates to objects
+            # instead of datetime64[ns] dtype.
+            pandas_options = {
+                "date_as_object": True,
+                "coerce_temporal_nanoseconds": True,
+            }
+            return arr.to_pandas(**pandas_options)
+        # elif isinstance(
+        #     spark_type,
+        #     (
+        #         ArrayType,
+        #         MapType,
+        #         StructType,
+        #         UserDefinedType,
+        #         VariantType,
+        #         GeographyType,
+        #         GeometryType,
+        #     ),
+        # ):
+        #     TODO(SPARK-55324): Support complex types
+        else:  # pragma: no cover
+            assert False, f"Need converter for {spark_type} but failed to find one."
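
For context, a minimal usage sketch of the new dispatch path (illustrative only; it assumes a build that includes this commit):

```
import pyarrow as pa
from pyspark.sql.conversion import ArrowArrayToPandasConversion
from pyspark.sql.types import LongType

# A nullable int64 column: the nulls route it through the
# pd.ArrowDtype -> pd.Int64Dtype() path in convert_numpy.
arr = pa.array([1, 2, None], type=pa.int64())
ser = ArrowArrayToPandasConversion.convert(arr, LongType())
print(ser.dtype)  # Int64 (nullable) rather than object
```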
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]