This is an automated email from the ASF dual-hosted git repository.
zhengruifeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f40eccd6948a [SPARK-56837][PYTHON][TESTS] Pass ArrowBatchedUDF benchmark input_type via EvalConf
f40eccd6948a is described below
commit f40eccd6948a9282fa62c3f9278abf0df264cb8f
Author: Yicong Huang <[email protected]>
AuthorDate: Wed May 13 14:59:14 2026 +0800
[SPARK-56837][PYTHON][TESTS] Pass ArrowBatchedUDF benchmark input_type via EvalConf
### What changes were proposed in this pull request?
`_ArrowBatchedBenchMixin._write_scenario` in
`python/benchmarks/bench_eval_type.py` wrote the `input_type` schema JSON as a
length-prefixed UTF-8 string before the UDF payload. This was the old
wire-protocol shape. Since
[SPARK-56340](https://issues.apache.org/jira/browse/SPARK-56340) (move
input_type schema to eval conf), the worker reads `input_type` via `EvalConf`
instead, so the extra prefix gets parsed as the UDF count and the worker exits
with `UnicodeDecodeError` while reading [...]
This PR passes the schema through `eval_conf={"input_type": schema.json()}`
instead, matching the pattern already used by `_ArrowTableUDFBenchMixin`.
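To make the failure mode concrete, here is a minimal sketch of the framing mismatch. It assumes the mock writer uses PySpark-style big-endian 4-byte integers (`struct` format `"!i"`) for lengths and counts. The helpers (`write_int`, `write_utf8`, `udf_section`, `read_udf_count`) are hypothetical stand-ins, not `MockProtocolWriter` or the real worker, and the UDF section is reduced to its leading count:

```python
import io
import json
import struct


def write_int(value: int, stream: io.BytesIO) -> None:
    # Big-endian signed 32-bit integer (assumed PySpark-style framing).
    stream.write(struct.pack("!i", value))


def read_int(stream: io.BytesIO) -> int:
    return struct.unpack("!i", stream.read(4))[0]


def write_utf8(text: str, stream: io.BytesIO) -> None:
    # Length-prefixed UTF-8 string: 4-byte length, then the encoded bytes.
    data = text.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


# Hypothetical StructType JSON standing in for schema.json().
SCHEMA_JSON = json.dumps(
    {
        "type": "struct",
        "fields": [{"name": "a", "type": "long", "nullable": True, "metadata": {}}],
    }
)


def udf_section(old_layout: bool) -> bytes:
    """Build a simplified, hypothetical UDF section."""
    buf = io.BytesIO()
    if old_layout:
        # Old benchmark layout: schema JSON written before the UDF payload.
        write_utf8(SCHEMA_JSON, buf)
    # Both layouts then write the UDF count; without the stale prefix
    # (schema carried by eval_conf instead) it is the first integer.
    write_int(1, buf)
    return buf.getvalue()


def read_udf_count(payload: bytes) -> int:
    # A reader that expects the UDF count as the first integer.
    return read_int(io.BytesIO(payload))


print(read_udf_count(udf_section(old_layout=False)))  # 1
print(read_udf_count(udf_section(old_layout=True)))   # byte length of SCHEMA_JSON
```

With the stale prefix, the reader takes the schema's byte length as the UDF count, and every read after that is misaligned.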
### Why are the changes needed?
Running any `ArrowBatchedUDFTimeBench` / `ArrowBatchedUDFPeakmemBench` ASV
benchmark currently fails with:
```
File "pyspark/worker.py", line 3581, in main
init_info = WorkerInitInfo.from_stream(infile)
...
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa3 in position 353: invalid start byte
```
The bench file is the only `SQL_ARROW_BATCHED_UDF` mock writer in the tree
and was missed when the worker protocol changed.
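The crash surfaces as a `UnicodeDecodeError` rather than an explicit framing error because, once the reader is misaligned, it can treat arbitrary binary bytes as a length-prefixed UTF-8 string. Byte `0xa3` (`0b10100011`) is a UTF-8 continuation byte, so it cannot start a character. The sketch below uses a hypothetical helper, `decode_utf8_field`, and made-up bytes rather than the actual worker stream:

```python
import struct


def decode_utf8_field(buf: bytes, offset: int) -> str:
    # Read a 4-byte big-endian length, then decode that many bytes as UTF-8.
    (length,) = struct.unpack_from("!i", buf, offset)
    start = offset + 4
    return buf[start:start + length].decode("utf-8")


# Made-up misaligned input: a plausible length followed by binary data
# whose first byte is 0xa3, a continuation byte.
payload = struct.pack("!i", 3) + b"\xa3\x01\x02"

try:
    decode_utf8_field(payload, 0)
    reason = None
except UnicodeDecodeError as exc:
    reason = exc.reason

print(reason)  # invalid start byte
```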
### Does this PR introduce _any_ user-facing change?
No. Test-only change.
### How was this patch tested?
Running both bench classes locally now succeeds. Numbers from one run:
```text
=== bench_eval_type.ArrowBatchedUDFTimeBench.time_worker ===
scenario            identity_udf    stringify_udf   nullcheck_udf
sm_batch_few_col    44.3+/-0.3ms    46.9+/-0.3ms    45.0+/-0.4ms
sm_batch_many_col   112+/-0.7ms     113+/-1ms       112+/-0.5ms
lg_batch_few_col    106+/-0.7ms     113+/-2ms       106+/-0.4ms
lg_batch_many_col   448+/-1ms       449+/-0.3ms     447+/-3ms
pure_ints           157+/-1ms       162+/-1ms       156+/-2ms
pure_floats         148+/-0.2ms     170+/-1ms       149+/-2ms
pure_strings        302+/-0.5ms     305+/-3ms       295+/-0.7ms
mixed_types         226+/-0.9ms     230+/-1ms       222+/-0.9ms
=== bench_eval_type.ArrowBatchedUDFPeakmemBench.peakmem_worker ===
scenario            identity_udf    stringify_udf   nullcheck_udf
sm_batch_few_col    464M            464M            464M
sm_batch_many_col   469M            469M            469M
lg_batch_few_col    469M            470M            469M
lg_batch_many_col   509M            510M            509M
pure_ints           469M            470M            469M
pure_floats         469M            470M            469M
pure_strings        473M            473M            473M
mixed_types         471M            471M            470M
```
Run commands:
```bash
COLUMNS=120 asv run --bench ArrowBatchedUDFTimeBench -a repeat=3 --python=same
COLUMNS=120 asv run --bench ArrowBatchedUDFPeakmemBench -a repeat=3 --python=same
```
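For readers unfamiliar with ASV: it discovers methods prefixed `time_` and `peakmem_` and runs each once per combination of the class's `params`, which produces the scenario-by-UDF grids above. The `-a repeat=3` option overrides the `repeat` benchmark attribute, and `--python=same` benchmarks the current environment instead of building a new one. Below is a minimal sketch of that structure. The class name, parameter values, and workload are hypothetical and are not taken from `bench_eval_type.py`:

```python
class ExampleWorkerBench:
    # ASV runs every combination of these lists as a separate benchmark;
    # param_names label the axes in the report.
    params = [
        ["sm_batch_few_col", "pure_ints"],  # hypothetical scenarios
        ["identity_udf", "stringify_udf"],  # hypothetical UDFs
    ]
    param_names = ["scenario", "udf"]
    repeat = 3  # the attribute that `-a repeat=3` overrides

    def setup(self, scenario, udf):
        # Runs before each measurement; builds inputs outside the timed region.
        n = 10 if scenario == "sm_batch_few_col" else 1000
        self.rows = list(range(n))
        self.fn = (lambda x: x) if udf == "identity_udf" else str

    def time_worker(self, scenario, udf):
        # ASV times this body once per (scenario, udf) pair.
        for row in self.rows:
            self.fn(row)

    def peakmem_worker(self, scenario, udf):
        # ASV records the peak process memory while this body runs.
        self.out = [self.fn(row) for row in self.rows]
```

Running `asv run --bench ExampleWorkerBench` would then report one result per scenario and UDF, laid out roughly like the tables above.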
Smoke-tested all 40 benchmark classes in the file (every other class still
passes; only the two ArrowBatched classes were broken).
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #55834 from Yicong-Huang/SPARK-56837/fix/bench-arrow-batched-input-type.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/benchmarks/bench_eval_type.py | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index a4fda48dfcb1..14020d6f22ec 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -440,8 +440,8 @@ class _PeakmemBenchBase:
class _ArrowBatchedBenchMixin:
"""Provides ``_write_scenario`` for SQL_ARROW_BATCHED_UDF.
- Writes the extra ``input_type`` (StructType JSON) that the wire protocol
- requires before the UDF payload.
+ Passes the ``input_type`` (StructType JSON) through ``EvalConf``, which the
+ non-legacy and legacy paths both read via ``eval_conf.input_type``.
"""
# Row-by-row processing is ~100x slower than columnar Arrow UDFs,
@@ -486,15 +486,12 @@ class _ArrowBatchedBenchMixin:
if ret_type is None:
ret_type = schema.fields[0].dataType
- def write_udf(b):
- MockProtocolWriter.write_utf8(schema.json(), b)
- MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b)
-
MockProtocolWriter.write_worker_input(
PythonEvalType.SQL_ARROW_BATCHED_UDF,
- write_udf,
+ lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),
lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
buf,
+ eval_conf={"input_type": schema.json()},
)