This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 61d810c157b [Improvement](agg) opt for StringHashTable's for_each 
(#60941)
61d810c157b is described below

commit 61d810c157b3d0160d0a06612842860f286b4148
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 13 10:51:59 2026 +0800

    [Improvement](agg) opt for StringHashTable's for_each (#60941)
    
    This pull request refactors and optimizes the batch operations for hash
    table insertion and lookup in the aggregation and distinct aggregation
    pipelines. The main improvement is the introduction of more efficient
    batch-processing methods for hash tables, especially for string-keyed
    tables, which now benefit from better cache locality through sub-table
    grouping. Several aggregation and distinct aggregation operators are
    updated to use these new batch methods, resulting in cleaner and more
    efficient code.
    
    **Batch hash table operations and code modernization:**
    
    * Replaced per-row hash table insertion and lookup loops in aggregation
    and streaming aggregation operators (`aggregation_sink_operator.cpp`,
    `aggregation_source_operator.cpp`, `streaming_aggregation_operator.cpp`,
    `distinct_streaming_aggregation_operator.cpp`, and
    `rec_cte_shared_state.h`) with new batch methods:
    `vectorized::lazy_emplace_batch`, `vectorized::lazy_emplace_batch_void`,
    and `vectorized::find_batch`, improving performance and code clarity.
    
[[1]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL568-R570)
    
[[2]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL653-R655)
    
[[3]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL679-R685)
    
[[4]](diffhunk://#diff-d45f483806d3e0d0230fcb32c6c3b5da4a3eee517ac7c8664c3e79338817b44dL590-R592)
    
[[5]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aL313-R315)
    
[[6]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L731-R734)
    
[[7]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L810-R812)
    
[[8]](diffhunk://#diff-ec838bdfeec4f39ddcf6bc242816257656407c19cec66f72c66a24db6a5080e7L80-R86)
    
    * Updated the hash table method interface in `hash_map_context.h` by
    removing unnecessary `ALWAYS_INLINE` annotations and simplifying the
    `prefetch`, `find`, and `lazy_emplace` methods, making the code easier
    to maintain.
    
    **String-keyed hash table optimizations:**
    
    * Introduced the `StringKeySubTableGroups` struct in
    `hash_map_context.h`, which groups row indices by string length for
    string-keyed hash tables. This enables batch processing by sub-table,
    improving cache locality and performance.
    
[[1]](diffhunk://#diff-b8623712a5a1728bb77cc67b6ee1bbf16ef2b842044f6f6bab64c3fc5c4575f3R290-R339)
    
[[2]](diffhunk://#diff-b8623712a5a1728bb77cc67b6ee1bbf16ef2b842044f6f6bab64c3fc5c4575f3R353-R355)
    
    * Integrated sub-table grouping into the string hash table
    initialization for aggregation (not join), so that batch operations can
    efficiently process each sub-table group.
    
    **Minor code cleanups:**
    
    * Standardized lambda formatting and improved code consistency in
    several places, such as the handling of creators for null keys and
    exception handling in hash table operations.
    
[[1]](diffhunk://#diff-ec838bdfeec4f39ddcf6bc242816257656407c19cec66f72c66a24db6a5080e7L60-R61)
    
[[2]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL668-R669)
    
    These changes collectively improve the efficiency and maintainability of
    the aggregation pipeline, especially for workloads involving string
    keys.
---
 be/src/exec/common/hash_table/hash_map_context.h   | 369 ++++++++++++++++++++-
 be/src/exec/common/hash_table/string_hash_table.h  |  22 ++
 be/src/exec/operator/aggregation_sink_operator.cpp |  32 +-
 .../exec/operator/aggregation_source_operator.cpp  |   7 +-
 .../distinct_streaming_aggregation_operator.cpp    |   6 +-
 .../operator/streaming_aggregation_operator.cpp    |  15 +-
 be/src/exec/pipeline/rec_cte_shared_state.h        |   7 +-
 7 files changed, 412 insertions(+), 46 deletions(-)

diff --git a/be/src/exec/common/hash_table/hash_map_context.h 
b/be/src/exec/common/hash_table/hash_map_context.h
index 41e718fbd41..ad0763162e1 100644
--- a/be/src/exec/common/hash_table/hash_map_context.h
+++ b/be/src/exec/common/hash_table/hash_map_context.h
@@ -112,7 +112,7 @@ struct MethodBaseInner {
     }
 
     template <bool read>
-    ALWAYS_INLINE void prefetch(size_t i) {
+    void prefetch(size_t i) {
         if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
             hash_table->template prefetch<read>(keys[i + 
HASH_MAP_PREFETCH_DIST],
                                                 hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
@@ -120,19 +120,14 @@ struct MethodBaseInner {
     }
 
     template <typename State>
-    ALWAYS_INLINE auto find(State& state, size_t i) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<true>(i);
-        }
+    auto find(State& state, size_t i) {
+        prefetch<true>(i);
         return state.find_key_with_hash(*hash_table, i, keys[i], 
hash_values[i]);
     }
 
     template <typename State, typename F, typename FF>
-    ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator,
-                                    FF&& creator_for_null_key) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<false>(i);
-        }
+    auto lazy_emplace(State& state, size_t i, F&& creator, FF&& 
creator_for_null_key) {
+        prefetch<false>(i);
         return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i], 
creator,
                                       creator_for_null_key);
     }
@@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 };
 
+/// Sub-table group indices for StringHashTable batch operations.
+/// StringHashTable dispatches keys to 6 sub-tables by string length:
+///   group 0: empty strings (size == 0) → m0
+///   group 1: size <= 2 → m1
+///   group 2: size <= 4 → m2
+///   group 3: size <= 8 → m3
+///   group 4: size <= 16 → m4
+///   group 5: size > 16 or trailing zero → ms
+/// By pre-grouping row indices, we can process each sub-table in a batch,
+/// achieving better cache locality and enabling prefetch within each group.
+struct StringKeySubTableGroups {
+    static constexpr int NUM_GROUPS = 6;
+    // Row indices for each sub-table group
+    DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+
+    void build(const StringRef* keys, uint32_t num_rows) {
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].clear();
+        }
+        // First pass: count sizes for each group to reserve memory
+        uint32_t counts[NUM_GROUPS] = {};
+        for (uint32_t i = 0; i < num_rows; i++) {
+            counts[get_group(keys[i])]++;
+        }
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].reserve(counts[g]);
+        }
+        // Second pass: fill group indices
+        for (uint32_t i = 0; i < num_rows; i++) {
+            group_row_indices[get_group(keys[i])].push_back(i);
+        }
+    }
+
+    static ALWAYS_INLINE int get_group(const StringRef& key) {
+        const size_t sz = key.size;
+        if (sz == 0) {
+            return 0;
+        }
+        if (key.data[sz - 1] == 0) {
+            // Trailing zero: goes to the generic long-string table (ms)
+            return 5;
+        }
+        if (sz <= 2) return 1;
+        if (sz <= 4) return 2;
+        if (sz <= 8) return 3;
+        if (sz <= 16) return 4;
+        return 5;
+    }
+};
+
 template <typename TData>
 struct MethodStringNoCache : public MethodBase<TData> {
     using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
     // refresh each time probe
     DorisVector<StringRef> _stored_keys;
 
+    // Sub-table groups for batch operations (only used for non-join 
aggregation path)
+    StringKeySubTableGroups _sub_table_groups;
+
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
                         : (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
             Base::init_join_bucket_num(num_rows, bucket_size, null_map);
         } else {
             Base::init_hash_values(num_rows, null_map);
+            // Build sub-table groups for batch emplace/find (only for 
aggregation, not join)
+            if constexpr (Base::is_string_hash_map()) {
+                _sub_table_groups.build(Base::keys, num_rows);
+            }
         }
     }
 
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
         key_columns[0]->reserve(num_rows);
         key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
     }
+
+    const StringKeySubTableGroups& get_sub_table_groups() const { return 
_sub_table_groups; }
+};
+
+/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
+template <typename HashMap>
+struct IsNullableStringHashMap : std::false_type {};
+
+template <typename Mapped, typename Allocator>
+struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped, 
Allocator>>> : std::true_type {
 };
 
+/// Helper: get the underlying StringHashTable from a hash table (handles 
DataWithNullKey wrapper).
+template <typename HashMap>
+auto& get_string_hash_table(HashMap& data) {
+    return data;
+}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy 
(lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+    if constexpr (GroupIdx == 0) {
+        return StringRef(origin); // copy — m0 needs non-const Key&
+    } else if constexpr (GroupIdx == 1) {
+        return to_string_key<StringKey2>(origin);
+    } else if constexpr (GroupIdx == 2) {
+        return to_string_key<StringKey4>(origin);
+    } else if constexpr (GroupIdx == 3) {
+        return to_string_key<StringKey8>(origin);
+    } else if constexpr (GroupIdx == 4) {
+        return to_string_key<StringKey16>(origin);
+    } else {
+        return StringRef(origin); // copy — ms uses StringRef as Key
+    }
+}
+
+/// Hash value to use for a given group. Group 0 (empty string) always uses 
hash=0.
+template <int GroupIdx, typename HashValues>
+size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
+    if constexpr (GroupIdx == 0) {
+        return 0;
+    } else {
+        return hash_values[row];
+    }
+}
+
+/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at 
most 1 element)
+/// does not benefit from prefetch.
+template <int GroupIdx>
+static constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler, 
typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t 
count,
+                            HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                            F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler,
+                            ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)(
+                            hash_table.template get_null_key_data<Mapped>());
+                }
+                result_handler(row, hash_table.template 
get_null_key_data<Mapped>());
+                continue;
+            }
+        }
+        auto origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            // Groups 0,5: key and origin are the same StringRef
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            // Groups 1-4: converted_key differs from origin
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+        result_handler(row, result->get_second());
+    }
+}
+
+/// Process one sub-table group for emplace without result_handler (void 
version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices, 
size_t count,
+                                 HashMethodType& agg_method, State& state, 
HashMap& hash_table,
+                                 F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)();
+                }
+                continue;
+            }
+        }
+        auto origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+    }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+                         HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                         ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    using FindResult = typename 
ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<true>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                if (hash_table.has_null_key_data()) {
+                    FindResult res(&hash_table.template 
get_null_key_data<Mapped>(), true);
+                    result_handler(row, res);
+                } else {
+                    FindResult res(nullptr, false);
+                    result_handler(row, res);
+                }
+                continue;
+            }
+        }
+        auto converted_key = 
convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+        auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row);
+        auto it = submap.find(converted_key, hash);
+        if (it) {
+            FindResult res(&it->get_second(), true);
+            result_handler(row, res);
+        } else {
+            FindResult res(nullptr, false);
+            result_handler(row, res);
+        }
+    }
+}
+
+/// Batch emplace helper: for StringHashMap, directly accesses sub-tables 
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+/// result_handler(row_index, mapped) is called after each emplace.
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename PreHandler,
+          typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t 
num_rows, F&& creator,
+                        FF&& creator_for_null_key, PreHandler&& pre_handler,
+                        ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace<G, is_nullable>(
+                        submap, indices.data(), indices.size(), agg_method, 
state, hash_table,
+                        creator, creator_for_null_key, pre_handler, 
result_handler);
+            }
+        });
+    } else {
+        // Standard per-row loop with ahead prefetch
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<false>(i);
+            pre_handler(i);
+            result_handler(i, *state.lazy_emplace_key(*agg_method.hash_table, 
i, agg_method.keys[i],
+                                                      
agg_method.hash_values[i], creator,
+                                                      creator_for_null_key));
+        }
+    }
+}
+
+/// Convenience overload without pre_handler (uses no-op).
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t 
num_rows, F&& creator,
+                        FF&& creator_for_null_key, ResultHandler&& 
result_handler) {
+    lazy_emplace_batch(
+            agg_method, state, num_rows, std::forward<F>(creator),
+            std::forward<FF>(creator_for_null_key), [](uint32_t) {},
+            std::forward<ResultHandler>(result_handler));
+}
+
+/// Batch emplace helper (void version): like lazy_emplace_batch but ignores 
the return value.
+/// pre_handler(row) is called before each emplace, allowing callers to update 
captured state
+/// (e.g., the current row index used inside creator lambdas).
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename PreHandler>
+void lazy_emplace_batch_void(HashMethodType& agg_method, State& state, 
uint32_t num_rows,
+                             F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace_void<G, is_nullable>(submap, 
indices.data(), indices.size(),
+                                                            agg_method, state, 
hash_table, creator,
+                                                            
creator_for_null_key, pre_handler);
+            }
+        });
+    } else {
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<false>(i);
+            pre_handler(i);
+            state.lazy_emplace_key(*agg_method.hash_table, i, 
agg_method.keys[i],
+                                   agg_method.hash_values[i], creator, 
creator_for_null_key);
+        }
+    }
+}
+
+/// Batch find helper: for StringHashMap, directly accesses sub-tables 
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+template <typename HashMethodType, typename State, typename ResultHandler>
+void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
+                ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_find<G, is_nullable>(submap, indices.data(), 
indices.size(),
+                                                    agg_method, state, 
hash_table, result_handler);
+            }
+        });
+    } else {
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<true>(i);
+            auto find_result = state.find_key_with_hash(
+                    *agg_method.hash_table, i, agg_method.keys[i], 
agg_method.hash_values[i]);
+            result_handler(i, find_result);
+        }
+    }
+}
+
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
 template <typename FieldType, typename TData>
diff --git a/be/src/exec/common/hash_table/string_hash_table.h 
b/be/src/exec/common/hash_table/string_hash_table.h
index a110291426f..5372f919739 100644
--- a/be/src/exec/common/hash_table/string_hash_table.h
+++ b/be/src/exec/common/hash_table/string_hash_table.h
@@ -679,6 +679,28 @@ public:
     const_iterator cend() const { return end(); }
     iterator end() { return iterator(this, true); }
 
+    /// Public accessors for sub-tables, enabling direct batch operations
+    /// that bypass dispatch() for better performance (no per-row branching).
+    T0& get_submap_m0() { return m0; }
+    T1& get_submap_m1() { return m1; }
+    T2& get_submap_m2() { return m2; }
+    T3& get_submap_m3() { return m3; }
+    T4& get_submap_m4() { return m4; }
+    Ts& get_submap_ms() { return ms; }
+
+    /// Visit each (group_index, submap) pair with a generic callable.
+    /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&)
+    /// The integral_constant enables compile-time group dispatch in the 
lambda.
+    template <typename Func>
+    ALWAYS_INLINE void visit_submaps(Func&& func) {
+        func(std::integral_constant<int, 0> {}, m0);
+        func(std::integral_constant<int, 1> {}, m1);
+        func(std::integral_constant<int, 2> {}, m2);
+        func(std::integral_constant<int, 3> {}, m3);
+        func(std::integral_constant<int, 4> {}, m4);
+        func(std::integral_constant<int, 5> {}, ms);
+    }
+
     bool add_elem_size_overflow(size_t add_size) const {
         return m1.add_elem_size_overflow(add_size) || 
m2.add_elem_size_overflow(add_size) ||
                m3.add_elem_size_overflow(add_size) || 
m4.add_elem_size_overflow(add_size) ||
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp 
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 9317d6c5540..133b91f4f1a 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -561,10 +561,9 @@ void 
AggSinkLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
                              };
 
                              SCOPED_TIMER(_hash_table_emplace_timer);
-                             for (size_t i = 0; i < num_rows; ++i) {
-                                 places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                      
creator_for_null_key);
-                             }
+                             lazy_emplace_batch(
+                                     agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                     [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                              COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                          }},
@@ -644,10 +643,10 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* places,
                               };
 
                               SCOPED_TIMER(_hash_table_emplace_timer);
-                              for (i = 0; i < num_rows; ++i) {
-                                  places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                       
creator_for_null_key);
-                              }
+                              lazy_emplace_batch(
+                                      agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                      [&](uint32_t row) { i = row; },
+                                      [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                               COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                               return true;
                           }
@@ -669,15 +668,14 @@ void 
AggSinkLocalState::_find_in_hash_table(AggregateDataPtr* places, ColumnRawP
                              agg_method.init_serialized_keys(key_columns, 
num_rows);
 
                              /// For all rows.
-                             for (size_t i = 0; i < num_rows; ++i) {
-                                 auto find_result = agg_method.find(state, i);
-
-                                 if (find_result.is_found()) {
-                                     places[i] = find_result.get_mapped();
-                                 } else {
-                                     places[i] = nullptr;
-                                 }
-                             }
+                             find_batch(agg_method, state, num_rows,
+                                        [&](uint32_t row, auto& find_result) {
+                                            if (find_result.is_found()) {
+                                                places[row] = 
find_result.get_mapped();
+                                            } else {
+                                                places[row] = nullptr;
+                                            }
+                                        });
                          }},
                _agg_data->method_variant);
 }
diff --git a/be/src/exec/operator/aggregation_source_operator.cpp 
b/be/src/exec/operator/aggregation_source_operator.cpp
index a47d66f96c3..9c6041e4dc4 100644
--- a/be/src/exec/operator/aggregation_source_operator.cpp
+++ b/be/src/exec/operator/aggregation_source_operator.cpp
@@ -578,10 +578,9 @@ void 
AggLocalState::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRaw
                           };
 
                           SCOPED_TIMER(_hash_table_emplace_timer);
-                          for (size_t i = 0; i < num_rows; ++i) {
-                              places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                   
creator_for_null_key);
-                          }
+                          lazy_emplace_batch(
+                                  agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                  [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                           COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                           COUNTER_SET(_hash_table_memory_usage,
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp 
b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
index c3f4248b513..48a43ca4c39 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
@@ -307,9 +307,9 @@ void 
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
                           auto creator_for_null_key = [&]() { 
distinct_row.push_back(row); };
 
                           SCOPED_TIMER(_hash_table_emplace_timer);
-                          for (; row < num_rows; ++row) {
-                              agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
-                          }
+                          lazy_emplace_batch_void(agg_method, state, num_rows, 
creator,
+                                                  creator_for_null_key,
+                                                  [&](uint32_t r) { row = r; 
});
 
                           COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                       }},
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp 
b/be/src/exec/operator/streaming_aggregation_operator.cpp
index 9d3e338bf6d..4725e8ffb10 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -722,10 +722,10 @@ bool 
StreamingAggLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* pl
                               };
 
                               SCOPED_TIMER(_hash_table_emplace_timer);
-                              for (i = 0; i < num_rows; ++i) {
-                                  places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                       
creator_for_null_key);
-                              }
+                              lazy_emplace_batch(
+                                      agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                      [&](uint32_t row) { i = row; },
+                                      [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                               COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                               return true;
                           }
@@ -800,10 +800,9 @@ void 
StreamingAggLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
                              };
 
                              SCOPED_TIMER(_hash_table_emplace_timer);
-                             for (size_t i = 0; i < num_rows; ++i) {
-                                 places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                      
creator_for_null_key);
-                             }
+                             lazy_emplace_batch(
+                                     agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                     [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                              COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                          }},
diff --git a/be/src/exec/pipeline/rec_cte_shared_state.h 
b/be/src/exec/pipeline/rec_cte_shared_state.h
index 466133288dd..4d9fae7608a 100644
--- a/be/src/exec/pipeline/rec_cte_shared_state.h
+++ b/be/src/exec/pipeline/rec_cte_shared_state.h
@@ -81,10 +81,9 @@ struct RecCTESharedState : public BasicSharedState {
                                      };
 
                                      SCOPED_TIMER(hash_table_emplace_timer);
-                                     for (; row < num_rows; ++row) {
-                                         agg_method.lazy_emplace(agg_state, 
row, creator,
-                                                                 
creator_for_null_key);
-                                     }
+                                     lazy_emplace_batch_void(agg_method, 
agg_state, num_rows,
+                                                             creator, 
creator_for_null_key,
+                                                             [&](uint32_t r) { 
row = r; });
                                      COUNTER_UPDATE(hash_table_input_counter, 
num_rows);
                                  }},
                        agg_data->method_variant);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to