This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 66949cc255f9 [SPARK-55724][PYTHON][TEST] Add ASV microbenchmark for 
SQL_MAP_ARROW_ITER_UDF
66949cc255f9 is described below

commit 66949cc255f9c574ce82318b806e055831bd777a
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Mar 10 21:11:02 2026 +0800

    [SPARK-55724][PYTHON][TEST] Add ASV microbenchmark for 
SQL_MAP_ARROW_ITER_UDF
    
    ### What changes were proposed in this pull request?
    
    Add ASV microbenchmarks for `SQL_MAP_ARROW_ITER_UDF` (used by `mapInArrow`).
    
    ### Why are the changes needed?
    
    Part of SPARK-55724.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Benchmark files only.
    
    ### How was this patch tested?
    
    `COLUMNS=120 asv run --python=same --bench "MapArrowIter" --attribute 
"repeat=(3,5,5.0)"`:
    
    **MapArrowIterUDFTimeBench** (`SQL_MAP_ARROW_ITER_UDF`):
    ```
    =================== ============== =========== ============
    --                                    udf
    ------------------- ---------------------------------------
          scenario       identity_udf    sort_udf   filter_udf
    =================== ============== =========== ============
      sm_batch_few_col     45.4+-5ms     119+-6ms    78.7+-5ms
     sm_batch_many_col     29.0+-3ms    56.1+-4ms    49.8+-4ms
      lg_batch_few_col    178+-0.3ms    362+-0.9ms    311+-1ms
     lg_batch_many_col    173+-0.2ms     347+-2ms     295+-2ms
          pure_ints       90.6+-0.3ms   128+-0.5ms   114+-0.9ms
         pure_floats      89.4+-0.3ms   206+-0.8ms    115+-3ms
         pure_strings     96.5+-0.5ms   258+-0.4ms   186+-0.3ms
           pure_ts        89.4+-0.1ms   132+-0.4ms    113+-1ms
         mixed_types      81.2+-0.7ms   157+-0.8ms    148+-5ms
    =================== ============== =========== ============
    ```
    
    **MapArrowIterUDFPeakmemBench** (`SQL_MAP_ARROW_ITER_UDF`):
    ```
    =================== ============== ========== ============
    --                                    udf
    ------------------- --------------------------------------
          scenario       identity_udf   sort_udf   filter_udf
    =================== ============== ========== ============
      sm_batch_few_col       142M         145M        144M
     sm_batch_many_col       142M         146M        146M
      lg_batch_few_col       296M         299M        298M
     lg_batch_many_col       298M         310M        309M
          pure_ints          207M         209M        209M
         pure_floats         207M         209M        209M
         pure_strings        215M         219M        219M
           pure_ts           207M         209M        209M
         mixed_types         195M         197M        197M
    =================== ============== ========== ============
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54715 from Yicong-Huang/SPARK-55724/bench/map-arrow-iter-udf.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 123 +++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)

diff --git a/python/benchmarks/bench_eval_type.py 
b/python/benchmarks/bench_eval_type.py
index 920b64a53e2a..95be9498f2fb 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -438,6 +438,129 @@ class 
ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase):
     param_names = ["scenario", "udf"]
 
 
+# -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------
+# UDF receives ``Iterator[pa.RecordBatch]``, returns 
``Iterator[pa.RecordBatch]``.
+# Used by ``mapInArrow``.
+
+
+def _identity_batch_iter(it):
+    yield from it
+
+
+def _sort_batch_iter(it):
+    import pyarrow.compute as pc
+
+    for batch in it:
+        indices = pc.sort_indices(batch.column(0))
+        yield batch.take(indices)
+
+
+def _filter_batch_iter(it):
+    import pyarrow.compute as pc
+
+    for batch in it:
+        mask = pc.is_valid(batch.column(0))
+        yield batch.filter(mask)
+
+
+_MAP_ARROW_ITER_UDFS = {
+    "identity_udf": (_identity_batch_iter, None, [0]),
+    "sort_udf": (_sort_batch_iter, None, [0]),
+    "filter_udf": (_filter_batch_iter, None, [0]),
+}
+
+
+def _build_map_arrow_iter_scenarios():
+    """Build scenarios for SQL_MAP_ARROW_ITER_UDF.
+
+    Same data shapes as non-grouped scenarios but with reduced batch counts
+    to account for the struct wrap/unwrap overhead per batch.
+    """
+    scenarios = {}
+
+    for name, (rows, n_cols, num_batches) in {
+        "sm_batch_few_col": (1_000, 5, 500),
+        "sm_batch_many_col": (1_000, 50, 50),
+        "lg_batch_few_col": (10_000, 5, 500),
+        "lg_batch_many_col": (10_000, 50, 50),
+    }.items():
+        batch, col0_type = _make_typed_batch(rows, n_cols)
+        scenarios[name] = (batch, num_batches, col0_type)
+
+    _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 200
+
+    for scenario_name, make_array, spark_type in [
+        (
+            "pure_ints",
+            lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)),
+            IntegerType(),
+        ),
+        ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()),
+        ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), 
StringType()),
+        (
+            "pure_ts",
+            lambda r: pa.array(
+                np.arange(0, r, dtype="datetime64[us]"), 
type=pa.timestamp("us", tz=None)
+            ),
+            TimestampNTZType(),
+        ),
+    ]:
+        batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, 
spark_type)
+        scenarios[scenario_name] = (batch, _PURE_BATCHES, spark_type)
+
+    scenarios["mixed_types"] = (
+        _make_typed_batch(_PURE_ROWS, _PURE_COLS)[0],
+        _PURE_BATCHES,
+        IntegerType(),
+    )
+
+    return scenarios
+
+
+_MAP_ARROW_ITER_SCENARIOS = _build_map_arrow_iter_scenarios()
+
+
+def _wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
+    """Wrap all columns into a single struct column, mimicking JVM-side 
encoding."""
+    struct_array = pa.StructArray.from_arrays(batch.columns, 
names=batch.schema.names)
+    return pa.RecordBatch.from_arrays([struct_array], names=["_0"])
+
+
+class _MapArrowIterBenchMixin:
+    """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF.
+
+    Like ``_NonGroupedBenchMixin`` but wraps input batches in a struct column
+    to match the JVM-side wire format (``flatten_struct`` undoes this).
+    """
+
+    def _write_scenario(self, scenario, udf_name, buf):
+        batch, num_batches, col0_type = self._scenarios[scenario]
+        udf_func, ret_type, arg_offsets = self._udfs[udf_name]
+        if ret_type is None:
+            ret_type = col0_type
+        wrapped = _wrap_batch_in_struct(batch)
+        _write_worker_input(
+            PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+            lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b),
+            lambda b: _write_arrow_ipc_batches((wrapped for _ in 
range(num_batches)), b),
+            buf,
+        )
+
+
+class MapArrowIterUDFTimeBench(_MapArrowIterBenchMixin, _TimeBenchBase):
+    _scenarios = _MAP_ARROW_ITER_SCENARIOS
+    _udfs = _MAP_ARROW_ITER_UDFS
+    params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
+class MapArrowIterUDFPeakmemBench(_MapArrowIterBenchMixin, _PeakmemBenchBase):
+    _scenarios = _MAP_ARROW_ITER_SCENARIOS
+    _udfs = _MAP_ARROW_ITER_UDFS
+    params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)]
+    param_names = ["scenario", "udf"]
+
+
 # -- SQL_SCALAR_ARROW_UDF ---------------------------------------------------
 # UDF receives individual ``pa.Array`` columns, returns a ``pa.Array``.
 # All UDFs operate on arg_offsets=[0] so they work with any column type.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to