This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f5fd6561a09 [SPARK-54969][PYTHON] Implement new arrow->pandas conversion
3f5fd6561a09 is described below

commit 3f5fd6561a09a134d55cc80ba2f1f6364d17e368
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Feb 3 15:44:33 2026 +0800

    [SPARK-54969][PYTHON] Implement new arrow->pandas conversion
    
    ### What changes were proposed in this pull request?
    Implement new arrow->pandas conversion
    
    ### Why are the changes needed?
    To replace the existing conversion. Based on my local test, the conversion of integers with nulls is 3x+ faster:
    
    conversion used in `convert_legacy`
    ```
            # %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
            # 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    ```
    
    new conversion
    ```
            # %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
            # 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    ```
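
    For reference, a minimal sketch of the benchmark setup (the array construction mirrors the inline comments added in this patch; the ~10M-row size comes from that benchmark):
    ```
    import pyarrow as pa
    import pandas as pd

    # nullable int64 column: 10M values, one large value, one null
    a = pa.array(list(range(10000000)) + [9223372036854775707, None], type=pa.int64())

    # legacy path: nulls force object dtype first, then cast to the nullable extension dtype
    legacy = a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())

    # new path: Arrow-backed dtype first, then cast to the nullable extension dtype
    new = a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
    ```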
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #54105 from zhengruifeng/convert_np.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/conversion.py | 246 +++++++++++++++++++++++++++++++++------
 1 file changed, 210 insertions(+), 36 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index 105c06c7a4a1..f65e6cb814bf 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -31,7 +31,14 @@ from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
 from pyspark.sql.types import (
     ArrayType,
     BinaryType,
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    LongType,
     DataType,
+    FloatType,
+    DoubleType,
     DecimalType,
     GeographyType,
     Geography,
@@ -43,8 +50,12 @@ from pyspark.sql.types import (
     StringType,
     StructField,
     StructType,
+    DateType,
+    TimeType,
     TimestampNTZType,
     TimestampType,
+    DayTimeIntervalType,
+    YearMonthIntervalType,
     UserDefinedType,
     VariantType,
     VariantVal,
@@ -1012,6 +1023,62 @@ class ArrowArrayToPandasConversion:
     where Arrow data needs to be converted to pandas for Python UDF processing.
     """
 
+    @classmethod
+    def convert(
+        cls,
+        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
+        target_type: DataType,
+        *,
+        timezone: Optional[str] = None,
+        struct_in_pandas: str = "dict",
+        ndarray_as_list: bool = False,
+        df_for_struct: bool = False,
+    ) -> Union["pd.Series", "pd.DataFrame"]:
+        """
+        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+
+        Parameters
+        ----------
+        arrow_column : pa.Array or pa.ChunkedArray
+            The Arrow column to convert.
+        target_type : DataType
+            The target Spark type for the column to be converted to.
+        timezone : str, optional
+            Timezone for timestamp conversion. Required if the data contains timestamp types.
+        struct_in_pandas : str, optional
+            How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
+            Default is "dict".
+        ndarray_as_list : bool, optional
+            Whether to convert numpy ndarrays to Python lists. Default is False.
+        df_for_struct : bool, optional
+            If True, convert struct columns to a DataFrame with columns corresponding
+            to struct fields instead of a Series. Default is False.
+
+        Returns
+        -------
+        pd.Series or pd.DataFrame
+            Converted pandas Series. If df_for_struct is True and the type is StructType,
+            returns a DataFrame with columns corresponding to struct fields.
+        """
+        if cls._prefer_convert_numpy(target_type, df_for_struct):
+            return cls.convert_numpy(
+                arrow_column,
+                target_type,
+                timezone=timezone,
+                struct_in_pandas=struct_in_pandas,
+                ndarray_as_list=ndarray_as_list,
+                df_for_struct=df_for_struct,
+            )
+
+        return cls.convert_legacy(
+            arrow_column,
+            target_type,
+            timezone=timezone,
+            struct_in_pandas=struct_in_pandas,
+            ndarray_as_list=ndarray_as_list,
+            df_for_struct=df_for_struct,
+        )
+
     @classmethod
     def convert_legacy(
         cls,
@@ -1112,47 +1179,154 @@ class ArrowArrayToPandasConversion:
         return converter(ser)
 
     @classmethod
-    def convert(
+    def _prefer_convert_numpy(
         cls,
-        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
-        target_type: DataType,
+        spark_type: DataType,
+        df_for_struct: bool,
+    ) -> bool:
+        supported_types = (
+            NullType,
+            BinaryType,
+            BooleanType,
+            FloatType,
+            DoubleType,
+            ByteType,
+            ShortType,
+            IntegerType,
+            LongType,
+        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            return all(isinstance(f.dataType, supported_types) for f in spark_type.fields)
+        else:
+            return isinstance(spark_type, supported_types)
+
+    @classmethod
+    def convert_numpy(
+        cls,
+        arr: Union["pa.Array", "pa.ChunkedArray"],
+        spark_type: DataType,
         *,
         timezone: Optional[str] = None,
-        struct_in_pandas: str = "dict",
+        struct_in_pandas: Optional[str] = None,
         ndarray_as_list: bool = False,
         df_for_struct: bool = False,
     ) -> Union["pd.Series", "pd.DataFrame"]:
-        """
-        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+        import pyarrow as pa
+        import pandas as pd
 
-        Parameters
-        ----------
-        arrow_column : pa.Array or pa.ChunkedArray
-            The Arrow column to convert.
-        target_type : DataType
-            The target Spark type for the column to be converted to.
-        timezone : str, optional
-            Timezone for timestamp conversion. Required if the data contains timestamp types.
-        struct_in_pandas : str, optional
-            How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
-            Default is "dict".
-        ndarray_as_list : bool, optional
-            Whether to convert numpy ndarrays to Python lists. Default is False.
-        df_for_struct : bool, optional
-            If True, convert struct columns to a DataFrame with columns corresponding
-            to struct fields instead of a Series. Default is False.
+        assert isinstance(arr, (pa.Array, pa.ChunkedArray))
 
-        Returns
-        -------
-        pd.Series or pd.DataFrame
-            Converted pandas Series. If df_for_struct is True and the type is StructType,
-            returns a DataFrame with columns corresponding to struct fields.
-        """
-        return cls.convert_legacy(
-            arrow_column,
-            target_type,
-            timezone=timezone,
-            struct_in_pandas=struct_in_pandas,
-            ndarray_as_list=ndarray_as_list,
-            df_for_struct=df_for_struct,
-        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            import pyarrow.types as types
+
+            assert types.is_struct(arr.type)
+            assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
+
+            series = [
+                cls.convert_numpy(
+                    field_arr,
+                    spark_type=field.dataType,
+                    timezone=timezone,
+                    struct_in_pandas=struct_in_pandas,
+                    ndarray_as_list=ndarray_as_list,
+                    df_for_struct=False,  # always False for child fields
+                )
+                for field_arr, field in zip(arr.flatten(), spark_type)
+            ]
+            pdf = pd.concat(series, axis=1)
+            pdf.columns = spark_type.names  # type: ignore[assignment]
+            return pdf
+
+        arr = ArrowTimestampConversion.localize_tz(arr)
+
+        # TODO(SPARK-55332): Create benchmark for pa.array -> pd.series integer conversion
+        # 1, benchmark a nullable integral array
+        # a = pa.array(list(range(10000000)) + [9223372036854775707, None], type=pa.int64())
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype)
+        # 11.9 μs ± 407 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+        # 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit pd.Series(a.to_pylist(), dtype=pd.Int64Dtype())
+        # 2.94 s ± 19.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
+        # 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # pd.Series(a, dtype=pd.Int64Dtype())
+        # fails due to internal np.float64 coercion
+        # OverflowError: Python int too large to convert to C long
+        #
+        # 2, benchmark a non-nullable integral array
+        # b = pa.array(list(range(10000000)) + [9223372036854775707, 1], type=pa.int64())
+        # %timeit b.to_pandas(types_mapper=pd.ArrowDtype).astype(np.int64)
+        # 30.2 μs ± 831 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b.to_pandas(types_mapper=pd.ArrowDtype), dtype=np.int64)
+        # 33.3 μs ± 928 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b, dtype=np.int64) <- loses the name
+        # 11.9 μs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas()
+        # 7.56 μs ± 96.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas().astype(np.int64) <- astype is non-trivial
+        # 19.1 μs ± 242 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        if isinstance(spark_type, ByteType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, ShortType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, IntegerType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, LongType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(
+            spark_type,
+            (
+                NullType,
+                BinaryType,
+                BooleanType,
+                FloatType,
+                DoubleType,
+                DecimalType,
+                StringType,
+                DateType,
+                TimeType,
+                TimestampType,
+                TimestampNTZType,
+                DayTimeIntervalType,
+                YearMonthIntervalType,
+            ),
+        ):
+            # TODO(SPARK-55333): Revisit date_as_object in arrow->pandas conversion
+            # TODO(SPARK-55334): Implement coerce_temporal_nanoseconds
+            # If the given column is a date type column, creates a series of datetime.date directly
+            # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+            # datetime64[ns] type handling.
+            # Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
+            pandas_options = {
+                "date_as_object": True,
+                "coerce_temporal_nanoseconds": True,
+            }
+            return arr.to_pandas(**pandas_options)
+        # elif isinstance(
+        #     spark_type,
+        #     (
+        #         ArrayType,
+        #         MapType,
+        #         StructType,
+        #         UserDefinedType,
+        #         VariantType,
+        #         GeographyType,
+        #         GeometryType,
+        #     ),
+        # ):
+        # TODO(SPARK-55324): Support complex types
+        else:  # pragma: no cover
+            assert False, f"Need converter for {spark_type} but failed to find one."
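
A minimal usage sketch of the new dispatch (internal API, not user-facing; the sample array and type below are illustrative, not taken from the patch):
```
import pyarrow as pa
from pyspark.sql.conversion import ArrowArrayToPandasConversion
from pyspark.sql.types import LongType

# LongType is in the numpy-friendly allowlist checked by _prefer_convert_numpy(),
# so convert() routes to convert_numpy(); with nulls present the result is a
# pandas Series backed by the nullable Int64Dtype.
ints = pa.array([1, 2, None], type=pa.int64())
ser = ArrowArrayToPandasConversion.convert(ints, LongType())

# Types outside the allowlist (e.g. ArrayType, MapType, StructType) still go
# through convert_legacy() until SPARK-55324 extends convert_numpy() to complex types.
```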


