This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 481f8a96d974 [SPARK-57138][PYTHON][TEST] Share base mixin across Window and Cogroup Arrow/Pandas siblings
481f8a96d974 is described below

commit 481f8a96d9743b39165b56154b1fa3fc937960e6
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri May 29 00:34:23 2026 -0700

    [SPARK-57138][PYTHON][TEST] Share base mixin across Window and Cogroup Arrow/Pandas siblings
    
    ### What changes were proposed in this pull request?
    
    Follow-up to SPARK-57137. Extend the Arrow/Pandas sibling base-mixin 
pattern to the remaining two pairs in `python/benchmarks/bench_eval_type.py`:
    
    - `_WindowAggPandasBenchMixin` now subclasses `_WindowAggArrowBenchMixin`
    - `_CogroupedMapPandasBenchMixin` now subclasses 
`_CogroupedMapArrowBenchMixin`
    
    The shared `_build_scenario` and `_write_scenario` methods are pulled up
    into the Arrow base class. The eval type is parameterized via the
    `_eval_type` class attribute, and `_build_scenario` is converted from
    `staticmethod` to `classmethod` so that each subclass reads its own
    `_scenario_configs` (the same mechanism SPARK-57137 used for the Scalar
    and GroupedAgg pairs).
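
As a rough illustration of why the `staticmethod` to `classmethod` switch matters, the sketch below uses hypothetical stand-in classes (`ArrowMixin`, `PandasMixin`, string eval types, and made-up config tuples) rather than the real benchmark mixins. It only shows the lookup behavior; it is not the code from `bench_eval_type.py`.

```python
class ArrowMixin:
    # Hypothetical stand-ins for the real mixin attributes.
    _eval_type = "ARROW"
    _scenario_configs = {"small": (10, 100)}

    @staticmethod
    def build_static(name):
        # Names the defining class explicitly, so a subclass override
        # of _scenario_configs is never consulted.
        return ArrowMixin._scenario_configs[name]

    @classmethod
    def build(cls, name):
        # cls is the class the call was made on, so the attribute
        # lookup follows that class's MRO.
        return cls._scenario_configs[name]

    def write(self, name):
        # self._eval_type resolves to the most-derived definition.
        return (self._eval_type, self.build(name))


class PandasMixin(ArrowMixin):
    _eval_type = "PANDAS"
    _scenario_configs = {"small": (5, 50)}


print(ArrowMixin().write("small"))        # ('ARROW', (10, 100))
print(PandasMixin().write("small"))       # ('PANDAS', (5, 50))
print(PandasMixin.build_static("small"))  # (10, 100): base configs leak through
```

The last call shows the failure mode a hard-coded class reference would produce once the method is inherited: the subclass silently gets the base class's configs.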
    
    - Window: the Pandas half drops `_scenario_configs` entirely (identical to 
the Arrow variant) and keeps only `_eval_type`, its UDFs, and `params`.
    - Cogroup: the Arrow `_udfs` values are normalized to `(func, n_args)` 
tuples to match the Pandas sibling, so the inherited `_write_scenario` works 
unchanged. The Pandas half keeps its own scaled-down `_scenario_configs` and 
the extra 3-arg `key_identity_udf` variant.
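
The `(func, n_args)` normalization can be pictured with the minimal sketch below. The class names, the `pick_udf` helper, and the toy UDF bodies are assumptions made for illustration; the real writer does more than select the callable.

```python
def arrow_identity(left, right):
    return left


def pandas_identity(left, right):
    return left


def pandas_key_identity(key, left, right):
    return left


class ArrowCogroup:
    # Every entry has the same (func, n_args) shape, even though the
    # Arrow side only has 2-arg UDFs.
    _udfs = {"identity_udf": (arrow_identity, 2)}

    def pick_udf(self, udf_name):
        # Hypothetical helper standing in for the unpacking step of the
        # shared writer: only the callable is used, n_args is dropped.
        func, _ = self._udfs[udf_name]
        return func


class PandasCogroup(ArrowCogroup):
    _udfs = {
        "identity_udf": (pandas_identity, 2),
        "key_identity_udf": (pandas_key_identity, 3),
    }


print(PandasCogroup().pick_udf("key_identity_udf").__name__)

# With a bare callable instead of a tuple, the same unpacking fails.
try:
    func, _ = arrow_identity
except TypeError as exc:
    print("bare callable cannot be unpacked:", type(exc).__name__)
```

Keeping one entry shape on both sides is what lets a single inherited method serve both UDF tables.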
    
    Net diff: +39 / -102 lines.
    
    ### Why are the changes needed?
    
    These two pairs were intentionally left out of SPARK-57137 to avoid 
conflicting with two in-flight PRs (#56167 made the Window Arrow mixin 
scenarios lazy; #56171 renamed `wide_values` to `wide_cols` in the Cogroup 
pair). Both have since merged, so the pairs can now be folded into the same 
base-class pattern.
    
    Before this change, each sibling pair carried two near-identical 
`_write_scenario` bodies differing only in the `PythonEvalType.SQL_...` 
constant (and, for Cogroup, the UDF set) -- a known footgun where any 
protocol-writing change had to be applied in lock-step across both halves.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Test-only change in the benchmark module.
    
    ### How was this patch tested?
    
    - Structural: confirmed `_eval_type` resolves correctly via MRO for all 
four affected mixins; confirmed 
`_CogroupedMapPandasBenchMixin._scenario_configs` still holds the scaled-down 
pandas row counts (not the Arrow base's), the main MRO-resolution risk of 
switching `_build_scenario` to `classmethod`.
    - Ran `setup` + `time_worker` end-to-end for all four affected `*TimeBench` 
classes across every UDF (including the Pandas-only 3-arg `key_identity_udf`).
    - Ran `peakmem_worker` (disk-replay path) for the Window and Cogroup Pandas 
classes.
    - Confirmed the generated wire bytes are byte-identical to the pre-refactor 
output for Window and Cogroup Arrow; the Cogroup Pandas UDF pickles differ only 
in the embedded class-hierarchy reference (same length, identical execution), 
matching what SPARK-57137 produced for the Scalar/GroupedAgg pairs.
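
A structural check of the kind described in the first bullet could look roughly like the sketch below. The `defining_class` helper and the four stand-in classes are hypothetical; only the config tuples are copied from the diff, and the eval types are replaced by plain strings.

```python
def defining_class(cls, attr):
    """Return the first class in cls.__mro__ whose own namespace defines attr."""
    for klass in cls.__mro__:
        if attr in vars(klass):
            return klass
    raise AttributeError(attr)


class WindowArrow:
    _eval_type = "WINDOW_ARROW"
    _scenario_configs = {"wide_cols": (200, 5_000, 20)}

    @classmethod
    def _build_scenario(cls, name):
        return cls._scenario_configs[name]


class WindowPandas(WindowArrow):
    # Inherits _scenario_configs unchanged; only the eval type differs.
    _eval_type = "WINDOW_PANDAS"


class CogroupArrow:
    _eval_type = "COGROUP_ARROW"
    _scenario_configs = {"multi_key": (200, 5_000, 3, 5)}

    @classmethod
    def _build_scenario(cls, name):
        return cls._scenario_configs[name]


class CogroupPandas(CogroupArrow):
    # Keeps its own scaled-down row counts.
    _eval_type = "COGROUP_PANDAS"
    _scenario_configs = {"multi_key": (100, 1_000, 3, 5)}


for cls in (WindowArrow, WindowPandas, CogroupArrow, CogroupPandas):
    owner = defining_class(cls, "_scenario_configs").__name__
    print(cls.__name__, cls._eval_type, "configs from", owner)

# The inherited classmethod must still see the subclass's own configs.
assert CogroupPandas._build_scenario("multi_key") == (100, 1_000, 3, 5)
```

Checking which class owns each attribute, rather than only its value, also catches the case where two classes happen to hold equal configs by coincidence.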
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. Generated-by: Claude Code (claude-opus-4-8)
    
    Closes #56194 from viirya/SPARK-55724-window-cogroup-base.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 141 ++++++++++-------------------------
 1 file changed, 39 insertions(+), 102 deletions(-)

diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 1140ec7db3d6..17b112e5f89f 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -758,17 +758,15 @@ class _CogroupedMapArrowBenchMixin:
         "multi_key": (200, 5_000, 3, 5),
     }
 
-    @staticmethod
-    def _build_scenario(name):
+    @classmethod
+    def _build_scenario(cls, name):
         """Build a cogroup scenario: two DataFrames with the same grouping structure.
 
         Unlike grouped map (which wraps columns in a struct), cogroup batches
         have flat columns: [key_col_0, ..., key_col_k, val_col_0, ..., val_col_v].
         """
         np.random.seed(42)
-        num_groups, rows_per_group, num_key_cols, num_value_cols = (
-            _CogroupedMapArrowBenchMixin._scenario_configs[name]
-        )
+        num_groups, rows_per_group, num_key_cols, num_value_cols = cls._scenario_configs[name]
         n_cols = num_key_cols + num_value_cols
         type_pool = MockDataFactory.MIXED_TYPES[:n_cols]
         while len(type_pool) < n_cols:
@@ -784,22 +782,27 @@ class _CogroupedMapArrowBenchMixin:
         return_type = StructType(schema.fields[num_key_cols:])
         return (cogroups, return_type, num_key_cols, num_value_cols)
 
+    _eval_type = PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF
+    # Each UDF entry: (func, n_args). n_args=2 -> func(left, right);
+    # n_args=3 -> func(key, left, right). The Arrow path has no 3-arg variant,
+    # but the tuple shape is shared with the Pandas sibling so ``_write_scenario``
+    # can be inherited unchanged.
     _udfs = {
-        "identity_udf": _cogrouped_map_arrow_identity,
-        "concat_udf": _cogrouped_map_arrow_concat,
-        "left_semi_udf": _cogrouped_map_arrow_left_semi,
+        "identity_udf": (_cogrouped_map_arrow_identity, 2),
+        "concat_udf": (_cogrouped_map_arrow_concat, 2),
+        "left_semi_udf": (_cogrouped_map_arrow_left_semi, 2),
     }
     params = [list(_scenario_configs), list(_udfs)]
     param_names = ["scenario", "udf"]
 
     def _write_scenario(self, scenario, udf_name, buf):
         groups, schema, num_key_cols, num_value_cols = self._build_scenario(scenario)
-        udf_func = self._udfs[udf_name]
+        udf_func, _ = self._udfs[udf_name]
         left_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
         right_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
         arg_offsets = left_offsets + right_offsets
         MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
+            self._eval_type,
             lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b),
             lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
             buf,
@@ -819,8 +822,15 @@ class CogroupedMapArrowUDFPeakmemBench(_CogroupedMapArrowBenchMixin, _PeakmemBen
 # ``pandas.DataFrame``. Optional 3-arg variant ``(key, left, right)``.
 
 
-class _CogroupedMapPandasBenchMixin:
-    """Provides _write_scenario for SQL_COGROUPED_MAP_PANDAS_UDF."""
+class _CogroupedMapPandasBenchMixin(_CogroupedMapArrowBenchMixin):
+    """Provides _write_scenario for SQL_COGROUPED_MAP_PANDAS_UDF.
+
+    Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+    sibling; only the eval type, the UDFs, and the per-scenario row counts
+    differ. Adds a 3-arg ``key_identity_udf`` variant the Arrow path lacks
+    (``_write_scenario`` ignores the ``n_args`` slot, so the extra entry is
+    handled by the inherited writer).
+    """
 
     def _cogrouped_map_pandas_identity(left, right):
         """Identity cogroup UDF: returns left DataFrame as-is."""
@@ -852,32 +862,7 @@ class _CogroupedMapPandasBenchMixin:
         "multi_key": (100, 1_000, 3, 5),
     }
 
-    @staticmethod
-    def _build_scenario(name):
-        """Build a cogroup scenario: two DataFrames with the same grouping structure.
-
-        Like cogrouped arrow, batches have flat columns:
-        [key_col_0, ..., key_col_k, val_col_0, ..., val_col_v].
-        """
-        np.random.seed(42)
-        num_groups, rows_per_group, num_key_cols, num_value_cols = (
-            _CogroupedMapPandasBenchMixin._scenario_configs[name]
-        )
-        n_cols = num_key_cols + num_value_cols
-        type_pool = MockDataFactory.MIXED_TYPES[:n_cols]
-        while len(type_pool) < n_cols:
-            type_pool = type_pool + MockDataFactory.MIXED_TYPES[: n_cols - len(type_pool)]
-
-        cogroups, schema = MockDataFactory.make_cogrouped_batches(
-            num_groups=num_groups,
-            num_rows=rows_per_group,
-            num_cols=n_cols,
-            spark_type_pool=type_pool,
-            batch_size=rows_per_group,
-        )
-        return_type = StructType(schema.fields[num_key_cols:])
-        return (cogroups, return_type, num_key_cols, num_value_cols)
-
+    _eval_type = PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
     # Each UDF entry: (func, n_args). n_args=2 -> func(left, right);
     # n_args=3 -> func(key, left, right).
     _udfs = {
@@ -889,19 +874,6 @@ class _CogroupedMapPandasBenchMixin:
     params = [list(_scenario_configs), list(_udfs)]
     param_names = ["scenario", "udf"]
 
-    def _write_scenario(self, scenario, udf_name, buf):
-        groups, schema, num_key_cols, num_value_cols = self._build_scenario(scenario)
-        udf_func, _ = self._udfs[udf_name]
-        left_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
-        right_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
-        arg_offsets = left_offsets + right_offsets
-        MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
-            lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b),
-            lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
-            buf,
-        )
-
 
 class CogroupedMapPandasUDFTimeBench(_CogroupedMapPandasBenchMixin, _TimeBenchBase):
     pass
@@ -1729,11 +1701,11 @@ class _WindowAggArrowBenchMixin:
         "wide_cols": (200, 5_000, 20),
     }
 
-    @staticmethod
-    def _build_scenario(name):
+    @classmethod
+    def _build_scenario(cls, name):
         """Build a single scenario by name."""
         np.random.seed(42)
-        num_groups, rows_per_group, n_cols = _WindowAggArrowBenchMixin._scenario_configs[name]
+        num_groups, rows_per_group, n_cols = cls._scenario_configs[name]
         return MockDataFactory.make_grouped_batches(
             num_groups=num_groups,
             num_rows=rows_per_group,
@@ -1742,6 +1714,7 @@ class _WindowAggArrowBenchMixin:
             batch_size=rows_per_group,
         )
 
+    _eval_type = PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF
     _udfs = {
         "sum_udf": _window_agg_arrow_sum,
         "mean_multi_udf": _window_agg_arrow_mean_multi,
@@ -1765,7 +1738,7 @@ class _WindowAggArrowBenchMixin:
             MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)
 
         MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
+            self._eval_type,
             write_udf,
             lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
             buf,
@@ -1785,8 +1758,15 @@ class WindowAggArrowUDFPeakmemBench(_WindowAggArrowBenchMixin, _PeakmemBenchBase
 # UDF receives ``pd.Series`` columns for the entire window partition, returns scalar.
 
 
-class _WindowAggPandasBenchMixin:
-    """Provides _write_scenario for SQL_WINDOW_AGG_PANDAS_UDF."""
+class _WindowAggPandasBenchMixin(_WindowAggArrowBenchMixin):
+    """Provides _write_scenario for SQL_WINDOW_AGG_PANDAS_UDF.
+
+    Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+    sibling; only the eval type and the UDFs differ. ``_scenario_configs``
+    is intentionally identical to the Arrow variant for apples-to-apples
+    comparison (the aggregations are cheap enough that pandas conversion
+    is not the bottleneck here).
+    """
 
     def _window_agg_pandas_sum(col):
         """Sum a single Pandas Series."""
@@ -1796,57 +1776,14 @@ class _WindowAggPandasBenchMixin:
         """Mean of two Pandas Series combined."""
         return (col0.mean() or 0) + (col1.mean() or 0)
 
-    _scenario_configs = {
-        "few_groups_sm": (50, 5_000, 5),
-        "few_groups_lg": (50, 50_000, 5),
-        "many_groups_sm": (2_000, 500, 5),
-        "many_groups_lg": (500, 10_000, 5),
-        "wide_cols": (200, 5_000, 20),
-    }
-
-    @staticmethod
-    def _build_scenario(name):
-        """Build a single scenario by name."""
-        np.random.seed(42)
-        num_groups, rows_per_group, n_cols = _WindowAggPandasBenchMixin._scenario_configs[name]
-        return MockDataFactory.make_grouped_batches(
-            num_groups=num_groups,
-            num_rows=rows_per_group,
-            num_cols=n_cols,
-            spark_type_pool=MockDataFactory.NUMERIC_TYPES,
-            batch_size=rows_per_group,
-        )
-
+    _eval_type = PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF
     _udfs = {
         "sum_udf": _window_agg_pandas_sum,
         "mean_multi_udf": _window_agg_pandas_mean_multi,
     }
-    params = [list(_scenario_configs), list(_udfs)]
+    params = [list(_WindowAggArrowBenchMixin._scenario_configs), list(_udfs)]
     param_names = ["scenario", "udf"]
 
-    def _write_scenario(self, scenario, udf_name, buf):
-        groups, _schema = self._build_scenario(scenario)
-        udf_func = self._udfs[udf_name]
-
-        # sum_udf uses 1 arg, mean_multi_udf uses 2 args
-        if "multi" in udf_name:
-            arg_offsets = [0, 1]
-        else:
-            arg_offsets = [0]
-
-        return_type = DoubleType()
-
-        def write_udf(b):
-            MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)
-
-        MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
-            write_udf,
-            lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
-            buf,
-            runner_conf={"window_bound_types": "unbounded"},
-        )
-
 
 class WindowAggPandasUDFTimeBench(_WindowAggPandasBenchMixin, _TimeBenchBase):
     pass

