This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a5c0df21a5f [SPARK-55726][PYTHON][TEST][FOLLOW-UP] Make 
SQL_GROUPED_MAP_PANDAS_UDF benchmark to two bench classes
2a5c0df21a5f is described below

commit 2a5c0df21a5f5096d5f7536ec3908b07490f7c22
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Mar 12 17:05:11 2026 +0800

    [SPARK-55726][PYTHON][TEST][FOLLOW-UP] Make SQL_GROUPED_MAP_PANDAS_UDF 
benchmark to two bench classes
    
    ### What changes were proposed in this pull request?
    
    Follow-up to #54533 (SPARK-55726). The `SQL_GROUPED_MAP_PANDAS_UDF` 
benchmark was accidentally dropped when #54555 (SPARK-55754) rewrote 
`bench_eval_type.py`.
    
    This PR restores it using the two-class pattern (`TimeBench` + 
`PeakmemBench`) consistent with the other eval type benchmarks.
    
    ### Why are the changes needed?
    
    The benchmark was merged in #54533 but lost when #54555 rewrote the file.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Benchmark files only.
    
    ### How was this patch tested?
    
    `COLUMNS=120 asv run --python=same --bench "GroupedMapPandas" --attribute 
"repeat=(3,5,5.0)"`:
    
    **GroupedMapPandasUDFTimeBench**:
    ```
    ================= ============== =========== ==================
    --                                     udf
    ----------------- ---------------------------------------------
          scenario      identity_udf    sort_udf   key_identity_udf
    ================= ============== =========== ==================
      sm_grp_few_col     427+-1ms     465+-0.8ms       392+-1ms
     sm_grp_many_col     353+-20ms     342+-2ms        329+-1ms
      lg_grp_few_col     260+-9ms      308+-2ms        221+-1ms
     lg_grp_many_col     396+-20ms     479+-30ms       417+-20ms
        mixed_types     453+-0.8ms     488+-5ms        405+-4ms
    ================= ============== =========== ==================
    ```
    
    **GroupedMapPandasUDFPeakmemBench**:
    ```
    ================= ============== ========== ==================
    --                                    udf
    ----------------- --------------------------------------------
          scenario      identity_udf   sort_udf   key_identity_udf
    ================= ============== ========== ==================
      sm_grp_few_col       157M         158M           156M
     sm_grp_many_col       164M         165M           164M
      lg_grp_few_col       279M         284M           264M
     lg_grp_many_col       596M         639M           591M
        mixed_types        149M         150M           149M
    ================= ============== ========== ==================
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54716 from 
Yicong-Huang/SPARK-55726/bench/grouped-map-pandas-followup.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 136 ++++++++++++++++++++++++++++++++++-
 1 file changed, 135 insertions(+), 1 deletion(-)

diff --git a/python/benchmarks/bench_eval_type.py 
b/python/benchmarks/bench_eval_type.py
index 3445022eec81..9569eeacf23f 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -179,6 +179,50 @@ def _make_typed_batch(rows: int, n_cols: int) -> 
tuple[pa.RecordBatch, IntegerTy
     )
 
 
+def _make_grouped_batch(rows_per_group, n_cols):
+    """``group_key (int64)`` + ``(n_cols - 1)`` float32 value columns."""
+    arrays = [pa.array(np.zeros(rows_per_group, dtype=np.int64))] + [
+        pa.array(np.random.rand(rows_per_group).astype(np.float32)) for _ in 
range(n_cols - 1)
+    ]
+    fields = [StructField("group_key", IntegerType())] + [
+        StructField(f"val_{i}", DoubleType()) for i in range(n_cols - 1)
+    ]
+    return (
+        pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]),
+        StructType(fields),
+    )
+
+
+def _make_mixed_batch(rows_per_group):
+    """``id``, ``str_col``, ``float_col``, ``double_col``, ``long_col``."""
+    arrays = [
+        pa.array(np.zeros(rows_per_group, dtype=np.int64)),
+        pa.array([f"s{j}" for j in range(rows_per_group)]),
+        pa.array(np.random.rand(rows_per_group).astype(np.float32)),
+        pa.array(np.random.rand(rows_per_group)),
+        pa.array(np.zeros(rows_per_group, dtype=np.int64)),
+    ]
+    fields = [
+        StructField("id", IntegerType()),
+        StructField("str_col", StringType()),
+        StructField("float_col", DoubleType()),
+        StructField("double_col", DoubleType()),
+        StructField("long_col", IntegerType()),
+    ]
+    return (
+        pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]),
+        StructType(fields),
+    )
+
+
+def _build_grouped_arg_offsets(n_cols, n_keys=0):
+    """``[len, num_keys, key_col_0, ..., val_col_0, ...]``"""
+    keys = list(range(n_keys))
+    vals = list(range(n_keys, n_cols))
+    offsets = [n_keys] + keys + vals
+    return [len(offsets)] + offsets
+
+
 # ---------------------------------------------------------------------------
 # General benchmark base classes
 # ---------------------------------------------------------------------------
@@ -448,12 +492,20 @@ def _write_grouped_arrow_data(
     groups: list[tuple[pa.RecordBatch, ...]],
     num_dfs: int,
     buf: io.BufferedIOBase,
+    max_records_per_batch: int | None = None,
 ) -> None:
     """Write grouped Arrow data in the wire protocol expected by 
_load_group_dataframes."""
     for group in groups:
         write_int(num_dfs, buf)
         for df_batch in group:
-            _write_arrow_ipc_batches(iter([df_batch]), buf)
+            if max_records_per_batch and df_batch.num_rows > 
max_records_per_batch:
+                sub_batches = [
+                    df_batch.slice(offset, max_records_per_batch)
+                    for offset in range(0, df_batch.num_rows, 
max_records_per_batch)
+                ]
+                _write_arrow_ipc_batches(iter(sub_batches), buf)
+            else:
+                _write_arrow_ipc_batches(iter([df_batch]), buf)
     write_int(0, buf)  # end of groups
 
 
@@ -679,6 +731,88 @@ class 
GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _Peakme
     param_names = ["scenario", "udf"]
 
 
+# -- SQL_GROUPED_MAP_PANDAS_UDF 
------------------------------------------------
+# UDF receives a ``pandas.DataFrame`` per group, returns a 
``pandas.DataFrame``.
+# Groups are sent as separate Arrow IPC streams with optional sub-batching
+# (``spark.sql.execution.arrow.maxRecordsPerBatch``).
+
+_MAX_RECORDS_PER_BATCH = 10_000
+
+
+def _build_grouped_map_pandas_scenarios():
+    scenarios = {}
+
+    for name, (rows, n_cols, num_groups, max_rpb) in {
+        "sm_grp_few_col": (1_000, 5, 200, None),
+        "sm_grp_many_col": (1_000, 50, 30, None),
+        "lg_grp_few_col": (100_000, 5, 30, _MAX_RECORDS_PER_BATCH),
+        "lg_grp_many_col": (100_000, 50, 5, _MAX_RECORDS_PER_BATCH),
+    }.items():
+        batch, schema = _make_grouped_batch(rows, n_cols)
+        scenarios[name] = (batch, num_groups, schema, max_rpb)
+
+    # mixed column types, small groups
+    batch, schema = _make_mixed_batch(3)
+    scenarios["mixed_types"] = (batch, 200, schema, None)
+
+    return scenarios
+
+
+_GROUPED_MAP_PANDAS_SCENARIOS = _build_grouped_map_pandas_scenarios()
+
+# Each UDF entry: (func, ret_type, n_args).
+# ret_type=None means "use the input schema" (excluding key columns for 
n_args=2).
+# n_args=1 -> func(pdf), n_args=2 -> func(key, pdf).
+_GROUPED_MAP_PANDAS_UDFS = {
+    "identity_udf": (lambda df: df, None, 1),
+    "sort_udf": (lambda df: df.sort_values(df.columns[0]), None, 1),
+    "key_identity_udf": (lambda key, df: df, None, 2),
+}
+
+
+class _GroupedMapPandasBenchMixin:
+    """Provides ``_write_scenario`` for SQL_GROUPED_MAP_PANDAS_UDF.
+
+    Each scenario stores ``(batch, num_groups, schema, max_rpb)``.
+    Groups are written as separate Arrow IPC streams.
+    """
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        batch, num_groups, schema, max_rpb = self._scenarios[scenario]
+        udf_func, ret_type, n_args = self._udfs[udf_name]
+        if ret_type is None:
+            # 2-arg UDFs receive (key, pdf) where pdf excludes key columns,
+            # so the return schema must also exclude them.
+            ret_type = StructType(schema.fields[n_args - 1 :]) if n_args > 1 
else schema
+        n_cols = len(schema.fields)
+        arg_offsets = _build_grouped_arg_offsets(n_cols, n_keys=n_args - 1)
+        _write_worker_input(
+            PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+            lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b),
+            lambda b: _write_grouped_arrow_data(
+                [(batch,)] * num_groups,
+                num_dfs=1,
+                buf=b,
+                max_records_per_batch=max_rpb,
+            ),
+            buf,
+        )
+
+
+class GroupedMapPandasUDFTimeBench(_GroupedMapPandasBenchMixin, 
_TimeBenchBase):
+    _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS
+    _udfs = _GROUPED_MAP_PANDAS_UDFS
+    params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), 
list(_GROUPED_MAP_PANDAS_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
+class GroupedMapPandasUDFPeakmemBench(_GroupedMapPandasBenchMixin, 
_PeakmemBenchBase):
+    _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS
+    _udfs = _GROUPED_MAP_PANDAS_UDFS
+    params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), 
list(_GROUPED_MAP_PANDAS_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
 # -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------
 # UDF receives ``Iterator[pa.RecordBatch]``, returns 
``Iterator[pa.RecordBatch]``.
 # Used by ``mapInArrow``.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to