This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bba8bb86f19a [SPARK-54598][PYTHON] Extract logic to read UDFs
bba8bb86f19a is described below

commit bba8bb86f19a4e37c94579b38e1f6542b484fcf5
Author: Yicong-Huang <[email protected]>
AuthorDate: Fri Dec 5 15:27:53 2025 +0800

    [SPARK-54598][PYTHON] Extract logic to read UDFs
    
    ### What changes were proposed in this pull request?
    This PR refactors the UDF reading logic in `read_udfs()` to eliminate code 
duplication. Currently, the logic for reading UDFs (functions and their 
argument offsets) is duplicated across multiple `eval_type` branches, with 
different patterns for the single-UDF and multiple-UDF cases.
    
    ### Why are the changes needed?
    
    This duplication makes the code harder to maintain and increases the risk 
of inconsistencies. By centralizing the UDF reading logic at the beginning of 
`read_udfs()`, we can:
    - Reduce code duplication
    - Ensure consistent UDF reading behavior across all eval types
    - Make it easier to add new eval types in the future
    
    ### Does this PR introduce _any_ user-facing change?
    No, this is an internal refactoring that maintains backward compatibility. 
The API behavior remains the same from the user's perspective.
    
    ### How was this patch tested?
    Covered by existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53330 from Yicong-Huang/SPARK-54598/refactor/udf-fetching-logic.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/worker.py | 56 ++++++++++++++----------------------------------
 1 file changed, 16 insertions(+), 40 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c49b633b9408..50e71fb6da9d 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2878,7 +2878,12 @@ def read_udfs(pickleSer, infile, eval_type):
     else:
         profiler = None
 
+    # Read all UDFs
     num_udfs = read_int(infile)
+    udfs = [
+        read_single_udf(pickleSer, infile, eval_type, runner_conf, 
udf_index=i, profiler=profiler)
+        for i in range(num_udfs)
+    ]
 
     is_scalar_iter = eval_type in (
         PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
@@ -2896,9 +2901,7 @@ def read_udfs(pickleSer, infile, eval_type):
         if is_map_arrow_iter:
             assert num_udfs == 1, "One MAP_ARROW_ITER UDF expected here."
 
-        arg_offsets, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, udf = udfs[0]
 
         def func(_, iterator):
             num_input_rows = 0
@@ -2994,9 +2997,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
         def mapper(series_iter):
@@ -3022,9 +3023,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See TransformWithStateInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         parsed_offsets = extract_key_value_indexes(arg_offsets)
         ser.key_offsets = parsed_offsets[0][0]
         stateful_processor_api_client = 
StatefulProcessorApiClient(state_server_port, key_schema)
@@ -3053,9 +3052,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See TransformWithStateInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         # parsed offsets:
         # [
         #     [groupingKeyOffsets, dedupDataOffsets],
@@ -3091,9 +3088,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See TransformWithStateInPySparkExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         parsed_offsets = extract_key_value_indexes(arg_offsets)
         ser.key_offsets = parsed_offsets[0][0]
         stateful_processor_api_client = 
StatefulProcessorApiClient(state_server_port, key_schema)
@@ -3118,9 +3113,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See TransformWithStateInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         # parsed offsets:
         # [
         #     [groupingKeyOffsets, dedupDataOffsets],
@@ -3156,9 +3149,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
         def batch_from_offset(batch, offsets):
@@ -3187,9 +3178,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are 
used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
         def mapper(a):
@@ -3223,9 +3212,7 @@ def read_udfs(pickleSer, infile, eval_type):
         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
 
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
@@ -3242,9 +3229,7 @@ def read_udfs(pickleSer, infile, eval_type):
         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
 
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
@@ -3269,9 +3254,7 @@ def read_udfs(pickleSer, infile, eval_type):
         # support combining multiple UDFs.
         assert num_udfs == 1
 
-        arg_offsets, f = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0, 
profiler=profiler
-        )
+        arg_offsets, f = udfs[0]
 
         # Convert to iterator of batches: Iterator[pa.Array] for single column,
         # or Iterator[Tuple[pa.Array, ...]] for multiple columns
@@ -3283,13 +3266,6 @@ def read_udfs(pickleSer, infile, eval_type):
             return f(batch_iter)
 
     else:
-        udfs = []
-        for i in range(num_udfs):
-            udfs.append(
-                read_single_udf(
-                    pickleSer, infile, eval_type, runner_conf, udf_index=i, 
profiler=profiler
-                )
-            )
 
         def mapper(a):
             result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f 
in udfs)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to