This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 66949cc255f9 [SPARK-55724][PYTHON][TEST] Add ASV microbenchmark for SQL_MAP_ARROW_ITER_UDF
66949cc255f9 is described below
commit 66949cc255f9c574ce82318b806e055831bd777a
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Mar 10 21:11:02 2026 +0800
[SPARK-55724][PYTHON][TEST] Add ASV microbenchmark for SQL_MAP_ARROW_ITER_UDF
### What changes were proposed in this pull request?
Add ASV microbenchmarks for `SQL_MAP_ARROW_ITER_UDF` (used by `mapInArrow`).
### Why are the changes needed?
Part of SPARK-55724.
### Does this PR introduce _any_ user-facing change?
No. Benchmark files only.
### How was this patch tested?
`COLUMNS=120 asv run --python=same --bench "MapArrowIter" --attribute "repeat=(3,5,5.0)"`:
**MapArrowIterUDFTimeBench** (`SQL_MAP_ARROW_ITER_UDF`):
```
=================== ============== =========== ============
-- udf
------------------- ---------------------------------------
scenario identity_udf sort_udf filter_udf
=================== ============== =========== ============
sm_batch_few_col 45.4+-5ms 119+-6ms 78.7+-5ms
sm_batch_many_col 29.0+-3ms 56.1+-4ms 49.8+-4ms
lg_batch_few_col 178+-0.3ms 362+-0.9ms 311+-1ms
lg_batch_many_col 173+-0.2ms 347+-2ms 295+-2ms
pure_ints 90.6+-0.3ms 128+-0.5ms 114+-0.9ms
pure_floats 89.4+-0.3ms 206+-0.8ms 115+-3ms
pure_strings 96.5+-0.5ms 258+-0.4ms 186+-0.3ms
pure_ts 89.4+-0.1ms 132+-0.4ms 113+-1ms
mixed_types 81.2+-0.7ms 157+-0.8ms 148+-5ms
=================== ============== =========== ============
```
**MapArrowIterUDFPeakmemBench** (`SQL_MAP_ARROW_ITER_UDF`):
```
=================== ============== ========== ============
-- udf
------------------- --------------------------------------
scenario identity_udf sort_udf filter_udf
=================== ============== ========== ============
sm_batch_few_col 142M 145M 144M
sm_batch_many_col 142M 146M 146M
lg_batch_few_col 296M 299M 298M
lg_batch_many_col 298M 310M 309M
pure_ints 207M 209M 209M
pure_floats 207M 209M 209M
pure_strings 215M 219M 219M
pure_ts 207M 209M 209M
mixed_types 195M 197M 197M
=================== ============== ========== ============
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54715 from Yicong-Huang/SPARK-55724/bench/map-arrow-iter-udf.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/benchmarks/bench_eval_type.py | 123 +++++++++++++++++++++++++++++++++++
1 file changed, 123 insertions(+)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 920b64a53e2a..95be9498f2fb 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -438,6 +438,129 @@ class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase):
param_names = ["scenario", "udf"]
+# -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------
+# UDF receives ``Iterator[pa.RecordBatch]``, returns ``Iterator[pa.RecordBatch]``.
+# Used by ``mapInArrow``.
+
+
+def _identity_batch_iter(it):
+ yield from it
+
+
+def _sort_batch_iter(it):
+ import pyarrow.compute as pc
+
+ for batch in it:
+ indices = pc.sort_indices(batch.column(0))
+ yield batch.take(indices)
+
+
+def _filter_batch_iter(it):
+ import pyarrow.compute as pc
+
+ for batch in it:
+ mask = pc.is_valid(batch.column(0))
+ yield batch.filter(mask)
+
+
+_MAP_ARROW_ITER_UDFS = {
+ "identity_udf": (_identity_batch_iter, None, [0]),
+ "sort_udf": (_sort_batch_iter, None, [0]),
+ "filter_udf": (_filter_batch_iter, None, [0]),
+}
+
+
+def _build_map_arrow_iter_scenarios():
+ """Build scenarios for SQL_MAP_ARROW_ITER_UDF.
+
+ Same data shapes as non-grouped scenarios but with reduced batch counts
+ to account for the struct wrap/unwrap overhead per batch.
+ """
+ scenarios = {}
+
+ for name, (rows, n_cols, num_batches) in {
+ "sm_batch_few_col": (1_000, 5, 500),
+ "sm_batch_many_col": (1_000, 50, 50),
+ "lg_batch_few_col": (10_000, 5, 500),
+ "lg_batch_many_col": (10_000, 50, 50),
+ }.items():
+ batch, col0_type = _make_typed_batch(rows, n_cols)
+ scenarios[name] = (batch, num_batches, col0_type)
+
+ _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 200
+
+ for scenario_name, make_array, spark_type in [
+ (
+ "pure_ints",
+ lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)),
+ IntegerType(),
+ ),
+ ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()),
+        ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()),
+ (
+ "pure_ts",
+ lambda r: pa.array(
+                np.arange(0, r, dtype="datetime64[us]"), type=pa.timestamp("us", tz=None)
+ ),
+ TimestampNTZType(),
+ ),
+ ]:
+        batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type)
+ scenarios[scenario_name] = (batch, _PURE_BATCHES, spark_type)
+
+ scenarios["mixed_types"] = (
+ _make_typed_batch(_PURE_ROWS, _PURE_COLS)[0],
+ _PURE_BATCHES,
+ IntegerType(),
+ )
+
+ return scenarios
+
+
+_MAP_ARROW_ITER_SCENARIOS = _build_map_arrow_iter_scenarios()
+
+
+def _wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
+    """Wrap all columns into a single struct column, mimicking JVM-side encoding."""
+    struct_array = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names)
+ return pa.RecordBatch.from_arrays([struct_array], names=["_0"])
+
+
+class _MapArrowIterBenchMixin:
+ """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF.
+
+ Like ``_NonGroupedBenchMixin`` but wraps input batches in a struct column
+ to match the JVM-side wire format (``flatten_struct`` undoes this).
+ """
+
+ def _write_scenario(self, scenario, udf_name, buf):
+ batch, num_batches, col0_type = self._scenarios[scenario]
+ udf_func, ret_type, arg_offsets = self._udfs[udf_name]
+ if ret_type is None:
+ ret_type = col0_type
+ wrapped = _wrap_batch_in_struct(batch)
+ _write_worker_input(
+ PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+ lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b),
+            lambda b: _write_arrow_ipc_batches((wrapped for _ in range(num_batches)), b),
+ buf,
+ )
+
+
+class MapArrowIterUDFTimeBench(_MapArrowIterBenchMixin, _TimeBenchBase):
+ _scenarios = _MAP_ARROW_ITER_SCENARIOS
+ _udfs = _MAP_ARROW_ITER_UDFS
+ params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
+class MapArrowIterUDFPeakmemBench(_MapArrowIterBenchMixin, _PeakmemBenchBase):
+ _scenarios = _MAP_ARROW_ITER_SCENARIOS
+ _udfs = _MAP_ARROW_ITER_UDFS
+ params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
# -- SQL_SCALAR_ARROW_UDF ---------------------------------------------------
# UDF receives individual ``pa.Array`` columns, returns a ``pa.Array``.
# All UDFs operate on arg_offsets=[0] so they work with any column type.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]