This is an automated email from the ASF dual-hosted git repository.

zhengruifeng pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 3daea05df607 [SPARK-56837][PYTHON][TESTS] Pass ArrowBatchedUDF 
benchmark input_type via EvalConf
3daea05df607 is described below

commit 3daea05df607ae3e31e4a9f2295323f7e253fdff
Author: Yicong Huang <[email protected]>
AuthorDate: Wed May 13 14:59:14 2026 +0800

    [SPARK-56837][PYTHON][TESTS] Pass ArrowBatchedUDF benchmark input_type via 
EvalConf
    
    ### What changes were proposed in this pull request?
    
    `_ArrowBatchedBenchMixin._write_scenario` in 
`python/benchmarks/bench_eval_type.py` wrote the `input_type` schema JSON as a 
length-prefixed UTF-8 string before the UDF payload. This was the old 
wire-protocol shape. Since 
[SPARK-56340](https://issues.apache.org/jira/browse/SPARK-56340) (move 
input_type schema to eval conf), the worker reads `input_type` via `EvalConf` 
instead, so the extra prefix gets parsed as the UDF count and the worker exits 
with `UnicodeDecodeError` while reading  [...]
    
    This PR moves the schema to `eval_conf={"input_type": schema.json()}`, 
matching the pattern already used by the `_ArrowTableUDFBenchMixin`.
    
    ### Why are the changes needed?
    
    Running any `ArrowBatchedUDFTimeBench` / `ArrowBatchedUDFPeakmemBench` ASV 
benchmark currently fails with:
    
    ```
    File "pyspark/worker.py", line 3581, in main
        init_info = WorkerInitInfo.from_stream(infile)
      ...
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa3 in position 353: 
invalid start byte
    ```
    
    The bench file is the only `SQL_ARROW_BATCHED_UDF` mock writer in the tree 
and was missed when the worker protocol changed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Test-only change.
    
    ### How was this patch tested?
    
    Running both bench classes locally now succeeds. Numbers from one run:
    
    ```text
    === bench_eval_type.ArrowBatchedUDFTimeBench.time_worker ===
    scenario             identity_udf   stringify_udf   nullcheck_udf
    sm_batch_few_col      44.3+/-0.3ms    46.9+/-0.3ms    45.0+/-0.4ms
    sm_batch_many_col     112+/-0.7ms     113+/-1ms       112+/-0.5ms
    lg_batch_few_col      106+/-0.7ms     113+/-2ms       106+/-0.4ms
    lg_batch_many_col     448+/-1ms       449+/-0.3ms     447+/-3ms
    pure_ints             157+/-1ms       162+/-1ms       156+/-2ms
    pure_floats           148+/-0.2ms     170+/-1ms       149+/-2ms
    pure_strings          302+/-0.5ms     305+/-3ms       295+/-0.7ms
    mixed_types           226+/-0.9ms     230+/-1ms       222+/-0.9ms
    
    === bench_eval_type.ArrowBatchedUDFPeakmemBench.peakmem_worker ===
    scenario             identity_udf   stringify_udf   nullcheck_udf
    sm_batch_few_col      464M           464M            464M
    sm_batch_many_col     469M           469M            469M
    lg_batch_few_col      469M           470M            469M
    lg_batch_many_col     509M           510M            509M
    pure_ints             469M           470M            469M
    pure_floats           469M           470M            469M
    pure_strings          473M           473M            473M
    mixed_types           471M           471M            470M
    ```
    
    Run commands:
    
    ```bash
    COLUMNS=120 asv run --bench ArrowBatchedUDFTimeBench   -a repeat=3 
--python=same
    COLUMNS=120 asv run --bench ArrowBatchedUDFPeakmemBench -a repeat=3 
--python=same
    ```
    
    Smoke-tested all 40 benchmark classes in the file (every other class still 
passes; only the two ArrowBatched classes were broken).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #55834 from 
Yicong-Huang/SPARK-56837/fix/bench-arrow-batched-input-type.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit f40eccd6948a9282fa62c3f9278abf0df264cb8f)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/python/benchmarks/bench_eval_type.py 
b/python/benchmarks/bench_eval_type.py
index a4fda48dfcb1..14020d6f22ec 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -440,8 +440,8 @@ class _PeakmemBenchBase:
 class _ArrowBatchedBenchMixin:
     """Provides ``_write_scenario`` for SQL_ARROW_BATCHED_UDF.
 
-    Writes the extra ``input_type`` (StructType JSON) that the wire protocol
-    requires before the UDF payload.
+    Passes the ``input_type`` (StructType JSON) through ``EvalConf``, which the
+    non-legacy and legacy paths both read via ``eval_conf.input_type``.
     """
 
     # Row-by-row processing is ~100x slower than columnar Arrow UDFs,
@@ -486,15 +486,12 @@ class _ArrowBatchedBenchMixin:
         if ret_type is None:
             ret_type = schema.fields[0].dataType
 
-        def write_udf(b):
-            MockProtocolWriter.write_utf8(schema.json(), b)
-            MockProtocolWriter.write_udf_payload(udf_func, ret_type, 
arg_offsets, b)
-
         MockProtocolWriter.write_worker_input(
             PythonEvalType.SQL_ARROW_BATCHED_UDF,
-            write_udf,
+            lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, 
arg_offsets, b),
             lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
             buf,
+            eval_conf={"input_type": schema.json()},
         )
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to