This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c4916590b09 [SPARK-43579][PYTHON] optim: Cache the converter between Arrow and pandas for reuse
8c4916590b09 is described below

commit 8c4916590b09d77868456ff3bc60ac1500beefb9
Author: Peter Nguyen <petern0...@gmail.com>
AuthorDate: Mon Sep 15 07:57:12 2025 +0900

    [SPARK-43579][PYTHON] optim: Cache the converter between Arrow and pandas for reuse

    ### What changes were proposed in this pull request?

    Cache the converters between Arrow and pandas using memoization, to avoid unnecessarily recreating duplicate converters.

    ### Why are the changes needed?

    Performance improvement: it avoids rebuilding an identical converter for every Arrow batch.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Passes existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #52332 from petern48/cache_arrow_pandas_converter.

    Authored-by: Peter Nguyen <petern0...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/serializers.py | 37 +++++++++++++++++++++-----------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 860335d1ff0f..0965a2e7b546 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -355,6 +355,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         self._timezone = timezone
         self._safecheck = safecheck
         self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled
+        self._converter_cache = {}
 
     @staticmethod
     def _apply_python_coercions(series, arrow_type):
@@ -403,15 +404,24 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         }
 
         s = arrow_column.to_pandas(**pandas_options)
-        # TODO(SPARK-43579): cache the converter for reuse
-        converter = _create_converter_to_pandas(
-            data_type=spark_type or from_arrow_type(arrow_column.type, prefer_timestamp_ntz=True),
-            nullable=True,
-            timezone=self._timezone,
-            struct_in_pandas=struct_in_pandas,
-            error_on_duplicated_field_names=True,
-            ndarray_as_list=ndarray_as_list,
+        data_type = spark_type or from_arrow_type(arrow_column.type, prefer_timestamp_ntz=True)
+        key = (
+            data_type.json(),
+            struct_in_pandas,
+            ndarray_as_list,
         )
+
+        if key not in self._converter_cache:
+            self._converter_cache[key] = _create_converter_to_pandas(
+                data_type=data_type,
+                nullable=True,
+                timezone=self._timezone,
+                struct_in_pandas=struct_in_pandas,
+                error_on_duplicated_field_names=True,
+                ndarray_as_list=ndarray_as_list,
+            )
+
+        converter = self._converter_cache[key]
         return converter(s)
 
     def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
@@ -442,10 +452,13 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 
         if arrow_type is not None:
             dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
-            # TODO(SPARK-43579): cache the converter for reuse
-            conv = _create_converter_from_pandas(
-                dt, timezone=self._timezone, error_on_duplicated_field_names=False
-            )
+            key = dt.json()
+            if key not in self._converter_cache:
+                self._converter_cache[key] = _create_converter_from_pandas(
+                    dt, timezone=self._timezone, error_on_duplicated_field_names=False
+                )
+
+            conv = self._converter_cache[key]
             series = conv(series)
 
         if self._int_to_decimal_coercion_enabled:
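For readers who want the pattern in isolation, the sketch below reproduces the
memoization scheme outside of Spark. It is a minimal illustration, not Spark
code: CachedConverterFactory, get_converter, and _build_converter are
hypothetical stand-in names, and only the keying scheme (the type's JSON form
plus the per-call conversion options) mirrors the diff above.

    # Minimal sketch of the memoization pattern used above. All names are
    # hypothetical stand-ins, not Spark APIs; only the keying scheme
    # (type JSON + conversion options) follows the diff.

    class CachedConverterFactory:
        def __init__(self):
            # Maps (data_type_json, struct_in_pandas, ndarray_as_list) -> converter
            self._converter_cache = {}

        def get_converter(self, data_type_json, struct_in_pandas, ndarray_as_list):
            key = (data_type_json, struct_in_pandas, ndarray_as_list)
            if key not in self._converter_cache:
                # The expensive construction runs only on the first request
                # for a given key; later calls reuse the cached callable.
                self._converter_cache[key] = self._build_converter(key)
            return self._converter_cache[key]

        def _build_converter(self, key):
            # Stand-in for _create_converter_to_pandas(), which returns a
            # callable that converts one pandas Series.
            return lambda series: series

    factory = CachedConverterFactory()
    c1 = factory.get_converter('"long"', "dict", False)
    c2 = factory.get_converter('"long"', "dict", False)
    assert c1 is c2  # same key, so the converter is built once and reused

Two details of the real diff are worth noting. The key uses data_type.json()
rather than the DataType object, giving a plain string that fully describes
the type, nested fields included; and self._timezone is omitted from the key
because it is fixed for the lifetime of the serializer, so it cannot differ
between cache entries. The cache itself is unbounded, but a given serializer
instance only ever sees the column types of its own schema, so in practice it
should stay small.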