This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 587dfa4af2ea [SPARK-55947][PYTHON][TEST] Add ASV micro-benchmarks for SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF
587dfa4af2ea is described below

commit 587dfa4af2ea7d40d2582d1f41fa825f6f2407ad
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Mar 12 15:14:57 2026 +0800

    [SPARK-55947][PYTHON][TEST] Add ASV micro-benchmarks for SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF
    
    ### What changes were proposed in this pull request?
    
    Add ASV micro-benchmarks for `SQL_GROUPED_MAP_ARROW_UDF` and `SQL_GROUPED_MAP_ARROW_ITER_UDF` eval types.
    
    ### Why are the changes needed?
    
    These benchmarks establish baseline performance measurements for grouped map Arrow UDF eval types, enabling data-driven optimization of the PySpark worker pipeline.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Benchmark-only change.
    
    ### How was this patch tested?
    
    `COLUMNS=120 asv run --python=same --bench "GroupedMapArrow" --attribute "repeat=(3,5,5.0)"`:
    
    **GroupedMapArrowUDFTimeBench** (`SQL_GROUPED_MAP_ARROW_UDF`):
    ```
    ================ ============== ============ ============
    --                                 udf
    ---------------- ----------------------------------------
        scenario      identity_udf    sort_udf    filter_udf
    ================ ============== ============ ============
     few_groups_sm    13.3±0.06ms    37.5±0.1ms   20.0±0.1ms
     few_groups_lg     80.4±0.8ms    302±0.8ms    110±0.9ms
     many_groups_sm     449±10ms      691±20ms     613±10ms
     many_groups_lg     226±5ms       694±9ms      330±7ms
      wide_values      303±0.9ms      474±2ms     391±0.8ms
       multi_key       93.0±0.6ms     211±1ms      121±1ms
    ================ ============== ============ ============
    ```
    
    **GroupedMapArrowUDFPeakmemBench** (`SQL_GROUPED_MAP_ARROW_UDF`):
    ```
    ================ ============== ========== ============
    --                                udf
    ---------------- --------------------------------------
        scenario      identity_udf   sort_udf   filter_udf
    ================ ============== ========== ============
     few_groups_sm        635M         639M        638M
     few_groups_lg        698M         705M        704M
     many_groups_sm       654M         656M        656M
     many_groups_lg       753M         757M        758M
      wide_values         755M         760M        760M
       multi_key          662M         665M        665M
    ================ ============== ========== ============
    ```
    
    **GroupedMapArrowIterUDFTimeBench** (`SQL_GROUPED_MAP_ARROW_ITER_UDF`):
    ```
    ================ ============== ========== ============
    --                                udf
    ---------------- --------------------------------------
        scenario      identity_udf   sort_udf   filter_udf
    ================ ============== ========== ============
     few_groups_sm     13.1±0.3ms    37.6±3ms   19.6±0.2ms
     few_groups_lg     79.6±0.4ms    303±5ms     111±2ms
     many_groups_sm     440±9ms      673±20ms    584±10ms
     many_groups_lg     231±7ms      689±40ms   323±0.7ms
      wide_values       302±3ms      464±10ms    385±8ms
       multi_key       92.0±0.2ms    211±4ms    118±0.8ms
    ================ ============== ========== ============
    ```
    
    **GroupedMapArrowIterUDFPeakmemBench** (`SQL_GROUPED_MAP_ARROW_ITER_UDF`):
    ```
    ================ ============== ========== ============
    --                                udf
    ---------------- --------------------------------------
        scenario      identity_udf   sort_udf   filter_udf
    ================ ============== ========== ============
     few_groups_sm        634M         638M        637M
     few_groups_lg        698M         705M        704M
     many_groups_sm       653M         656M        656M
     many_groups_lg       753M         759M        758M
      wide_values         755M         760M        760M
       multi_key          662M         665M        664M
    ================ ============== ========== ============
    ```
    
    Both eval types show comparable time and memory profiles across all scenarios.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes.
    
    Closes #54743 from Yicong-Huang/SPARK-55947/bench/grouped-arrow-udf.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 241 +++++++++++++++++++++++++++++++++++
 1 file changed, 241 insertions(+)

diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 95be9498f2fb..3445022eec81 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -41,6 +41,7 @@ from pyspark.sql.types import (
     BooleanType,
     DoubleType,
     IntegerType,
+    LongType,
     StringType,
     StructField,
     StructType,
@@ -438,6 +439,246 @@ class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase):
     param_names = ["scenario", "udf"]
 
 
+# ---------------------------------------------------------------------------
+# Grouped Arrow UDF helpers
+# ---------------------------------------------------------------------------
+
+
+def _write_grouped_arrow_data(
+    groups: list[tuple[pa.RecordBatch, ...]],
+    num_dfs: int,
+    buf: io.BufferedIOBase,
+) -> None:
+    """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes."""
+    for group in groups:
+        write_int(num_dfs, buf)
+        for df_batch in group:
+            _write_arrow_ipc_batches(iter([df_batch]), buf)
+    write_int(0, buf)  # end of groups
+
+
+def _make_grouped_arg_offsets(num_key_cols: int, num_value_cols: int) -> list[int]:
+    """Build arg_offsets for grouped map UDFs.
+
+    Format: [total_len, num_keys, key_idx..., value_idx...]
+    All columns are in a single struct, so key indexes come first,
+    then value indexes.
+    """
+    total = num_key_cols + num_value_cols + 1  # +1 for the num_keys prefix
+    key_idxs = list(range(num_key_cols))
+    value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols))
+    return [total, num_key_cols] + key_idxs + value_idxs
+
+
+def _make_grouped_batches(
+    num_groups: int,
+    rows_per_group: int,
+    num_key_cols: int,
+    num_value_cols: int,
+) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]:
+    """Create grouped data: each group is a single batch wrapped in a struct column.
+
+    Returns (groups, return_type) where each group is a 1-tuple of a
+    struct-wrapped RecordBatch suitable for ArrowStreamGroupUDFSerializer.
+    """
+    n_cols = num_key_cols + num_value_cols
+    type_cycle = [
+        (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), LongType()),
+        (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()),
+        (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()),
+        (lambda r: pa.array(np.random.rand(r)), DoubleType()),
+    ]
+
+    groups = []
+    for g in range(num_groups):
+        arrays = [type_cycle[i % len(type_cycle)][0](rows_per_group) for i in range(n_cols)]
+        names = [f"col_{i}" for i in range(n_cols)]
+        batch = pa.RecordBatch.from_arrays(arrays, names=names)
+        # Wrap in struct to match JVM-side encoding (flatten_struct undoes this)
+        struct_array = pa.StructArray.from_arrays(batch.columns, names=names)
+        wrapped = pa.RecordBatch.from_arrays([struct_array], names=["_0"])
+        groups.append((wrapped,))
+
+    value_fields = [
+        StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1])
+        for i in range(num_key_cols, n_cols)
+    ]
+    return_type = StructType(value_fields)
+
+    return groups, return_type
+
+
+# -- SQL_GROUPED_MAP_ARROW_UDF ------------------------------------------------
+# UDF receives a single pa.Table per group,
+# returns a pa.Table.
+
+
+def _grouped_map_arrow_identity(table):
+    """Identity grouped map UDF: takes a pa.Table, returns it as-is."""
+    return table
+
+
+def _grouped_map_arrow_sort(table):
+    """Sort by first column."""
+    return table.sort_by([(table.column_names[0], "ascending")])
+
+
+def _grouped_map_arrow_filter(table):
+    """Filter rows where first column is valid."""
+    import pyarrow.compute as pc
+
+    return table.filter(pc.is_valid(table.column(0)))
+
+
+_GROUPED_MAP_ARROW_UDFS = {
+    "identity_udf": _grouped_map_arrow_identity,
+    "sort_udf": _grouped_map_arrow_sort,
+    "filter_udf": _grouped_map_arrow_filter,
+}
+
+
+def _build_grouped_map_arrow_scenarios():
+    """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF.
+
+    Returns dict mapping name to (groups, return_type, arg_offsets).
+    """
+    scenarios = {}
+
+    for name, (num_groups, rows_per_group, num_key_cols, num_value_cols) in {
+        "few_groups_sm": (50, 5_000, 1, 4),
+        "few_groups_lg": (50, 50_000, 1, 4),
+        "many_groups_sm": (2_000, 500, 1, 4),
+        "many_groups_lg": (500, 10_000, 1, 4),
+        "wide_values": (200, 5_000, 1, 20),
+        "multi_key": (200, 5_000, 3, 5),
+    }.items():
+        groups, return_type = _make_grouped_batches(
+            num_groups, rows_per_group, num_key_cols, num_value_cols
+        )
+        arg_offsets = _make_grouped_arg_offsets(num_key_cols, num_value_cols)
+        scenarios[name] = (groups, return_type, arg_offsets)
+
+    return scenarios
+
+
+_GROUPED_MAP_ARROW_SCENARIOS = _build_grouped_map_arrow_scenarios()
+
+
+class _GroupedMapArrowBenchMixin:
+    """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_UDF."""
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        groups, return_type, arg_offsets = self._scenarios[scenario]
+        udf_func = self._udfs[udf_name]
+
+        def write_command(b):
+            # Grouped map uses a single UDF with grouped arg_offsets
+            write_int(1, b)  # num_udfs
+            write_int(len(arg_offsets), b)  # num_arg
+            for offset in arg_offsets:
+                write_int(offset, b)
+                _write_bool(False, b)  # is_kwarg
+            write_int(1, b)  # num_chained
+            command = cloudpickle_dumps((udf_func, return_type))
+            write_int(len(command), b)
+            b.write(command)
+            write_long(0, b)  # result_id
+
+        _write_worker_input(
+            PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
+            write_command,
+            lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b),
+            buf,
+        )
+
+
+class GroupedMapArrowUDFTimeBench(_GroupedMapArrowBenchMixin, _TimeBenchBase):
+    _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+    _udfs = _GROUPED_MAP_ARROW_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
+class GroupedMapArrowUDFPeakmemBench(_GroupedMapArrowBenchMixin, _PeakmemBenchBase):
+    _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+    _udfs = _GROUPED_MAP_ARROW_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
+# -- SQL_GROUPED_MAP_ARROW_ITER_UDF ------------------------------------------
+# UDF receives Iterator[pa.RecordBatch] per group,
+# returns Iterator[pa.RecordBatch].
+# Uses the same wire format and serializer as SQL_GROUPED_MAP_ARROW_UDF.
+
+
+def _grouped_map_arrow_iter_identity(batches):
+    """Identity grouped map iter UDF: yields each batch as-is."""
+    yield from batches
+
+
+def _grouped_map_arrow_iter_sort(batches):
+    """Sort each batch by first column."""
+    for batch in batches:
+        yield batch.sort_by([(batch.column_names[0], "ascending")])
+
+
+def _grouped_map_arrow_iter_filter(batches):
+    """Filter rows where first column is valid."""
+    import pyarrow.compute as pc
+
+    for batch in batches:
+        yield batch.filter(pc.is_valid(batch.column(0)))
+
+
+_GROUPED_MAP_ARROW_ITER_UDFS = {
+    "identity_udf": _grouped_map_arrow_iter_identity,
+    "sort_udf": _grouped_map_arrow_iter_sort,
+    "filter_udf": _grouped_map_arrow_iter_filter,
+}
+
+
+class _GroupedMapArrowIterBenchMixin:
+    """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF."""
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        groups, return_type, arg_offsets = self._scenarios[scenario]
+        udf_func = self._udfs[udf_name]
+
+        def write_command(b):
+            write_int(1, b)  # num_udfs
+            write_int(len(arg_offsets), b)  # num_arg
+            for offset in arg_offsets:
+                write_int(offset, b)
+                _write_bool(False, b)  # is_kwarg
+            write_int(1, b)  # num_chained
+            command = cloudpickle_dumps((udf_func, return_type))
+            write_int(len(command), b)
+            b.write(command)
+            write_long(0, b)  # result_id
+
+        _write_worker_input(
+            PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF,
+            write_command,
+            lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b),
+            buf,
+        )
+
+
+class GroupedMapArrowIterUDFTimeBench(_GroupedMapArrowIterBenchMixin, _TimeBenchBase):
+    _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+    _udfs = _GROUPED_MAP_ARROW_ITER_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
+class GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _PeakmemBenchBase):
+    _scenarios = _GROUPED_MAP_ARROW_SCENARIOS
+    _udfs = _GROUPED_MAP_ARROW_ITER_UDFS
+    params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
 # -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------
 # UDF receives ``Iterator[pa.RecordBatch]``, returns ``Iterator[pa.RecordBatch]``.
 # Used by ``mapInArrow``.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to