This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a189a450e990 [SPARK-54650][PYTHON] Move int to decimal conversion into
`_create_converter_from_pandas`
a189a450e990 is described below
commit a189a450e9906362749e3ad3d022f593a2a562e4
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Dec 9 17:26:05 2025 +0800
[SPARK-54650][PYTHON] Move int to decimal conversion into
`_create_converter_from_pandas`
### What changes were proposed in this pull request?
Move int to decimal conversion into `_create_converter_from_pandas`
### Why are the changes needed?
This conversion should live in `_create_converter_from_pandas`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53405 from zhengruifeng/mv_int_dec.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 47 ++++----------------------------
python/pyspark/sql/pandas/types.py | 25 +++++++++++++++++
2 files changed, 30 insertions(+), 42 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py
b/python/pyspark/sql/pandas/serializers.py
index aae1c34b47af..43a42d7fc3b4 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -19,7 +19,6 @@
Serializers for PyArrow and pandas conversions. See `pyspark.serializers` for
more details.
"""
-from decimal import Decimal
from itertools import groupby
from typing import TYPE_CHECKING, Iterator, Optional
@@ -356,40 +355,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
self._safecheck = safecheck
self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled
- @staticmethod
- def _apply_python_coercions(series, arrow_type):
- """
- Apply additional coercions to the series in Python before converting
to Arrow:
- - Convert integer series to decimal type.
- When we have a pandas series of integers that needs to be converted
to
- pyarrow.decimal128 (with precision < 20), PyArrow fails with
precision errors.
- Explicitly cast to Decimal first.
-
- Parameters
- ----------
- series : pandas.Series
- The series to potentially convert
- arrow_type : pyarrow.DataType
- The target arrow type
-
- Returns
- -------
- pandas.Series
- The potentially converted pandas series
- """
- import pyarrow.types as types
- import pandas as pd
-
- # Convert integer series to Decimal objects
- if (
- types.is_decimal(arrow_type)
- and series.dtype.kind in ["i", "u"] # integer types
(signed/unsigned)
- and not series.empty
- ):
- series = series.apply(lambda x: Decimal(x) if pd.notna(x) else
None)
-
- return series
-
def arrow_to_pandas(
self, arrow_column, idx, struct_in_pandas="dict",
ndarray_as_list=False, spark_type=None
):
@@ -442,13 +407,13 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
if arrow_type is not None:
dt = spark_type or from_arrow_type(arrow_type,
prefer_timestamp_ntz=True)
conv = _create_converter_from_pandas(
- dt, timezone=self._timezone,
error_on_duplicated_field_names=False
+ dt,
+ timezone=self._timezone,
+ error_on_duplicated_field_names=False,
+
int_to_decimal_coercion_enabled=self._int_to_decimal_coercion_enabled,
)
series = conv(series)
- if self._int_to_decimal_coercion_enabled:
- series = self._apply_python_coercions(series, arrow_type)
-
if hasattr(series.array, "__arrow_array__"):
mask = None
else:
@@ -1046,12 +1011,10 @@ class
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
timezone=self._timezone,
error_on_duplicated_field_names=False,
ignore_unexpected_complex_type_values=True,
+
int_to_decimal_coercion_enabled=self._int_to_decimal_coercion_enabled,
)
series = conv(series)
- if self._int_to_decimal_coercion_enabled:
- series = self._apply_python_coercions(series, arrow_type)
-
if hasattr(series.array, "__arrow_array__"):
mask = None
else:
diff --git a/python/pyspark/sql/pandas/types.py
b/python/pyspark/sql/pandas/types.py
index a6b4adc0f23a..cfcca827fffe 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -22,6 +22,7 @@ pandas instances during the type conversion.
import datetime
import itertools
import functools
+from decimal import Decimal
from typing import Any, Callable, Iterable, List, Optional, Union,
TYPE_CHECKING
from pyspark.errors import PySparkTypeError, UnsupportedOperationException,
PySparkValueError
@@ -1225,6 +1226,7 @@ def _create_converter_from_pandas(
timezone: Optional[str] = None,
error_on_duplicated_field_names: bool = True,
ignore_unexpected_complex_type_values: bool = False,
+ int_to_decimal_coercion_enabled: bool = False,
) -> Callable[["pd.Series"], "pd.Series"]:
"""
Create a converter of pandas Series to create Spark DataFrame with Arrow
optimization.
@@ -1264,6 +1266,29 @@ def _create_converter_from_pandas(
return correct_timestamp
+ elif isinstance(data_type, DecimalType):
+ if int_to_decimal_coercion_enabled:
+ # For decimal with low precision, e.g. pa.decimal128(1)
+ # pa.Array.from_pandas(pd.Series([1,2,3])).cast(pa.decimal128(1))
fails with
+ # ArrowInvalid: Precision is not great enough for the result.
+ # It should be at least 19.
+ # Here change it to
+ # pa.Array.from_pandas(pd.Series([1,2,3]).apply(
+ # lambda x: Decimal(x))).cast(pa.decimal128(1))
+
+ def convert_int_to_decimal(pser: pd.Series) -> pd.Series:
+ if pser.dtype.kind in ["i", "u"]:
+ return pser.apply( # type: ignore[return-value]
+ lambda x: Decimal(x) if pd.notna(x) else None
+ )
+ else:
+ return pser
+
+ return convert_int_to_decimal
+
+ else:
+ return lambda pser: pser
+
def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]:
if isinstance(dt, ArrayType):
_element_conv = _converter(dt.elementType)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]