This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ac46ea6aa8ce [SPARK-55821][PYTHON] Enforce keyword-only arguments in serializer __init__ methods
ac46ea6aa8ce is described below
commit ac46ea6aa8ce480d042f242125e050ea9a7dcbbf
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Mar 5 17:37:20 2026 +0800
[SPARK-55821][PYTHON] Enforce keyword-only arguments in serializer __init__ methods
### What changes were proposed in this pull request?
Add `*` separator to enforce keyword-only arguments in all serializer
`__init__` methods in `pyspark.sql.pandas.serializers`, and convert all call
sites in `worker.py` from positional to keyword arguments.
### Why are the changes needed?
As noted by zhengruifeng in
https://github.com/apache/spark/pull/54568#discussion_r2875751193, serializer
constructors accept too many positional arguments, making call sites
error-prone and hard to read. Enforcing keyword-only arguments prevents
positional mistakes and improves readability.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54605 from Yicong-Huang/SPARK-55821/refactor/keyword-only-serializer-init.
Lead-authored-by: Yicong Huang
<[email protected]>
Co-authored-by: Yicong-Huang
<[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 54 +++++++++-------
python/pyspark/worker.py | 104 ++++++++++++++++---------------
2 files changed, 85 insertions(+), 73 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py
b/python/pyspark/sql/pandas/serializers.py
index 019a88472648..84f25a7bb9f5 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -267,7 +267,7 @@ class
ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
Serializer for PyArrow-native UDTFs that work directly with PyArrow
RecordBatches and Arrays.
"""
- def __init__(self, table_arg_offsets=None):
+ def __init__(self, *, table_arg_offsets=None):
super().__init__()
self.table_arg_offsets = table_arg_offsets if table_arg_offsets else []
@@ -360,7 +360,7 @@ class
ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
If True, reorder serialized columns by schema name.
"""
- def __init__(self, assign_cols_by_name):
+ def __init__(self, *, assign_cols_by_name):
super().__init__()
self._assign_cols_by_name = assign_cols_by_name
@@ -427,6 +427,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
int_to_decimal_coercion_enabled: bool = False,
@@ -507,6 +508,7 @@ class
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
assign_cols_by_name,
@@ -522,16 +524,16 @@ class
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
is_udtf: bool = False,
):
super().__init__(
- timezone,
- safecheck,
- int_to_decimal_coercion_enabled,
- prefers_large_types,
- struct_in_pandas,
- ndarray_as_list,
- prefer_int_ext_dtype,
- df_for_struct,
- input_type,
- arrow_cast,
+ timezone=timezone,
+ safecheck=safecheck,
+ int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ prefers_large_types=prefers_large_types,
+ struct_in_pandas=struct_in_pandas,
+ ndarray_as_list=ndarray_as_list,
+ prefer_int_ext_dtype=prefer_int_ext_dtype,
+ df_for_struct=df_for_struct,
+ input_type=input_type,
+ arrow_cast=arrow_cast,
)
self._assign_cols_by_name = assign_cols_by_name
self._ignore_unexpected_complex_type_values =
ignore_unexpected_complex_type_values
@@ -594,6 +596,7 @@ class ArrowStreamArrowUDFSerializer(ArrowStreamSerializer):
def __init__(
self,
+ *,
safecheck,
arrow_cast,
):
@@ -655,6 +658,7 @@ class
ArrowBatchUDFSerializer(ArrowStreamArrowUDFSerializer):
def __init__(
self,
+ *,
safecheck: bool,
input_type: StructType,
int_to_decimal_coercion_enabled: bool,
@@ -751,6 +755,7 @@ class
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
input_type,
@@ -818,6 +823,7 @@ class
ArrowStreamAggArrowUDFSerializer(ArrowStreamArrowUDFSerializer):
class ArrowStreamAggPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
assign_cols_by_name,
@@ -873,6 +879,7 @@ class
ArrowStreamAggPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
class GroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
assign_cols_by_name,
@@ -989,6 +996,7 @@ class
ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
assign_cols_by_name,
@@ -1388,6 +1396,7 @@ class
TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
def __init__(
self,
+ *,
timezone,
safecheck,
assign_cols_by_name,
@@ -1523,6 +1532,7 @@ class
TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSe
def __init__(
self,
+ *,
timezone,
safecheck,
assign_cols_by_name,
@@ -1532,13 +1542,13 @@ class
TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSe
int_to_decimal_coercion_enabled,
):
super().__init__(
- timezone,
- safecheck,
- assign_cols_by_name,
- prefer_int_ext_dtype,
- arrow_max_records_per_batch,
- arrow_max_bytes_per_batch,
- int_to_decimal_coercion_enabled,
+ timezone=timezone,
+ safecheck=safecheck,
+ assign_cols_by_name=assign_cols_by_name,
+ prefer_int_ext_dtype=prefer_int_ext_dtype,
+ arrow_max_records_per_batch=arrow_max_records_per_batch,
+ arrow_max_bytes_per_batch=arrow_max_bytes_per_batch,
+ int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
)
self.init_key_offsets = None
@@ -1678,7 +1688,7 @@ class
TransformWithStateInPySparkRowSerializer(ArrowStreamUDFSerializer):
Limit of the number of records that can be written to a single
ArrowRecordBatch in memory.
"""
- def __init__(self, arrow_max_records_per_batch):
+ def __init__(self, *, arrow_max_records_per_batch):
super().__init__()
self.arrow_max_records_per_batch = (
arrow_max_records_per_batch if arrow_max_records_per_batch > 0
else 2**31 - 1
@@ -1777,8 +1787,8 @@ class
TransformWithStateInPySparkRowInitStateSerializer(TransformWithStateInPySp
Same as input parameters in TransformWithStateInPySparkRowSerializer.
"""
- def __init__(self, arrow_max_records_per_batch):
- super().__init__(arrow_max_records_per_batch)
+ def __init__(self, *, arrow_max_records_per_batch):
+
super().__init__(arrow_max_records_per_batch=arrow_max_records_per_batch)
self.init_key_offsets = None
def load_stream(self, stream):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 1892dcbf3bf6..84e275936fb8 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1493,8 +1493,8 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
if runner_conf.use_legacy_pandas_udtf_conversion:
# NOTE: if timezone is set here, that implies
respectSessionTimeZone is True
ser = ArrowStreamPandasUDTFSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
input_type=input_type,
prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
@@ -2685,7 +2685,7 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf,
eval_conf):
eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
or eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF
):
- ser =
ArrowStreamGroupUDFSerializer(runner_conf.assign_cols_by_name)
+ ser =
ArrowStreamGroupUDFSerializer(assign_cols_by_name=runner_conf.assign_cols_by_name)
elif eval_type in (
PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
@@ -2698,70 +2698,72 @@ def read_udfs(pickleSer, infile, eval_type,
runner_conf, eval_conf):
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
):
ser = ArrowStreamAggPandasUDFSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
- runner_conf.prefer_int_ext_dtype,
- runner_conf.int_to_decimal_coercion_enabled,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
+ prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif (
eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
or eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF
):
ser = GroupPandasUDFSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
- runner_conf.prefer_int_ext_dtype,
- runner_conf.int_to_decimal_coercion_enabled,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
+ prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
- ser = CogroupArrowUDFSerializer(runner_conf.assign_cols_by_name)
+ ser =
CogroupArrowUDFSerializer(assign_cols_by_name=runner_conf.assign_cols_by_name)
elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
ser = CogroupPandasUDFSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
arrow_cast=True,
)
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
ser = ApplyInPandasWithStateSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
- runner_conf.prefer_int_ext_dtype,
- eval_conf.state_value_schema,
- runner_conf.arrow_max_records_per_batch,
- runner_conf.use_large_var_types,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
+ prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+ state_object_schema=eval_conf.state_value_schema,
+
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch,
+ prefers_large_var_types=runner_conf.use_large_var_types,
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF:
ser = TransformWithStateInPandasSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
- runner_conf.prefer_int_ext_dtype,
- runner_conf.arrow_max_records_per_batch,
- runner_conf.arrow_max_bytes_per_batch,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
+ prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch,
+
arrow_max_bytes_per_batch=runner_conf.arrow_max_bytes_per_batch,
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type ==
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF:
ser = TransformWithStateInPandasInitStateSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
- runner_conf.prefer_int_ext_dtype,
- runner_conf.arrow_max_records_per_batch,
- runner_conf.arrow_max_bytes_per_batch,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
+ prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch,
+
arrow_max_bytes_per_batch=runner_conf.arrow_max_bytes_per_batch,
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type ==
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
- ser =
TransformWithStateInPySparkRowSerializer(runner_conf.arrow_max_records_per_batch)
+ ser = TransformWithStateInPySparkRowSerializer(
+
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch
+ )
elif eval_type ==
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
ser = TransformWithStateInPySparkRowInitStateSerializer(
- runner_conf.arrow_max_records_per_batch
+
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch
)
elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
ser = ArrowStreamSerializer(write_start_stream=True)
@@ -2777,10 +2779,10 @@ def read_udfs(pickleSer, infile, eval_type,
runner_conf, eval_conf):
):
input_type =
_parse_datatype_json_string(utf8_deserializer.loads(infile))
ser = ArrowBatchUDFSerializer(
- runner_conf.safecheck,
- input_type,
- runner_conf.int_to_decimal_coercion_enabled,
- runner_conf.binary_as_bytes,
+ safecheck=runner_conf.safecheck,
+ input_type=input_type,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
+ binary_as_bytes=runner_conf.binary_as_bytes,
)
else:
# Scalar Pandas UDF handles struct type arguments as pandas
DataFrames instead of
@@ -2803,15 +2805,15 @@ def read_udfs(pickleSer, infile, eval_type,
runner_conf, eval_conf):
)
ser = ArrowStreamPandasUDFSerializer(
- runner_conf.timezone,
- runner_conf.safecheck,
- runner_conf.assign_cols_by_name,
- df_for_struct,
- struct_in_pandas,
- ndarray_as_list,
- runner_conf.prefer_int_ext_dtype,
- True,
- input_type,
+ timezone=runner_conf.timezone,
+ safecheck=runner_conf.safecheck,
+ assign_cols_by_name=runner_conf.assign_cols_by_name,
+ df_for_struct=df_for_struct,
+ struct_in_pandas=struct_in_pandas,
+ ndarray_as_list=ndarray_as_list,
+ prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+ arrow_cast=True,
+ input_type=input_type,
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
prefers_large_types=runner_conf.use_large_var_types,
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]