This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bba8bb86f19a [SPARK-54598][PYTHON] Extract logic to read UDFs
bba8bb86f19a is described below
commit bba8bb86f19a4e37c94579b38e1f6542b484fcf5
Author: Yicong-Huang <[email protected]>
AuthorDate: Fri Dec 5 15:27:53 2025 +0800
[SPARK-54598][PYTHON] Extract logic to read UDFs
### What changes were proposed in this pull request?
This PR refactors the UDF reading logic in `read_udfs()` to eliminate code
duplication. Currently, the logic for reading UDFs (functions and their
argument offsets) is duplicated across multiple `eval_type` branches, with
different patterns for the single-UDF and multiple-UDF cases.
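For context, here is a minimal sketch of the two patterns being unified,
abbreviated from the diff below. The branch condition is an illustrative
stand-in for the many per-type branches; `read_single_udf` and its parameters
match the actual `worker.py` code:

```python
# Before: each eval_type branch read its own UDF(s) from the stream.
if handles_single_udf:  # illustrative stand-in for the per-type branches
    arg_offsets, f = read_single_udf(
        pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
    )
else:  # plain scalar UDFs: read all of them in a loop
    udfs = []
    for i in range(num_udfs):
        udfs.append(
            read_single_udf(
                pickleSer, infile, eval_type, runner_conf, udf_index=i, profiler=profiler
            )
        )

# After: read every UDF once, up front; each branch just indexes into the list.
udfs = [
    read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i, profiler=profiler)
    for i in range(num_udfs)
]
arg_offsets, f = udfs[0]  # single-UDF branches
```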
### Why are the changes needed?
This duplication makes the code harder to maintain and increases the risk
of inconsistencies. By centralizing the UDF reading logic at the beginning of
`read_udfs()`, we can:
- Reduce code duplication
- Ensure consistent UDF reading behavior across all eval types
- Make it easier to add new eval types in the future (see the sketch below)
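As a sketch of why new eval types become simpler (the eval-type constant and
branch below are hypothetical; `read_int` and `read_single_udf` are the
existing helpers already used by `read_udfs()`):

```python
def read_udfs_sketch(pickleSer, infile, eval_type, runner_conf, profiler=None):
    # Simplified sketch of the refactored read_udfs() flow, not the full function.
    num_udfs = read_int(infile)
    # All UDFs are read once, up front, in a single consistent way.
    udfs = [
        read_single_udf(
            pickleSer, infile, eval_type, runner_conf, udf_index=i, profiler=profiler
        )
        for i in range(num_udfs)
    ]
    if eval_type == SQL_SOME_NEW_SINGLE_UDF_TYPE:  # hypothetical new eval type
        # A new single-UDF branch only needs to index the list; it no longer
        # re-implements (and risks diverging from) the stream-reading logic.
        assert num_udfs == 1
        arg_offsets, f = udfs[0]
        ...
    else:
        # The multi-UDF fallback reuses the same pre-read list.
        def mapper(a):
            return tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
        ...
```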
### Does this PR introduce _any_ user-facing change?
No, this is an internal refactoring that maintains backward compatibility.
The API behavior remains the same from the user's perspective.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53330 from Yicong-Huang/SPARK-54598/refactor/udf-fetching-logic.
Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/worker.py | 56 ++++++++++++++----------------------------------
1 file changed, 16 insertions(+), 40 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c49b633b9408..50e71fb6da9d 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2878,7 +2878,12 @@ def read_udfs(pickleSer, infile, eval_type):
else:
profiler = None
+ # Read all UDFs
num_udfs = read_int(infile)
+ udfs = [
+ read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i, profiler=profiler)
+ for i in range(num_udfs)
+ ]
is_scalar_iter = eval_type in (
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
@@ -2896,9 +2901,7 @@ def read_udfs(pickleSer, infile, eval_type):
if is_map_arrow_iter:
assert num_udfs == 1, "One MAP_ARROW_ITER UDF expected here."
- arg_offsets, udf = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, udf = udfs[0]
def func(_, iterator):
num_input_rows = 0
@@ -2994,9 +2997,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See FlatMapGroupsInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
def mapper(series_iter):
@@ -3022,9 +3023,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See TransformWithStateInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
ser.key_offsets = parsed_offsets[0][0]
stateful_processor_api_client = StatefulProcessorApiClient(state_server_port, key_schema)
@@ -3053,9 +3052,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See TransformWithStateInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
# parsed offsets:
# [
# [groupingKeyOffsets, dedupDataOffsets],
@@ -3091,9 +3088,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See TransformWithStateInPySparkExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
ser.key_offsets = parsed_offsets[0][0]
stateful_processor_api_client = StatefulProcessorApiClient(state_server_port, key_schema)
@@ -3118,9 +3113,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See TransformWithStateInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
# parsed offsets:
# [
# [groupingKeyOffsets, dedupDataOffsets],
@@ -3156,9 +3149,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See FlatMapGroupsInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
def batch_from_offset(batch, offsets):
@@ -3187,9 +3178,7 @@ def read_udfs(pickleSer, infile, eval_type):
# See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
def mapper(a):
@@ -3223,9 +3212,7 @@ def read_udfs(pickleSer, infile, eval_type):
# We assume there is only one UDF here because cogrouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
@@ -3242,9 +3229,7 @@ def read_udfs(pickleSer, infile, eval_type):
# We assume there is only one UDF here because cogrouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
parsed_offsets = extract_key_value_indexes(arg_offsets)
@@ -3269,9 +3254,7 @@ def read_udfs(pickleSer, infile, eval_type):
# support combining multiple UDFs.
assert num_udfs == 1
- arg_offsets, f = read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=0, profiler=profiler
- )
+ arg_offsets, f = udfs[0]
# Convert to iterator of batches: Iterator[pa.Array] for single column,
# or Iterator[Tuple[pa.Array, ...]] for multiple columns
@@ -3283,13 +3266,6 @@ def read_udfs(pickleSer, infile, eval_type):
return f(batch_iter)
else:
- udfs = []
- for i in range(num_udfs):
- udfs.append(
- read_single_udf(
- pickleSer, infile, eval_type, runner_conf, udf_index=i, profiler=profiler
- )
- )
def mapper(a):
result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]