This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2a5c0df21a5f [SPARK-55726][PYTHON][TEST][FOLLOW-UP] Make
SQL_GROUPED_MAP_PANDAS_UDF benchmark into two bench classes
2a5c0df21a5f is described below
commit 2a5c0df21a5f5096d5f7536ec3908b07490f7c22
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Mar 12 17:05:11 2026 +0800
[SPARK-55726][PYTHON][TEST][FOLLOW-UP] Make SQL_GROUPED_MAP_PANDAS_UDF
benchmark into two bench classes
### What changes were proposed in this pull request?
Follow-up to #54533 (SPARK-55726). The `SQL_GROUPED_MAP_PANDAS_UDF`
benchmark was accidentally dropped when #54555 (SPARK-55754) rewrote
`bench_eval_type.py`.
This PR restores it using the two-class pattern (`TimeBench` +
`PeakmemBench`) consistent with the other eval type benchmarks.
### Why are the changes needed?
The benchmark was merged in #54533 but lost when #54555 rewrote the file.
### Does this PR introduce _any_ user-facing change?
No. Benchmark files only.
### How was this patch tested?
`COLUMNS=120 asv run --python=same --bench "GroupedMapPandas" --attribute
"repeat=(3,5,5.0)"`:
**GroupedMapPandasUDFTimeBench**:
```
================= ============== =========== ==================
-- udf
----------------- ---------------------------------------------
scenario identity_udf sort_udf key_identity_udf
================= ============== =========== ==================
sm_grp_few_col      427±1ms       465±0.8ms   392±1ms
sm_grp_many_col     353±20ms      342±2ms     329±1ms
lg_grp_few_col      260±9ms       308±2ms     221±1ms
lg_grp_many_col     396±20ms      479±30ms    417±20ms
mixed_types         453±0.8ms     488±5ms     405±4ms
================= ============== =========== ==================
```
**GroupedMapPandasUDFPeakmemBench**:
```
================= ============== ========== ==================
-- udf
----------------- --------------------------------------------
scenario identity_udf sort_udf key_identity_udf
================= ============== ========== ==================
sm_grp_few_col 157M 158M 156M
sm_grp_many_col 164M 165M 164M
lg_grp_few_col 279M 284M 264M
lg_grp_many_col 596M 639M 591M
mixed_types 149M 150M 149M
================= ============== ========== ==================
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54716 from
Yicong-Huang/SPARK-55726/bench/grouped-map-pandas-followup.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/benchmarks/bench_eval_type.py | 136 ++++++++++++++++++++++++++++++++++-
1 file changed, 135 insertions(+), 1 deletion(-)
diff --git a/python/benchmarks/bench_eval_type.py
b/python/benchmarks/bench_eval_type.py
index 3445022eec81..9569eeacf23f 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -179,6 +179,50 @@ def _make_typed_batch(rows: int, n_cols: int) ->
tuple[pa.RecordBatch, IntegerTy
)
def _make_grouped_batch(rows_per_group, n_cols):
    """``group_key (int64)`` + ``(n_cols - 1)`` float32 value columns.

    Returns a ``(pa.RecordBatch, StructType)`` pair: the Arrow batch holds
    an all-zero key column plus random float32 value columns; the Spark
    schema declares the matching field names.
    """
    n_vals = n_cols - 1
    key_column = pa.array(np.zeros(rows_per_group, dtype=np.int64))
    value_columns = [
        pa.array(np.random.rand(rows_per_group).astype(np.float32))
        for _ in range(n_vals)
    ]
    schema = StructType(
        [StructField("group_key", IntegerType())]
        + [StructField(f"val_{i}", DoubleType()) for i in range(n_vals)]
    )
    batch = pa.RecordBatch.from_arrays(
        [key_column] + value_columns, names=[f.name for f in schema.fields]
    )
    return batch, schema
+
+
def _make_mixed_batch(rows_per_group):
    """``id``, ``str_col``, ``float_col``, ``double_col``, ``long_col``.

    Builds one group's worth of rows mixing integer, string, float32 and
    float64 columns, returned as ``(pa.RecordBatch, StructType)``.
    """
    columns = [
        pa.array(np.zeros(rows_per_group, dtype=np.int64)),
        pa.array([f"s{j}" for j in range(rows_per_group)]),
        pa.array(np.random.rand(rows_per_group).astype(np.float32)),
        pa.array(np.random.rand(rows_per_group)),
        pa.array(np.zeros(rows_per_group, dtype=np.int64)),
    ]
    schema = StructType(
        [
            StructField("id", IntegerType()),
            StructField("str_col", StringType()),
            StructField("float_col", DoubleType()),
            StructField("double_col", DoubleType()),
            StructField("long_col", IntegerType()),
        ]
    )
    batch = pa.RecordBatch.from_arrays(columns, names=[f.name for f in schema.fields])
    return batch, schema
+
+
+def _build_grouped_arg_offsets(n_cols, n_keys=0):
+ """``[len, num_keys, key_col_0, ..., val_col_0, ...]``"""
+ keys = list(range(n_keys))
+ vals = list(range(n_keys, n_cols))
+ offsets = [n_keys] + keys + vals
+ return [len(offsets)] + offsets
+
+
# ---------------------------------------------------------------------------
# General benchmark base classes
# ---------------------------------------------------------------------------
def _write_grouped_arrow_data(
    groups: list[tuple[pa.RecordBatch, ...]],
    num_dfs: int,
    buf: io.BufferedIOBase,
    max_records_per_batch: int | None = None,
) -> None:
    """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes.

    For each group: the dataframe count, then one Arrow IPC stream per
    dataframe. A batch larger than ``max_records_per_batch`` is sliced
    into sub-batches of at most that many rows (mirroring
    ``spark.sql.execution.arrow.maxRecordsPerBatch``). A trailing 0 marks
    the end of all groups.
    """
    for group in groups:
        write_int(num_dfs, buf)
        for df_batch in group:
            total_rows = df_batch.num_rows
            if max_records_per_batch and total_rows > max_records_per_batch:
                chunks = [
                    df_batch.slice(start, max_records_per_batch)
                    for start in range(0, total_rows, max_records_per_batch)
                ]
            else:
                chunks = [df_batch]
            _write_arrow_ipc_batches(iter(chunks), buf)
    write_int(0, buf)  # end-of-groups sentinel
@@ -679,6 +731,88 @@ class
GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _Peakme
param_names = ["scenario", "udf"]
+# -- SQL_GROUPED_MAP_PANDAS_UDF
------------------------------------------------
+# UDF receives a ``pandas.DataFrame`` per group, returns a
``pandas.DataFrame``.
+# Groups are sent as separate Arrow IPC streams with optional sub-batching
+# (``spark.sql.execution.arrow.maxRecordsPerBatch``).
+
+_MAX_RECORDS_PER_BATCH = 10_000
+
+
def _build_grouped_map_pandas_scenarios():
    """Build the scenario table: name -> (batch, num_groups, schema, max_rpb)."""
    # (rows_per_group, n_cols, num_groups, max_records_per_batch)
    grouped_specs = {
        "sm_grp_few_col": (1_000, 5, 200, None),
        "sm_grp_many_col": (1_000, 50, 30, None),
        "lg_grp_few_col": (100_000, 5, 30, _MAX_RECORDS_PER_BATCH),
        "lg_grp_many_col": (100_000, 50, 5, _MAX_RECORDS_PER_BATCH),
    }
    scenarios = {}
    for name, (rows, n_cols, num_groups, max_rpb) in grouped_specs.items():
        batch, schema = _make_grouped_batch(rows, n_cols)
        scenarios[name] = (batch, num_groups, schema, max_rpb)

    # Many tiny groups mixing string / float / double / long columns.
    mixed_batch, mixed_schema = _make_mixed_batch(3)
    scenarios["mixed_types"] = (mixed_batch, 200, mixed_schema, None)

    return scenarios
+
+
# Built once at import time so the Time and Peakmem bench classes share
# identical input data.
_GROUPED_MAP_PANDAS_SCENARIOS = _build_grouped_map_pandas_scenarios()
+
+# Each UDF entry: (func, ret_type, n_args).
+# ret_type=None means "use the input schema" (excluding key columns for
n_args=2).
+# n_args=1 -> func(pdf), n_args=2 -> func(key, pdf).
+_GROUPED_MAP_PANDAS_UDFS = {
+ "identity_udf": (lambda df: df, None, 1),
+ "sort_udf": (lambda df: df.sort_values(df.columns[0]), None, 1),
+ "key_identity_udf": (lambda key, df: df, None, 2),
+}
+
+
class _GroupedMapPandasBenchMixin:
    """Shared ``_write_scenario`` logic for SQL_GROUPED_MAP_PANDAS_UDF benches.

    Each scenario stores ``(batch, num_groups, schema, max_rpb)``; every
    group is written as its own Arrow IPC stream.
    """

    def _write_scenario(self, scenario, udf_name, buf):
        batch, num_groups, schema, max_rpb = self._scenarios[scenario]
        func, return_type, n_args = self._udfs[udf_name]
        num_keys = n_args - 1
        if return_type is None:
            # A (key, pdf) UDF sees a pdf without the key columns, so the
            # declared return schema must drop them as well.
            return_type = StructType(schema.fields[num_keys:]) if num_keys else schema
        offsets = _build_grouped_arg_offsets(len(schema.fields), n_keys=num_keys)
        _write_worker_input(
            PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
            lambda b: _build_udf_payload(func, return_type, offsets, b),
            lambda b: _write_grouped_arrow_data(
                [(batch,)] * num_groups,
                num_dfs=1,
                buf=b,
                max_records_per_batch=max_rpb,
            ),
            buf,
        )
+
+
# Wall-clock timing over every (scenario, udf) pair.
class GroupedMapPandasUDFTimeBench(_GroupedMapPandasBenchMixin, _TimeBenchBase):
    _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS
    _udfs = _GROUPED_MAP_PANDAS_UDFS
    params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), list(_GROUPED_MAP_PANDAS_UDFS)]
    param_names = ["scenario", "udf"]
+
+
# Peak-memory measurement over every (scenario, udf) pair.
class GroupedMapPandasUDFPeakmemBench(_GroupedMapPandasBenchMixin, _PeakmemBenchBase):
    _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS
    _udfs = _GROUPED_MAP_PANDAS_UDFS
    params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), list(_GROUPED_MAP_PANDAS_UDFS)]
    param_names = ["scenario", "udf"]
+
+
# -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------
# UDF receives ``Iterator[pa.RecordBatch]``, returns
``Iterator[pa.RecordBatch]``.
# Used by ``mapInArrow``.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]