This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 88846d73a13b [SPARK-55899][PYTHON][TEST] Add ASV microbenchmark for SQL_ARROW_BATCHED_UDF
88846d73a13b is described below

commit 88846d73a13bedc56443405604787f62cea25ba5
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Mar 10 10:43:32 2026 +0800

    [SPARK-55899][PYTHON][TEST] Add ASV microbenchmark for SQL_ARROW_BATCHED_UDF
    
    ### What changes were proposed in this pull request?
    
    Add ASV microbenchmarks for `SQL_ARROW_BATCHED_UDF`.
    
    ### Why are the changes needed?
    
    Part of SPARK-55724. Establishes baseline performance metrics for `SQL_ARROW_BATCHED_UDF` before future refactoring work.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Benchmark files only.
    
    ### How was this patch tested?
    
    `COLUMNS=120 asv run --python=same --bench "ArrowBatched" --attribute "repeat=(3,5,5.0)"`:
    
    **ArrowBatchedUDFTimeBench** (`SQL_ARROW_BATCHED_UDF`):
    ```
    =================== ============== =============== ===============
    --                                       udf
    ------------------- ----------------------------------------------
          scenario       identity_udf   stringify_udf   nullcheck_udf
    =================== ============== =============== ===============
      sm_batch_few_col    62.1+-0.2ms      66.1+-0.8ms      61.2+-0.1ms
     sm_batch_many_col    154+-0.4ms       155+-0.4ms       154+-0.3ms
      lg_batch_few_col    148+-0.3ms       157+-0.4ms       147+-0.5ms
     lg_batch_many_col     623+-2ms         624+-2ms         620+-3ms
         pure_ints        220+-0.5ms       231+-0.7ms        220+-6ms
        pure_floats       224+-0.8ms        262+-1ms        225+-0.7ms
        pure_strings       414+-1ms        415+-0.6ms        404+-1ms
        mixed_types        311+-1ms        318+-0.8ms       308+-0.7ms
    =================== ============== =============== ===============
    ```
    
    **ArrowBatchedUDFPeakmemBench** (`SQL_ARROW_BATCHED_UDF`):
    ```
    =================== ============== =============== ===============
    --                                       udf
    ------------------- ----------------------------------------------
          scenario       identity_udf   stringify_udf   nullcheck_udf
    =================== ============== =============== ===============
      sm_batch_few_col       119M            119M            118M
     sm_batch_many_col       123M            123M            123M
      lg_batch_few_col       124M            124M            122M
     lg_batch_many_col       159M            160M            159M
         pure_ints           122M            123M            122M
        pure_floats          124M            125M            123M
        pure_strings         125M            125M            124M
        mixed_types          123M            124M            123M
    =================== ============== =============== ===============
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54702 from Yicong-Huang/SPARK-55724/bench/arrow-batch-udf.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 109 +++++++++++++++++++++++++++++++++++
 1 file changed, 109 insertions(+)

diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 53a4f208e99d..920b64a53e2a 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -329,6 +329,115 @@ class _NonGroupedBenchMixin:
         )
 
 
+# -- SQL_ARROW_BATCHED_UDF --------------------------------------------------
+# Arrow-optimized Python UDF: receives individual Python values per row,
+# returns a single Python value.  The wire protocol includes an extra
+# ``input_type`` (StructType JSON) before the UDF payload.
+
+
+def _build_arrow_batched_scenarios():
+    """Build scenarios for SQL_ARROW_BATCHED_UDF.
+
+    Returns a dict mapping scenario name to
+    ``(batch, num_batches, input_struct_type, col0_type)``.
+    ``input_struct_type`` is a StructType matching the batch schema,
+    needed for the wire protocol.
+    """
+    scenarios = {}
+
+    # Row-by-row processing is ~100x slower than columnar Arrow UDFs,
+    # so batch counts are much smaller to keep benchmarks under 60s.
+    for name, (rows, n_cols, num_batches) in {
+        "sm_batch_few_col": (1_000, 5, 20),
+        "sm_batch_many_col": (1_000, 50, 5),
+        "lg_batch_few_col": (10_000, 5, 5),
+        "lg_batch_many_col": (10_000, 50, 2),
+    }.items():
+        batch, col0_type = _make_typed_batch(rows, n_cols)
+        type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()]
+        input_struct = StructType(
+            [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(n_cols)]
+        )
+        scenarios[name] = (batch, num_batches, input_struct, col0_type)
+
+    _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 10
+
+    for scenario_name, make_array, spark_type in [
+        (
+            "pure_ints",
+            lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)),
+            IntegerType(),
+        ),
+        ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()),
+        ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()),
+    ]:
+        batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type)
+        input_struct = StructType([StructField(f"col_{i}", spark_type) for i in range(_PURE_COLS)])
+        scenarios[scenario_name] = (batch, _PURE_BATCHES, input_struct, spark_type)
+
+    # mixed types
+    batch, col0_type = _make_typed_batch(_PURE_ROWS, _PURE_COLS)
+    type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()]
+    input_struct = StructType(
+        [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(_PURE_COLS)]
+    )
+    scenarios["mixed_types"] = (batch, _PURE_BATCHES, input_struct, col0_type)
+
+    return scenarios
+
+
+_ARROW_BATCHED_SCENARIOS = _build_arrow_batched_scenarios()
+
+
+# UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values.
+# arg_offsets=[0] means the UDF receives column 0 value per row.
+_ARROW_BATCHED_UDFS = {
+    "identity_udf": (lambda x: x, None, [0]),
+    "stringify_udf": (lambda x: str(x), StringType(), [0]),
+    "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]),
+}
+
+
+class _ArrowBatchedBenchMixin:
+    """Provides ``_write_scenario`` for SQL_ARROW_BATCHED_UDF.
+
+    Like ``_NonGroupedBenchMixin`` but writes the extra ``input_type``
+    (StructType JSON) that the wire protocol requires.
+    """
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        batch, num_batches, input_struct, col0_type = self._scenarios[scenario]
+        udf_func, ret_type, arg_offsets = self._udfs[udf_name]
+        if ret_type is None:
+            ret_type = col0_type
+
+        def write_command(b):
+            # input_type is read before UDF payloads for ARROW_BATCHED_UDF
+            _write_utf8(input_struct.json(), b)
+            _build_udf_payload(udf_func, ret_type, arg_offsets, b)
+
+        _write_worker_input(
+            PythonEvalType.SQL_ARROW_BATCHED_UDF,
+            write_command,
+            lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b),
+            buf,
+        )
+
+
+class ArrowBatchedUDFTimeBench(_ArrowBatchedBenchMixin, _TimeBenchBase):
+    _scenarios = _ARROW_BATCHED_SCENARIOS
+    _udfs = _ARROW_BATCHED_UDFS
+    params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
+class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase):
+    _scenarios = _ARROW_BATCHED_SCENARIOS
+    _udfs = _ARROW_BATCHED_UDFS
+    params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
 # -- SQL_SCALAR_ARROW_UDF ---------------------------------------------------
 # UDF receives individual ``pa.Array`` columns, returns a ``pa.Array``.
 # All UDFs operate on arg_offsets=[0] so they work with any column type.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to