This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8fe50fcdb6a [SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs
8fe50fcdb6a is described below

commit 8fe50fcdb6a34b06c07c235f497b77cc5e245877
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Fri Aug 11 10:23:01 2023 +0900

    [SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs

    ### What changes were proposed in this pull request?

    This PR caches the pandas converter for reuse when serializing the results from arrow-optimized Python UDTFs.

    ### Why are the changes needed?

    To improve the performance

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests

    Closes #42439 from allisonwang-db/spark-44766-cache-converter.

    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/serializers.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index d1a3babb1fd..2cc3db15c9c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -499,6 +499,7 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
             # Enables explicit casting for mismatched return types of Arrow Python UDTFs.
             arrow_cast=True,
         )
+        self._converter_map = dict()
 
     def _create_batch(self, series):
         """
@@ -538,6 +539,17 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
         return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
 
+    def _get_or_create_converter_from_pandas(self, dt):
+        if dt not in self._converter_map:
+            conv = _create_converter_from_pandas(
+                dt,
+                timezone=self._timezone,
+                error_on_duplicated_field_names=False,
+                ignore_unexpected_complex_type_values=True,
+            )
+            self._converter_map[dt] = conv
+        return self._converter_map[dt]
+
     def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
         """
         Override the `_create_array` method in the superclass to create an Arrow Array
@@ -569,13 +581,7 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
         if arrow_type is not None:
             dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
-            # TODO(SPARK-43579): cache the converter for reuse
-            conv = _create_converter_from_pandas(
-                dt,
-                timezone=self._timezone,
-                error_on_duplicated_field_names=False,
-                ignore_unexpected_complex_type_values=True,
-            )
+            conv = self._get_or_create_converter_from_pandas(dt)
             series = conv(series)
 
         if hasattr(series.array, "__arrow_array__"):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org