This is an automated email from the ASF dual-hosted git repository.

zhengruifeng pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new ac9617b8cd71 [SPARK-56717][PYTHON][TESTS] Add ASV microbenchmark for SQL_ARROW_TABLE_UDF
ac9617b8cd71 is described below

commit ac9617b8cd7106075667a362b584ee8522368183
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 12 12:05:36 2026 +0800

    [SPARK-56717][PYTHON][TESTS] Add ASV microbenchmark for SQL_ARROW_TABLE_UDF
    
    ### What changes were proposed in this pull request?
    
    Add an ASV micro-benchmark for the `SQL_ARROW_TABLE_UDF` eval type (Python UDTF with `useArrow=True`) to `bench_eval_type.py`.
    
    The new benchmark drives the worker through the UDTF wire protocol via the existing `write_arrow_udtf_payload` helper, and threads `input_type` through `EvalConf` so the non-legacy Arrow code path is exercised. It also adds `connInfo`/`secret` to the preamble `TaskContextInfo` JSON (required after the SPARK-56519 worker-protocol refactor).
    
    UDTFs covered: `identity_udtf` (1->1), `explode_udtf` (1->3), `filter_udtf` (1->0/1), `stringify_udtf` (1->1, type change). Row counts are scaled down relative to `SQL_ARROW_BATCHED_UDF` because the worker calls `LocalDataToArrowConversion.convert` once per input row in this path.
    
    ### Why are the changes needed?
    
    Part of [SPARK-55724](https://issues.apache.org/jira/browse/SPARK-55724). Establishes a performance baseline before refactoring `SQL_ARROW_TABLE_UDF`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    `COLUMNS=120 ./python/asv run --python=same --bench "ArrowTableUDF" -a "repeat=(3,5,5.0)"` (one of two stable runs):
    
    `ArrowTableUDFTimeBench`:
    
    ```text
    =================== =============== ============== ============= ================
    --                                              udtf
    ------------------- -------------------------------------------------------------
         scenario        identity_udtf   explode_udtf   filter_udtf   stringify_udtf
    =================== =============== ============== ============= ================
     sm_batch_few_col       120+/-0.2ms    122+/-0.3ms  94.0+/-0.4ms      119+/-0.2ms
     sm_batch_many_col     40.1+/-0.3ms   40.4+/-0.2ms  33.4+/-0.3ms     39.9+/-0.2ms
     lg_batch_few_col       299+/-0.5ms      304+/-1ms     232+/-1ms      297+/-0.6ms
     lg_batch_many_col      157+/-0.1ms    158+/-0.7ms   129+/-0.4ms      155+/-0.4ms
     pure_ints                306+/-2ms      309+/-1ms   237+/-0.4ms        304+/-2ms
     pure_strings           313+/-0.6ms      319+/-1ms     249+/-1ms      313+/-0.7ms
    =================== =============== ============== ============= ================
    ```
    
    `ArrowTableUDFPeakmemBench`:
    
    ```text
    =================== =============== ============== ============= ================
    --                                              udtf
    ------------------- -------------------------------------------------------------
         scenario        identity_udtf   explode_udtf   filter_udtf   stringify_udtf
    =================== =============== ============== ============= ================
     sm_batch_few_col        464M           464M           464M           464M
     sm_batch_many_col       464M           464M           465M           464M
     lg_batch_few_col        465M           465M           465M           465M
     lg_batch_many_col       468M           468M           468M           468M
     pure_ints               465M           465M           465M           465M
     pure_strings            465M           465M           465M           465M
    =================== =============== ============== ============= ================
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #55673 from Yicong-Huang/SPARK-56717.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 74934af1b888400dc24f9b2967d0f11295fdef2d)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 98 ++++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)
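
The row fan-out of the four UDTF handlers named in the commit message (1->1, 1->3, 1->0/1, and 1->1 with a type change) can be checked in isolation with a minimal sketch. The handler bodies mirror the ones added in the diff below; the class names are shortened, and `run_udtf` is a hypothetical driver written for this note. It only calls `eval` once per input row and collects the yielded tuples, so it does not involve the Spark worker, the wire protocol, or any Arrow conversion.

```python
class IdentityUDTF:
    def eval(self, x):
        yield (x,)  # 1 -> 1


class ExplodeUDTF:
    def eval(self, x):
        for _ in range(3):
            yield (x,)  # 1 -> 3


class FilterUDTF:
    def eval(self, x):
        # Keeps a row only when the value is non-null and its hash is odd.
        if x is not None and (hash(x) & 1):
            yield (x,)  # 1 -> 0 or 1


class StringifyUDTF:
    def eval(self, x):
        yield (str(x),)  # 1 -> 1, output type becomes str


def run_udtf(handler_cls, inputs):
    """Hypothetical driver: one eval() call per input row, collecting yielded rows."""
    handler = handler_cls()
    rows = []
    for value in inputs:
        rows.extend(handler.eval(value))
    return rows


inputs = list(range(10))
handlers = (IdentityUDTF, ExplodeUDTF, FilterUDTF, StringifyUDTF)
counts = {cls.__name__: len(run_udtf(cls, inputs)) for cls in handlers}
print(counts)
```

For small non-negative integers CPython's `hash` returns the value itself, so the filter keeps exactly the odd inputs here. String hashes are randomized per process unless `PYTHONHASHSEED` is set, so the rows a string input keeps can differ between runs.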

diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 5b648f8e3aaf..a4fda48dfcb1 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -109,6 +109,8 @@ class MockProtocolWriter:
             json.dumps(
                 {
                     "isBarrier": False,
+                    "connInfo": None,
+                    "secret": None,
                     "stageId": 0,
                     "partitionId": 0,
                     "attemptNumber": 0,
@@ -608,6 +610,102 @@ class ArrowUDTFPeakmemBench(_ArrowUDTFBenchMixin, _PeakmemBenchBase):
     pass
 
 
+# -- SQL_ARROW_TABLE_UDF ----------------------------------------------------
+# Python UDTF (``@udtf(useArrow=True)``): handler is a class with ``eval(self, *args)``
+# that yields output rows. Each input row triggers one ``eval`` call; yielded rows
+# are converted to Arrow via ``LocalDataToArrowConversion``.
+
+
+class _ArrowTableUDFIdentity:
+    def eval(self, x):
+        yield (x,)
+
+
+class _ArrowTableUDFExplode:
+    def eval(self, x):
+        for _ in range(3):
+            yield (x,)
+
+
+class _ArrowTableUDFFilter:
+    def eval(self, x):
+        if x is not None and (hash(x) & 1):
+            yield (x,)
+
+
+class _ArrowTableUDFStringify:
+    def eval(self, x):
+        yield (str(x),)
+
+
+class _ArrowTableUDFBenchMixin:
+    """Provides ``_write_scenario`` for SQL_ARROW_TABLE_UDF (Python UDTF, useArrow=True).
+
+    Writes the extra ``input_type`` (StructType JSON) into ``EvalConf`` that the
+    non-legacy path requires, and uses the UDTF wire protocol (no num_udfs/result_id).
+    """
+
+    # Per-input-row ``LocalDataToArrowConversion.convert`` call makes this path
+    # ~15-20x slower than SQL_ARROW_BATCHED_UDF, so row counts are scaled down
+    # accordingly to keep each measurement under ASV's per-sample budget.
+    _scenario_configs = {
+        "sm_batch_few_col": ("mixed", 2_000, 5, 500),
+        "sm_batch_many_col": ("mixed", 500, 50, 500),
+        "lg_batch_few_col": ("mixed", 5_000, 5, 2_500),
+        "lg_batch_many_col": ("mixed", 2_000, 50, 2_000),
+        "pure_ints": ("pure_ints", 5_000, 10, 2_500),
+        "pure_strings": ("pure_strings", 5_000, 10, 2_500),
+    }
+
+    @staticmethod
+    def _build_scenario(name):
+        np.random.seed(42)
+        type_key, num_rows, num_cols, batch_size = _ArrowTableUDFBenchMixin._scenario_configs[name]
+        pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
+        return MockDataFactory.make_batches(
+            num_rows=num_rows,
+            num_cols=num_cols,
+            spark_type_pool=pool,
+            batch_size=batch_size,
+        )
+
+    # Each entry: (handler_class, return_type_or_None, arg_offsets).
+    # ``None`` return_type means "use input column 0's type".
+    _udtfs = {
+        "identity_udtf": (_ArrowTableUDFIdentity, None, [0]),
+        "explode_udtf": (_ArrowTableUDFExplode, None, [0]),
+        "filter_udtf": (_ArrowTableUDFFilter, None, [0]),
+        "stringify_udtf": (_ArrowTableUDFStringify, StringType(), [0]),
+    }
+    params = [list(_scenario_configs), list(_udtfs)]
+    param_names = ["scenario", "udtf"]
+
+    def _write_scenario(self, scenario, udtf_name, buf):
+        batches, schema = self._build_scenario(scenario)
+        handler, ret_type, arg_offsets = self._udtfs[udtf_name]
+        if ret_type is None:
+            ret_type = schema.fields[0].dataType
+        return_type = StructType([StructField("c0", ret_type)])
+
+        MockProtocolWriter.write_worker_input(
+            PythonEvalType.SQL_ARROW_TABLE_UDF,
+            lambda b: MockProtocolWriter.write_arrow_udtf_payload(
+                handler, return_type, arg_offsets, b
+            ),
+            lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
+            buf,
+            eval_conf={"input_type": schema.json()},
+        )
+
+
+class ArrowTableUDFTimeBench(_ArrowTableUDFBenchMixin, _TimeBenchBase):
+    pass
+
+
+class ArrowTableUDFPeakmemBench(_ArrowTableUDFBenchMixin, _PeakmemBenchBase):
+    pass
+
+
 # -- SQL_COGROUPED_MAP_ARROW_UDF ------------------------------------------------
 # UDF receives two ``pa.Table`` (left, right) per co-group, returns ``pa.Table``.
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
