This is an automated email from the ASF dual-hosted git repository.
zhengruifeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 74934af1b888 [SPARK-56717][PYTHON][TESTS] Add ASV microbenchmark for SQL_ARROW_TABLE_UDF
74934af1b888 is described below
commit 74934af1b888400dc24f9b2967d0f11295fdef2d
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 12 12:05:36 2026 +0800
[SPARK-56717][PYTHON][TESTS] Add ASV microbenchmark for SQL_ARROW_TABLE_UDF
### What changes were proposed in this pull request?
Add an ASV micro-benchmark for `SQL_ARROW_TABLE_UDF` (Python UDTF with
`useArrow=True`) eval type to `bench_eval_type.py`.
The new benchmark drives the worker through the UDTF wire protocol via the
existing `write_arrow_udtf_payload` helper, and threads `input_type` through
`EvalConf` so the non-legacy Arrow code path is exercised. Adds
`connInfo`/`secret` to the preamble `TaskContextInfo` JSON (required after the
SPARK-56519 worker-protocol refactor).
UDTFs covered: `identity_udtf` (1->1), `explode_udtf` (1->3), `filter_udtf`
(1->0/1), `stringify_udtf` (1->1, type change). Row counts are scaled down vs
`SQL_ARROW_BATCHED_UDF` because the worker calls
`LocalDataToArrowConversion.convert` once per input row in this path.
### Why are the changes needed?
Part of [SPARK-55724](https://issues.apache.org/jira/browse/SPARK-55724).
Establishes a performance baseline before refactoring `SQL_ARROW_TABLE_UDF`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`COLUMNS=120 ./python/asv run --python=same --bench "ArrowTableUDF" -a "repeat=(3,5,5.0)"`
(one of two stable runs):
`ArrowTableUDFTimeBench`:
```text
=================== =============== ============== ============= ================
--                                              udtf
------------------- -------------------------------------------------------------
 scenario            identity_udtf   explode_udtf   filter_udtf   stringify_udtf
=================== =============== ============== ============= ================
 sm_batch_few_col    120+/-0.2ms     122+/-0.3ms    94.0+/-0.4ms  119+/-0.2ms
 sm_batch_many_col   40.1+/-0.3ms    40.4+/-0.2ms   33.4+/-0.3ms  39.9+/-0.2ms
 lg_batch_few_col    299+/-0.5ms     304+/-1ms      232+/-1ms     297+/-0.6ms
 lg_batch_many_col   157+/-0.1ms     158+/-0.7ms    129+/-0.4ms   155+/-0.4ms
 pure_ints           306+/-2ms       309+/-1ms      237+/-0.4ms   304+/-2ms
 pure_strings        313+/-0.6ms     319+/-1ms      249+/-1ms     313+/-0.7ms
=================== =============== ============== ============= ================
```
`ArrowTableUDFPeakmemBench`:
```text
=================== =============== ============== ============= ================
--                                              udtf
------------------- -------------------------------------------------------------
 scenario            identity_udtf   explode_udtf   filter_udtf   stringify_udtf
=================== =============== ============== ============= ================
 sm_batch_few_col    464M            464M           464M          464M
 sm_batch_many_col   464M            464M           465M          464M
 lg_batch_few_col    465M            465M           465M          465M
 lg_batch_many_col   468M            468M           468M          468M
 pure_ints           465M            465M           465M          465M
 pure_strings        465M            465M           465M          465M
=================== =============== ============== ============= ================
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #55673 from Yicong-Huang/SPARK-56717.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/benchmarks/bench_eval_type.py | 98 ++++++++++++++++++++++++++++++++++++
1 file changed, 98 insertions(+)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 5b648f8e3aaf..a4fda48dfcb1 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -109,6 +109,8 @@ class MockProtocolWriter:
json.dumps(
{
"isBarrier": False,
+ "connInfo": None,
+ "secret": None,
"stageId": 0,
"partitionId": 0,
"attemptNumber": 0,
@@ -608,6 +610,102 @@ class ArrowUDTFPeakmemBench(_ArrowUDTFBenchMixin, _PeakmemBenchBase):
pass
+# -- SQL_ARROW_TABLE_UDF ----------------------------------------------------
+# Python UDTF (``@udtf(useArrow=True)``): handler is a class with ``eval(self, *args)``
+# that yields output rows. Each input row triggers one ``eval`` call; yielded rows
+# are converted to Arrow via ``LocalDataToArrowConversion``.
+
+
+class _ArrowTableUDFIdentity:
+ def eval(self, x):
+ yield (x,)
+
+
+class _ArrowTableUDFExplode:
+ def eval(self, x):
+ for _ in range(3):
+ yield (x,)
+
+
+class _ArrowTableUDFFilter:
+ def eval(self, x):
+ if x is not None and (hash(x) & 1):
+ yield (x,)
+
+
+class _ArrowTableUDFStringify:
+ def eval(self, x):
+ yield (str(x),)
+
+
+class _ArrowTableUDFBenchMixin:
+ """Provides ``_write_scenario`` for SQL_ARROW_TABLE_UDF (Python UDTF, useArrow=True).
+
+ Writes the extra ``input_type`` (StructType JSON) into ``EvalConf`` that the
+ non-legacy path requires, and uses the UDTF wire protocol (no num_udfs/result_id).
+ """
+
+ # Per-input-row ``LocalDataToArrowConversion.convert`` call makes this path
+ # ~15-20x slower than SQL_ARROW_BATCHED_UDF, so row counts are scaled down
+ # accordingly to keep each measurement under ASV's per-sample budget.
+ _scenario_configs = {
+ "sm_batch_few_col": ("mixed", 2_000, 5, 500),
+ "sm_batch_many_col": ("mixed", 500, 50, 500),
+ "lg_batch_few_col": ("mixed", 5_000, 5, 2_500),
+ "lg_batch_many_col": ("mixed", 2_000, 50, 2_000),
+ "pure_ints": ("pure_ints", 5_000, 10, 2_500),
+ "pure_strings": ("pure_strings", 5_000, 10, 2_500),
+ }
+
+ @staticmethod
+ def _build_scenario(name):
+ np.random.seed(42)
+ type_key, num_rows, num_cols, batch_size = _ArrowTableUDFBenchMixin._scenario_configs[name]
+ pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
+ return MockDataFactory.make_batches(
+ num_rows=num_rows,
+ num_cols=num_cols,
+ spark_type_pool=pool,
+ batch_size=batch_size,
+ )
+
+ # Each entry: (handler_class, return_type_or_None, arg_offsets).
+ # ``None`` return_type means "use input column 0's type".
+ _udtfs = {
+ "identity_udtf": (_ArrowTableUDFIdentity, None, [0]),
+ "explode_udtf": (_ArrowTableUDFExplode, None, [0]),
+ "filter_udtf": (_ArrowTableUDFFilter, None, [0]),
+ "stringify_udtf": (_ArrowTableUDFStringify, StringType(), [0]),
+ }
+ params = [list(_scenario_configs), list(_udtfs)]
+ param_names = ["scenario", "udtf"]
+
+ def _write_scenario(self, scenario, udtf_name, buf):
+ batches, schema = self._build_scenario(scenario)
+ handler, ret_type, arg_offsets = self._udtfs[udtf_name]
+ if ret_type is None:
+ ret_type = schema.fields[0].dataType
+ return_type = StructType([StructField("c0", ret_type)])
+
+ MockProtocolWriter.write_worker_input(
+ PythonEvalType.SQL_ARROW_TABLE_UDF,
+ lambda b: MockProtocolWriter.write_arrow_udtf_payload(
+ handler, return_type, arg_offsets, b
+ ),
+ lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
+ buf,
+ eval_conf={"input_type": schema.json()},
+ )
+
+
+class ArrowTableUDFTimeBench(_ArrowTableUDFBenchMixin, _TimeBenchBase):
+ pass
+
+
+class ArrowTableUDFPeakmemBench(_ArrowTableUDFBenchMixin, _PeakmemBenchBase):
+ pass
+
+
# -- SQL_COGROUPED_MAP_ARROW_UDF ------------------------------------------------
# UDF receives two ``pa.Table`` (left, right) per co-group, returns ``pa.Table``.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]