This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 587dfa4af2ea [SPARK-55947][PYTHON][TEST] Add ASV micro-benchmarks for
SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF
587dfa4af2ea is described below
commit 587dfa4af2ea7d40d2582d1f41fa825f6f2407ad
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Mar 12 15:14:57 2026 +0800
[SPARK-55947][PYTHON][TEST] Add ASV micro-benchmarks for
SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF
### What changes were proposed in this pull request?
Add ASV micro-benchmarks for `SQL_GROUPED_MAP_ARROW_UDF` and
`SQL_GROUPED_MAP_ARROW_ITER_UDF` eval types.
### Why are the changes needed?
These benchmarks establish baseline performance measurements for grouped
map Arrow UDF eval types, enabling data-driven optimization of the PySpark
worker pipeline.
### Does this PR introduce _any_ user-facing change?
No. Benchmark-only change.
### How was this patch tested?
`COLUMNS=120 asv run --python=same --bench "GroupedMapArrow" --attribute
"repeat=(3,5,5.0)"`:
**GroupedMapArrowUDFTimeBench** (`SQL_GROUPED_MAP_ARROW_UDF`):
```
================ ============== ============ ============
-- udf
---------------- ----------------------------------------
scenario identity_udf sort_udf filter_udf
================ ============== ============ ============
few_groups_sm 13.3±0.06ms 37.5±0.1ms 20.0±0.1ms
few_groups_lg 80.4±0.8ms 302±0.8ms 110±0.9ms
many_groups_sm 449±10ms 691±20ms 613±10ms
many_groups_lg 226±5ms 694±9ms 330±7ms
wide_values 303±0.9ms 474±2ms 391±0.8ms
multi_key 93.0±0.6ms 211±1ms 121±1ms
================ ============== ============ ============
```
**GroupedMapArrowUDFPeakmemBench** (`SQL_GROUPED_MAP_ARROW_UDF`):
```
================ ============== ========== ============
-- udf
---------------- --------------------------------------
scenario identity_udf sort_udf filter_udf
================ ============== ========== ============
few_groups_sm 635M 639M 638M
few_groups_lg 698M 705M 704M
many_groups_sm 654M 656M 656M
many_groups_lg 753M 757M 758M
wide_values 755M 760M 760M
multi_key 662M 665M 665M
================ ============== ========== ============
```
**GroupedMapArrowIterUDFTimeBench** (`SQL_GROUPED_MAP_ARROW_ITER_UDF`):
```
================ ============== ========== ============
-- udf
---------------- --------------------------------------
scenario identity_udf sort_udf filter_udf
================ ============== ========== ============
few_groups_sm 13.1±0.3ms 37.6±3ms 19.6±0.2ms
few_groups_lg 79.6±0.4ms 303±5ms 111±2ms
many_groups_sm 440±9ms 673±20ms 584±10ms
many_groups_lg 231±7ms 689±40ms 323±0.7ms
wide_values 302±3ms 464±10ms 385±8ms
multi_key 92.0±0.2ms 211±4ms 118±0.8ms
================ ============== ========== ============
```
**GroupedMapArrowIterUDFPeakmemBench** (`SQL_GROUPED_MAP_ARROW_ITER_UDF`):
```
================ ============== ========== ============
-- udf
---------------- --------------------------------------
scenario identity_udf sort_udf filter_udf
================ ============== ========== ============
few_groups_sm 634M 638M 637M
few_groups_lg 698M 705M 704M
many_groups_sm 653M 656M 656M
many_groups_lg 753M 759M 758M
wide_values 755M 760M 760M
multi_key 662M 665M 664M
================ ============== ========== ============
```
Both eval types show comparable time and memory profiles across all
scenarios.
### Was this patch authored or co-authored using generative AI tooling?
Yes.
Closes #54743 from Yicong-Huang/SPARK-55947/bench/grouped-arrow-udf.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/benchmarks/bench_eval_type.py | 241 +++++++++++++++++++++++++++++++++++
1 file changed, 241 insertions(+)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 95be9498f2fb..3445022eec81 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -41,6 +41,7 @@ from pyspark.sql.types import (
BooleanType,
DoubleType,
IntegerType,
+ LongType,
StringType,
StructField,
StructType,
@@ -438,6 +439,246 @@ class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase):
param_names = ["scenario", "udf"]
+# ---------------------------------------------------------------------------
+# Grouped Arrow UDF helpers
+# ---------------------------------------------------------------------------
+
+
+def _write_grouped_arrow_data(
+ groups: list[tuple[pa.RecordBatch, ...]],
+ num_dfs: int,
+ buf: io.BufferedIOBase,
+) -> None:
+ """Write grouped Arrow data in the wire protocol expected by
_load_group_dataframes."""
+ for group in groups:
+ write_int(num_dfs, buf)
+ for df_batch in group:
+ _write_arrow_ipc_batches(iter([df_batch]), buf)
+ write_int(0, buf) # end of groups
+
+
+def _make_grouped_arg_offsets(num_key_cols: int, num_value_cols: int) -> list[int]:
+ """Build arg_offsets for grouped map UDFs.
+
+ Format: [total_len, num_keys, key_idx..., value_idx...]
+ All columns are in a single struct, so key indexes come first,
+ then value indexes.
+ """
+ total = num_key_cols + num_value_cols + 1 # +1 for the num_keys prefix
+ key_idxs = list(range(num_key_cols))
+ value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols))
+ return [total, num_key_cols] + key_idxs + value_idxs
+
+
+def _make_grouped_batches(
+ num_groups: int,
+ rows_per_group: int,
+ num_key_cols: int,
+ num_value_cols: int,
+) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]:
+ """Create grouped data: each group is a single batch wrapped in a struct
column.
+
+ Returns (groups, return_type) where each group is a 1-tuple of a
+ struct-wrapped RecordBatch suitable for ArrowStreamGroupUDFSerializer.
+ """
+ n_cols = num_key_cols + num_value_cols
+ type_cycle = [
+        (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), LongType()),
+ (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()),
+        (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()),
+ (lambda r: pa.array(np.random.rand(r)), DoubleType()),
+ ]
+
+ groups = []
+ for g in range(num_groups):
+        arrays = [type_cycle[i % len(type_cycle)][0](rows_per_group) for i in range(n_cols)]
+ names = [f"col_{i}" for i in range(n_cols)]
+ batch = pa.RecordBatch.from_arrays(arrays, names=names)
+        # Wrap in struct to match JVM-side encoding (flatten_struct undoes this)
+ struct_array = pa.StructArray.from_arrays(batch.columns, names=names)
+ wrapped = pa.RecordBatch.from_arrays([struct_array], names=["_0"])
+ groups.append((wrapped,))
+
+ value_fields = [
+ StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1])
+ for i in range(num_key_cols, n_cols)
+ ]
+ return_type = StructType(value_fields)
+
+ return groups, return_type
+
+
+# -- SQL_GROUPED_MAP_ARROW_UDF ------------------------------------------------
+# UDF receives (key: pa.RecordBatch, values: Iterator[pa.RecordBatch]),
+# returns Iterator[pa.RecordBatch].
+
+
+def _grouped_map_arrow_identity(table):
+ """Identity grouped map UDF: takes a pa.Table, returns it as-is."""
+ return table
+
+
+def _grouped_map_arrow_sort(table):
+ """Sort by first column."""
+ return table.sort_by([(table.column_names[0], "ascending")])
+
+
+def _grouped_map_arrow_filter(table):
+ """Filter rows where first column is valid."""
+ import pyarrow.compute as pc
+
+ return table.filter(pc.is_valid(table.column(0)))
+
+
+_GROUPED_MAP_ARROW_UDFS = {
+ "identity_udf": _grouped_map_arrow_identity,
+ "sort_udf": _grouped_map_arrow_sort,
+ "filter_udf": _grouped_map_arrow_filter,
+}
+
+
+def _build_grouped_map_arrow_scenarios():
+ """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF.
+
+ Returns dict mapping name to (groups, return_type, arg_offsets).
+ """
+ scenarios = {}
+
+ for name, (num_groups, rows_per_group, num_key_cols, num_value_cols) in {
+ "few_groups_sm": (50, 5_000, 1, 4),
+ "few_groups_lg": (50, 50_000, 1, 4),
+ "many_groups_sm": (2_000, 500, 1, 4),
+ "many_groups_lg": (500, 10_000, 1, 4),
+ "wide_values": (200, 5_000, 1, 20),
+ "multi_key": (200, 5_000, 3, 5),
+ }.items():
+ groups, return_type = _make_grouped_batches(
+ num_groups, rows_per_group, num_key_cols, num_value_cols
+ )
+ arg_offsets = _make_grouped_arg_offsets(num_key_cols, num_value_cols)
+ scenarios[name] = (groups, return_type, arg_offsets)
+
+ return scenarios
+
+
+_GROUPED_MAP_ARROW_SCENARIOS = _build_grouped_map_arrow_scenarios()
+
+
+class _GroupedMapArrowBenchMixin:
+ """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_UDF."""
+
+ def _write_scenario(self, scenario, udf_name, buf):
+ groups, return_type, arg_offsets = self._scenarios[scenario]
+ udf_func = self._udfs[udf_name]
+
+ def write_command(b):
+ # Grouped map uses a single UDF with grouped arg_offsets
+ write_int(1, b) # num_udfs
+ write_int(len(arg_offsets), b) # num_arg
+ for offset in arg_offsets:
+ write_int(offset, b)
+ _write_bool(False, b) # is_kwarg
+ write_int(1, b) # num_chained
+ command = cloudpickle_dumps((udf_func, return_type))
+ write_int(len(command), b)
+ b.write(command)
+ write_long(0, b) # result_id
+
+ _write_worker_input(
+ PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
+ write_command,
+ lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b),
+ buf,
+ )
+
+
+class GroupedMapArrowUDFTimeBench(_GroupedMapArrowBenchMixin, _TimeBenchBase):
+ _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+ _udfs = _GROUPED_MAP_ARROW_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
+class GroupedMapArrowUDFPeakmemBench(_GroupedMapArrowBenchMixin, _PeakmemBenchBase):
+ _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+ _udfs = _GROUPED_MAP_ARROW_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
+# -- SQL_GROUPED_MAP_ARROW_ITER_UDF ------------------------------------------
+# UDF receives Iterator[pa.RecordBatch] per group,
+# returns Iterator[pa.RecordBatch].
+# Uses the same wire format and serializer as SQL_GROUPED_MAP_ARROW_UDF.
+
+
+def _grouped_map_arrow_iter_identity(batches):
+ """Identity grouped map iter UDF: yields each batch as-is."""
+ yield from batches
+
+
+def _grouped_map_arrow_iter_sort(batches):
+ """Sort each batch by first column."""
+ for batch in batches:
+ yield batch.sort_by([(batch.column_names[0], "ascending")])
+
+
+def _grouped_map_arrow_iter_filter(batches):
+ """Filter rows where first column is valid."""
+ import pyarrow.compute as pc
+
+ for batch in batches:
+ yield batch.filter(pc.is_valid(batch.column(0)))
+
+
+_GROUPED_MAP_ARROW_ITER_UDFS = {
+ "identity_udf": _grouped_map_arrow_iter_identity,
+ "sort_udf": _grouped_map_arrow_iter_sort,
+ "filter_udf": _grouped_map_arrow_iter_filter,
+}
+
+
+class _GroupedMapArrowIterBenchMixin:
+ """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF."""
+
+ def _write_scenario(self, scenario, udf_name, buf):
+ groups, return_type, arg_offsets = self._scenarios[scenario]
+ udf_func = self._udfs[udf_name]
+
+ def write_command(b):
+ write_int(1, b) # num_udfs
+ write_int(len(arg_offsets), b) # num_arg
+ for offset in arg_offsets:
+ write_int(offset, b)
+ _write_bool(False, b) # is_kwarg
+ write_int(1, b) # num_chained
+ command = cloudpickle_dumps((udf_func, return_type))
+ write_int(len(command), b)
+ b.write(command)
+ write_long(0, b) # result_id
+
+ _write_worker_input(
+ PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF,
+ write_command,
+ lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b),
+ buf,
+ )
+
+
+class GroupedMapArrowIterUDFTimeBench(_GroupedMapArrowIterBenchMixin, _TimeBenchBase):
+ _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+ _udfs = _GROUPED_MAP_ARROW_ITER_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
+class GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _PeakmemBenchBase):
+ _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+ _udfs = _GROUPED_MAP_ARROW_ITER_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
# -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------
# UDF receives ``Iterator[pa.RecordBatch]``, returns ``Iterator[pa.RecordBatch]``.
# Used by ``mapInArrow``.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]