This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c4916590b09 [SPARK-43579][PYTHON] optim: Cache the converter between Arrow and pandas for reuse
8c4916590b09 is described below

commit 8c4916590b09d77868456ff3bc60ac1500beefb9
Author: Peter Nguyen <petern0...@gmail.com>
AuthorDate: Mon Sep 15 07:57:12 2025 +0900

    [SPARK-43579][PYTHON] optim: Cache the converter between Arrow and pandas for reuse
    
    ### What changes were proposed in this pull request?
    
    Cache the converter between Arrow and pandas using memoization, so that identical converters are not rebuilt on every conversion call.
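    
    This is plain dictionary memoization; a minimal sketch of the pattern (illustrative only, not the exact Spark code):
    
    ```python
    _cache = {}
    
    def get_converter(key, make_converter):
        # Build the converter once per distinct key, then reuse it.
        if key not in _cache:
            _cache[key] = make_converter()
        return _cache[key]
    ```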
    
    ### Why are the changes needed?
    
    Performance improvement: the serializer no longer rebuilds the same converter on every call.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passes existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52332 from petern48/cache_arrow_pandas_converter.
    
    Authored-by: Peter Nguyen <petern0...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/serializers.py | 37 +++++++++++++++++++++-----------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 860335d1ff0f..0965a2e7b546 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -355,6 +355,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         self._timezone = timezone
         self._safecheck = safecheck
         self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled
+        self._converter_cache = {}
 
     @staticmethod
     def _apply_python_coercions(series, arrow_type):
@@ -403,15 +404,24 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         }
         s = arrow_column.to_pandas(**pandas_options)
 
-        # TODO(SPARK-43579): cache the converter for reuse
-        converter = _create_converter_to_pandas(
-            data_type=spark_type or from_arrow_type(arrow_column.type, prefer_timestamp_ntz=True),
-            nullable=True,
-            timezone=self._timezone,
-            struct_in_pandas=struct_in_pandas,
-            error_on_duplicated_field_names=True,
-            ndarray_as_list=ndarray_as_list,
+        data_type = spark_type or from_arrow_type(arrow_column.type, prefer_timestamp_ntz=True)
+        key = (
+            data_type.json(),
+            struct_in_pandas,
+            ndarray_as_list,
         )
+
+        if key not in self._converter_cache:
+            self._converter_cache[key] = _create_converter_to_pandas(
+                data_type=data_type,
+                nullable=True,
+                timezone=self._timezone,
+                struct_in_pandas=struct_in_pandas,
+                error_on_duplicated_field_names=True,
+                ndarray_as_list=ndarray_as_list,
+            )
+
+        converter = self._converter_cache[key]
         return converter(s)
 
     def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
@@ -442,10 +452,13 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 
         if arrow_type is not None:
             dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
-            # TODO(SPARK-43579): cache the converter for reuse
-            conv = _create_converter_from_pandas(
-                dt, timezone=self._timezone, error_on_duplicated_field_names=False
-            )
+            key = dt.json()
+            if key not in self._converter_cache:
+                self._converter_cache[key] = _create_converter_from_pandas(
+                    dt, timezone=self._timezone, error_on_duplicated_field_names=False
+                )
+
+            conv = self._converter_cache[key]
             series = conv(series)
 
             if self._int_to_decimal_coercion_enabled:
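
Editor's note on the cache key (not part of the commit): both hunks key the cache on DataType.json(), a deterministic string form of the type, so two structurally identical types map to the same cache entry even when they are distinct objects. A quick sketch under that assumption:

    # Structurally equal Spark types serialize to the same JSON string,
    # so the serializer builds their converter only once.
    from pyspark.sql.types import LongType, StructField, StructType

    dt1 = StructType([StructField("x", LongType())])
    dt2 = StructType([StructField("x", LongType())])
    assert dt1 is not dt2
    assert dt1.json() == dt2.json()  # same cache key, one converter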

