This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a189a450e990 [SPARK-54650][PYTHON] Move int to decimal conversion into `_create_converter_from_pandas`
a189a450e990 is described below

commit a189a450e9906362749e3ad3d022f593a2a562e4
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Dec 9 17:26:05 2025 +0800

    [SPARK-54650][PYTHON] Move int to decimal conversion into `_create_converter_from_pandas`
    
    ### What changes were proposed in this pull request?
    Move int to decimal conversion into `_create_converter_from_pandas`
    
    ### Why are the changes needed?
    this conversion should be in `_create_converter_from_pandas`
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53405 from zhengruifeng/mv_int_dec.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py | 47 ++++----------------------------
 python/pyspark/sql/pandas/types.py       | 25 +++++++++++++++++
 2 files changed, 30 insertions(+), 42 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index aae1c34b47af..43a42d7fc3b4 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -19,7 +19,6 @@
 Serializers for PyArrow and pandas conversions. See `pyspark.serializers` for more details.
 """
 
-from decimal import Decimal
 from itertools import groupby
 from typing import TYPE_CHECKING, Iterator, Optional
 
@@ -356,40 +355,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         self._safecheck = safecheck
         self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled
 
-    @staticmethod
-    def _apply_python_coercions(series, arrow_type):
-        """
-        Apply additional coercions to the series in Python before converting to Arrow:
-        - Convert integer series to decimal type.
-          When we have a pandas series of integers that needs to be converted to
-          pyarrow.decimal128 (with precision < 20), PyArrow fails with precision errors.
-          Explicitly cast to Decimal first.
-
-        Parameters
-        ----------
-        series : pandas.Series
-            The series to potentially convert
-        arrow_type : pyarrow.DataType
-            The target arrow type
-
-        Returns
-        -------
-        pandas.Series
-            The potentially converted pandas series
-        """
-        import pyarrow.types as types
-        import pandas as pd
-
-        # Convert integer series to Decimal objects
-        if (
-            types.is_decimal(arrow_type)
-            and series.dtype.kind in ["i", "u"]  # integer types (signed/unsigned)
-            and not series.empty
-        ):
-            series = series.apply(lambda x: Decimal(x) if pd.notna(x) else None)
-
-        return series
-
     def arrow_to_pandas(
         self, arrow_column, idx, struct_in_pandas="dict", ndarray_as_list=False, spark_type=None
     ):
@@ -442,13 +407,13 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         if arrow_type is not None:
             dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
             conv = _create_converter_from_pandas(
-                dt, timezone=self._timezone, error_on_duplicated_field_names=False
+                dt,
+                timezone=self._timezone,
+                error_on_duplicated_field_names=False,
+                int_to_decimal_coercion_enabled=self._int_to_decimal_coercion_enabled,
             )
             series = conv(series)
 
-            if self._int_to_decimal_coercion_enabled:
-                series = self._apply_python_coercions(series, arrow_type)
-
         if hasattr(series.array, "__arrow_array__"):
             mask = None
         else:
@@ -1046,12 +1011,10 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
                 timezone=self._timezone,
                 error_on_duplicated_field_names=False,
                 ignore_unexpected_complex_type_values=True,
+                int_to_decimal_coercion_enabled=self._int_to_decimal_coercion_enabled,
             )
             series = conv(series)
 
-            if self._int_to_decimal_coercion_enabled:
-                series = self._apply_python_coercions(series, arrow_type)
-
         if hasattr(series.array, "__arrow_array__"):
             mask = None
         else:
diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py
index a6b4adc0f23a..cfcca827fffe 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -22,6 +22,7 @@ pandas instances during the type conversion.
 import datetime
 import itertools
 import functools
+from decimal import Decimal
 from typing import Any, Callable, Iterable, List, Optional, Union, TYPE_CHECKING
 
 from pyspark.errors import PySparkTypeError, UnsupportedOperationException, PySparkValueError
@@ -1225,6 +1226,7 @@ def _create_converter_from_pandas(
     timezone: Optional[str] = None,
     error_on_duplicated_field_names: bool = True,
     ignore_unexpected_complex_type_values: bool = False,
+    int_to_decimal_coercion_enabled: bool = False,
 ) -> Callable[["pd.Series"], "pd.Series"]:
     """
     Create a converter of pandas Series to create Spark DataFrame with Arrow optimization.
@@ -1264,6 +1266,29 @@ def _create_converter_from_pandas(
 
         return correct_timestamp
 
+    elif isinstance(data_type, DecimalType):
+        if int_to_decimal_coercion_enabled:
+            # For decimal with low precision, e.g. pa.decimal128(1)
+            # pa.Array.from_pandas(pd.Series([1,2,3])).cast(pa.decimal128(1)) fails with
+            # ArrowInvalid: Precision is not great enough for the result.
+            # It should be at least 19.
+            # Here change it to
+            # pa.Array.from_pandas(pd.Series([1,2,3]).apply(
+            #     lambda x: Decimal(x))).cast(pa.decimal128(1))
+
+            def convert_int_to_decimal(pser: pd.Series) -> pd.Series:
+                if pser.dtype.kind in ["i", "u"]:
+                    return pser.apply(  # type: ignore[return-value]
+                        lambda x: Decimal(x) if pd.notna(x) else None
+                    )
+                else:
+                    return pser
+
+            return convert_int_to_decimal
+
+        else:
+            return lambda pser: pser
+
     def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]:
         if isinstance(dt, ArrayType):
             _element_conv = _converter(dt.elementType)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to