This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8fe50fcdb6a [SPARK-44766][PYTHON] Cache the pandas converter for reuse
for Python UDTFs
8fe50fcdb6a is described below
commit 8fe50fcdb6a34b06c07c235f497b77cc5e245877
Author: allisonwang-db <[email protected]>
AuthorDate: Fri Aug 11 10:23:01 2023 +0900
[SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs
### What changes were proposed in this pull request?
This PR caches the pandas converter for reuse when serializing the results
from arrow-optimized Python UDTFs.
### Why are the changes needed?
To improve performance: previously a new pandas converter was created on every call to `_create_array`; caching it per data type avoids that repeated construction cost.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
Closes #42439 from allisonwang-db/spark-44766-cache-converter.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py
b/python/pyspark/sql/pandas/serializers.py
index d1a3babb1fd..2cc3db15c9c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -499,6 +499,7 @@ class
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
# Enables explicit casting for mismatched return types of Arrow
Python UDTFs.
arrow_cast=True,
)
+ self._converter_map = dict()
def _create_batch(self, series):
"""
@@ -538,6 +539,17 @@ class
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in
range(len(arrs))])
+ def _get_or_create_converter_from_pandas(self, dt):
+ if dt not in self._converter_map:
+ conv = _create_converter_from_pandas(
+ dt,
+ timezone=self._timezone,
+ error_on_duplicated_field_names=False,
+ ignore_unexpected_complex_type_values=True,
+ )
+ self._converter_map[dt] = conv
+ return self._converter_map[dt]
+
def _create_array(self, series, arrow_type, spark_type=None,
arrow_cast=False):
"""
Override the `_create_array` method in the superclass to create an
Arrow Array
@@ -569,13 +581,7 @@ class
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
if arrow_type is not None:
dt = spark_type or from_arrow_type(arrow_type,
prefer_timestamp_ntz=True)
- # TODO(SPARK-43579): cache the converter for reuse
- conv = _create_converter_from_pandas(
- dt,
- timezone=self._timezone,
- error_on_duplicated_field_names=False,
- ignore_unexpected_complex_type_values=True,
- )
+ conv = self._get_or_create_converter_from_pandas(dt)
series = conv(series)
if hasattr(series.array, "__arrow_array__"):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]