This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch ckb2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit bb87382ebd0275d97e6e142d8cd82ec8619fde52
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Mon Mar 2 10:56:48 2026 +0800

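    opt for StringHashTable's for_each: batch emplace/find per sub-table
    
    When the hash method uses a StringHashMap, pre-group the rows of each
    batch by the sub-table their key belongs to (empty, <= 2, <= 4, <= 8,
    <= 16 bytes, or longer / trailing zero byte) and emplace or find each
    group directly on that sub-table. This removes the per-row dispatch()
    branch and lets prefetch run within each group.
    
    - Build the row groups in MethodStringNoCache::init_serialized_keys
      (non-join path only).
    - Add lazy_emplace_batch, lazy_emplace_batch_void and find_batch, and
      use them in the aggregation sink/source, streaming and
      distinct-streaming aggregation operators and RecCTESharedState.
      Non-string hash maps keep a per-row loop with prefetch.
    - Add StringHashTable::visit_submaps and per-sub-table accessors.
    - MethodBaseInner::prefetch/find/lazy_emplace are no longer
      ALWAYS_INLINE, and find/lazy_emplace now prefetch for string hash
      maps too.
    - Reformat StreamingAggLocalState::_should_not_do_pre_agg (no
      functional change).
    
    Note: for string keys, rows are now emplaced group by group instead of
    in input order, so per-row state recorded from the creators (e.g.
    distinct_row in DistinctStreamingAggLocalState) follows that order.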
---
 be/src/exec/common/hash_table/hash_map_context.h   | 369 ++++++++++++++++++++-
 be/src/exec/common/hash_table/string_hash_table.h  |  22 ++
 be/src/exec/operator/aggregation_sink_operator.cpp |  32 +-
 .../exec/operator/aggregation_source_operator.cpp  |   7 +-
 .../distinct_streaming_aggregation_operator.cpp    |   6 +-
 .../operator/streaming_aggregation_operator.cpp    |  48 ++-
 be/src/exec/pipeline/rec_cte_shared_state.h        |   7 +-
 7 files changed, 428 insertions(+), 63 deletions(-)

diff --git a/be/src/exec/common/hash_table/hash_map_context.h b/be/src/exec/common/hash_table/hash_map_context.h
index e44c752ad0b..b036cb95590 100644
--- a/be/src/exec/common/hash_table/hash_map_context.h
+++ b/be/src/exec/common/hash_table/hash_map_context.h
@@ -112,7 +112,7 @@ struct MethodBaseInner {
     }
 
     template <bool read>
-    ALWAYS_INLINE void prefetch(size_t i) {
+    void prefetch(size_t i) {
         if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
             hash_table->template prefetch<read>(keys[i + 
HASH_MAP_PREFETCH_DIST],
                                                 hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
@@ -120,19 +120,14 @@ struct MethodBaseInner {
     }
 
     template <typename State>
-    ALWAYS_INLINE auto find(State& state, size_t i) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<true>(i);
-        }
+    auto find(State& state, size_t i) {
+        prefetch<true>(i);
         return state.find_key_with_hash(*hash_table, i, keys[i], 
hash_values[i]);
     }
 
     template <typename State, typename F, typename FF>
-    ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator,
-                                    FF&& creator_for_null_key) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<false>(i);
-        }
+    auto lazy_emplace(State& state, size_t i, F&& creator, FF&& 
creator_for_null_key) {
+        prefetch<false>(i);
         return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i], 
creator,
                                       creator_for_null_key);
     }
@@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 };
 
+/// Sub-table group indices for StringHashTable batch operations.
+/// StringHashTable dispatches keys to 6 sub-tables by string length:
+///   group 0: empty strings (size == 0) → m0
+///   group 1: size <= 2 → m1
+///   group 2: size <= 4 → m2
+///   group 3: size <= 8 → m3
+///   group 4: size <= 16 → m4
+///   group 5: size > 16 or trailing zero → ms
+/// By pre-grouping row indices, we can process each sub-table in a batch,
+/// achieving better cache locality and enabling prefetch within each group.
+struct StringKeySubTableGroups {
+    static constexpr int NUM_GROUPS = 6;
+    // Row indices for each sub-table group
+    DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+
+    void build(const StringRef* keys, uint32_t num_rows) {
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].clear();
+        }
+        // First pass: count sizes for each group to reserve memory
+        uint32_t counts[NUM_GROUPS] = {};
+        for (uint32_t i = 0; i < num_rows; i++) {
+            counts[get_group(keys[i])]++;
+        }
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].reserve(counts[g]);
+        }
+        // Second pass: fill group indices
+        for (uint32_t i = 0; i < num_rows; i++) {
+            group_row_indices[get_group(keys[i])].push_back(i);
+        }
+    }
+
+    static ALWAYS_INLINE int get_group(const StringRef& key) {
+        const size_t sz = key.size;
+        if (sz == 0) {
+            return 0;
+        }
+        if (key.data[sz - 1] == 0) {
+            // Trailing zero: goes to the generic long-string table (ms)
+            return 5;
+        }
+        if (sz <= 2) return 1;
+        if (sz <= 4) return 2;
+        if (sz <= 8) return 3;
+        if (sz <= 16) return 4;
+        return 5;
+    }
+};
+
 template <typename TData>
 struct MethodStringNoCache : public MethodBase<TData> {
     using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
     // refresh each time probe
     DorisVector<StringRef> _stored_keys;
 
+    // Sub-table groups for batch operations (only used for non-join 
aggregation path)
+    StringKeySubTableGroups _sub_table_groups;
+
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
                         : (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
             Base::init_join_bucket_num(num_rows, bucket_size, null_map);
         } else {
             Base::init_hash_values(num_rows, null_map);
+            // Build sub-table groups for batch emplace/find (only for 
aggregation, not join)
+            if constexpr (Base::is_string_hash_map()) {
+                _sub_table_groups.build(Base::keys, num_rows);
+            }
         }
     }
 
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
         key_columns[0]->reserve(num_rows);
         key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
     }
+
+    const StringKeySubTableGroups& get_sub_table_groups() const { return 
_sub_table_groups; }
+};
+
+/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
+template <typename HashMap>
+struct IsNullableStringHashMap : std::false_type {};
+
+template <typename Mapped, typename Allocator>
+struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped, 
Allocator>>> : std::true_type {
 };
 
+/// Helper: get the underlying StringHashTable from a hash table (handles 
DataWithNullKey wrapper).
+template <typename HashMap>
+auto& get_string_hash_table(HashMap& data) {
+    return data;
+}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy 
(lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+    if constexpr (GroupIdx == 0) {
+        return StringRef(origin); // copy — m0 needs non-const Key&
+    } else if constexpr (GroupIdx == 1) {
+        return to_string_key<StringKey2>(origin);
+    } else if constexpr (GroupIdx == 2) {
+        return to_string_key<StringKey4>(origin);
+    } else if constexpr (GroupIdx == 3) {
+        return to_string_key<StringKey8>(origin);
+    } else if constexpr (GroupIdx == 4) {
+        return to_string_key<StringKey16>(origin);
+    } else {
+        return StringRef(origin); // copy — ms uses StringRef as Key
+    }
+}
+
+/// Hash value to use for a given group. Group 0 (empty string) always uses 
hash=0.
+template <int GroupIdx, typename HashValues>
+size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
+    if constexpr (GroupIdx == 0) {
+        return 0;
+    } else {
+        return hash_values[row];
+    }
+}
+
+/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at 
most 1 element)
+/// does not benefit from prefetch.
+template <int GroupIdx>
+static constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler, 
typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t 
count,
+                            HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                            F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler,
+                            ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)(
+                            hash_table.template get_null_key_data<Mapped>());
+                }
+                result_handler(row, hash_table.template 
get_null_key_data<Mapped>());
+                continue;
+            }
+        }
+        const auto& origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            // Groups 0,5: key and origin are the same StringRef
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            // Groups 1-4: converted_key differs from origin
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+        result_handler(row, result->get_second());
+    }
+}
+
+/// Process one sub-table group for emplace without result_handler (void 
version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices, 
size_t count,
+                                 HashMethodType& agg_method, State& state, 
HashMap& hash_table,
+                                 F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)();
+                }
+                continue;
+            }
+        }
+        const auto& origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+    }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+                         HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                         ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    using FindResult = typename 
ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<true>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                if (hash_table.has_null_key_data()) {
+                    FindResult res(&hash_table.template 
get_null_key_data<Mapped>(), true);
+                    result_handler(row, res);
+                } else {
+                    FindResult res(nullptr, false);
+                    result_handler(row, res);
+                }
+                continue;
+            }
+        }
+        auto converted_key = 
convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+        auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row);
+        auto it = submap.find(converted_key, hash);
+        if (it) {
+            FindResult res(&it->get_second(), true);
+            result_handler(row, res);
+        } else {
+            FindResult res(nullptr, false);
+            result_handler(row, res);
+        }
+    }
+}
+
+/// Batch emplace helper: for StringHashMap, directly accesses sub-tables 
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+/// result_handler(row_index, mapped) is called after each emplace.
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename PreHandler,
+          typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t 
num_rows, F&& creator,
+                        FF&& creator_for_null_key, PreHandler&& pre_handler,
+                        ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace<G, is_nullable>(
+                        submap, indices.data(), indices.size(), agg_method, 
state, hash_table,
+                        creator, creator_for_null_key, pre_handler, 
result_handler);
+            }
+        });
+    } else {
+        // Standard per-row loop with ahead prefetch
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<false>(i);
+            pre_handler(i);
+            result_handler(i, *state.lazy_emplace_key(*agg_method.hash_table, 
i, agg_method.keys[i],
+                                                      
agg_method.hash_values[i], creator,
+                                                      creator_for_null_key));
+        }
+    }
+}
+
+/// Convenience overload without pre_handler (uses no-op).
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t 
num_rows, F&& creator,
+                        FF&& creator_for_null_key, ResultHandler&& 
result_handler) {
+    lazy_emplace_batch(
+            agg_method, state, num_rows, std::forward<F>(creator),
+            std::forward<FF>(creator_for_null_key), [](uint32_t) {},
+            std::forward<ResultHandler>(result_handler));
+}
+
+/// Batch emplace helper (void version): like lazy_emplace_batch but ignores 
the return value.
+/// pre_handler(row) is called before each emplace, allowing callers to update 
captured state
+/// (e.g., the current row index used inside creator lambdas).
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename PreHandler>
+void lazy_emplace_batch_void(HashMethodType& agg_method, State& state, 
uint32_t num_rows,
+                             F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace_void<G, is_nullable>(submap, 
indices.data(), indices.size(),
+                                                            agg_method, state, 
hash_table, creator,
+                                                            
creator_for_null_key, pre_handler);
+            }
+        });
+    } else {
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<false>(i);
+            pre_handler(i);
+            state.lazy_emplace_key(*agg_method.hash_table, i, 
agg_method.keys[i],
+                                   agg_method.hash_values[i], creator, 
creator_for_null_key);
+        }
+    }
+}
+
+/// Batch find helper: for StringHashMap, directly accesses sub-tables 
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+template <typename HashMethodType, typename State, typename ResultHandler>
+void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
+                ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_find<G, is_nullable>(submap, indices.data(), 
indices.size(),
+                                                    agg_method, state, 
hash_table, result_handler);
+            }
+        });
+    } else {
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<true>(i);
+            auto find_result = state.find_key_with_hash(
+                    *agg_method.hash_table, i, agg_method.keys[i], 
agg_method.hash_values[i]);
+            result_handler(i, find_result);
+        }
+    }
+}
+
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
 template <typename FieldType, typename TData>
diff --git a/be/src/exec/common/hash_table/string_hash_table.h b/be/src/exec/common/hash_table/string_hash_table.h
index 166b56bda29..007c2cf8fc9 100644
--- a/be/src/exec/common/hash_table/string_hash_table.h
+++ b/be/src/exec/common/hash_table/string_hash_table.h
@@ -673,6 +673,28 @@ public:
     const_iterator cend() const { return end(); }
     iterator end() { return iterator(this, true); }
 
+    /// Public accessors for sub-tables, enabling direct batch operations
+    /// that bypass dispatch() for better performance (no per-row branching).
+    T0& get_submap_m0() { return m0; }
+    T1& get_submap_m1() { return m1; }
+    T2& get_submap_m2() { return m2; }
+    T3& get_submap_m3() { return m3; }
+    T4& get_submap_m4() { return m4; }
+    Ts& get_submap_ms() { return ms; }
+
+    /// Visit each (group_index, submap) pair with a generic callable.
+    /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&)
+    /// The integral_constant enables compile-time group dispatch in the 
lambda.
+    template <typename Func>
+    ALWAYS_INLINE void visit_submaps(Func&& func) {
+        func(std::integral_constant<int, 0> {}, m0);
+        func(std::integral_constant<int, 1> {}, m1);
+        func(std::integral_constant<int, 2> {}, m2);
+        func(std::integral_constant<int, 3> {}, m3);
+        func(std::integral_constant<int, 4> {}, m4);
+        func(std::integral_constant<int, 5> {}, ms);
+    }
+
     bool add_elem_size_overflow(size_t add_size) const {
         return m1.add_elem_size_overflow(add_size) || 
m2.add_elem_size_overflow(add_size) ||
                m3.add_elem_size_overflow(add_size) || 
m4.add_elem_size_overflow(add_size) ||
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp b/be/src/exec/operator/aggregation_sink_operator.cpp
index 9317d6c5540..133b91f4f1a 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -561,10 +561,9 @@ void AggSinkLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
                              };
 
                              SCOPED_TIMER(_hash_table_emplace_timer);
-                             for (size_t i = 0; i < num_rows; ++i) {
-                                 places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                      
creator_for_null_key);
-                             }
+                             lazy_emplace_batch(
+                                     agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                     [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                              COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                          }},
@@ -644,10 +643,10 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* places,
                               };
 
                               SCOPED_TIMER(_hash_table_emplace_timer);
-                              for (i = 0; i < num_rows; ++i) {
-                                  places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                       
creator_for_null_key);
-                              }
+                              lazy_emplace_batch(
+                                      agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                      [&](uint32_t row) { i = row; },
+                                      [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                               COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                               return true;
                           }
@@ -669,15 +668,14 @@ void AggSinkLocalState::_find_in_hash_table(AggregateDataPtr* places, ColumnRawP
                              agg_method.init_serialized_keys(key_columns, 
num_rows);
 
                              /// For all rows.
-                             for (size_t i = 0; i < num_rows; ++i) {
-                                 auto find_result = agg_method.find(state, i);
-
-                                 if (find_result.is_found()) {
-                                     places[i] = find_result.get_mapped();
-                                 } else {
-                                     places[i] = nullptr;
-                                 }
-                             }
+                             find_batch(agg_method, state, num_rows,
+                                        [&](uint32_t row, auto& find_result) {
+                                            if (find_result.is_found()) {
+                                                places[row] = 
find_result.get_mapped();
+                                            } else {
+                                                places[row] = nullptr;
+                                            }
+                                        });
                          }},
                _agg_data->method_variant);
 }
diff --git a/be/src/exec/operator/aggregation_source_operator.cpp b/be/src/exec/operator/aggregation_source_operator.cpp
index a47d66f96c3..9c6041e4dc4 100644
--- a/be/src/exec/operator/aggregation_source_operator.cpp
+++ b/be/src/exec/operator/aggregation_source_operator.cpp
@@ -578,10 +578,9 @@ void AggLocalState::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRaw
                           };
 
                           SCOPED_TIMER(_hash_table_emplace_timer);
-                          for (size_t i = 0; i < num_rows; ++i) {
-                              places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                   
creator_for_null_key);
-                          }
+                          lazy_emplace_batch(
+                                  agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                  [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                           COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                           COUNTER_SET(_hash_table_memory_usage,
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
index c3f4248b513..48a43ca4c39 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
@@ -307,9 +307,9 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
                           auto creator_for_null_key = [&]() { 
distinct_row.push_back(row); };
 
                           SCOPED_TIMER(_hash_table_emplace_timer);
-                          for (; row < num_rows; ++row) {
-                              agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
-                          }
+                          lazy_emplace_batch_void(agg_method, state, num_rows, 
creator,
+                                                  creator_for_null_key,
+                                                  [&](uint32_t r) { row = r; 
});
 
                           COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                       }},
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp b/be/src/exec/operator/streaming_aggregation_operator.cpp
index 9d3e338bf6d..96d416aeb7e 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -311,23 +311,22 @@ bool StreamingAggLocalState::_should_not_do_pre_agg(size_t rows) {
     const auto spill_streaming_agg_mem_limit = 
p._spill_streaming_agg_mem_limit;
     const bool used_too_much_memory =
             spill_streaming_agg_mem_limit > 0 && _memory_usage() > 
spill_streaming_agg_mem_limit;
-    std::visit(
-            Overload {
-                    [&](std::monostate& arg) {
-                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                    },
-                    [&](auto& agg_method) {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        /// If too much memory is used during the 
pre-aggregation stage,
-                        /// it is better to output the data directly without 
performing further aggregation.
-                        // do not try to do agg, just init and serialize 
directly return the out_block
-                        if (used_too_much_memory || 
(hash_tbl.add_elem_size_overflow(rows) &&
-                                                     
!_should_expand_preagg_hash_tables())) {
-                            SCOPED_TIMER(_streaming_agg_timer);
-                            ret_flag = true;
-                        }
-                    }},
-            _agg_data->method_variant);
+    std::visit(Overload {[&](std::monostate& arg) {
+                             throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                    "uninited hash table");
+                         },
+                         [&](auto& agg_method) {
+                             auto& hash_tbl = *agg_method.hash_table;
+                             /// If too much memory is used during the 
pre-aggregation stage,
+                             /// it is better to output the data directly 
without performing further aggregation.
+                             // do not try to do agg, just init and serialize 
directly return the out_block
+                             if (used_too_much_memory || 
(hash_tbl.add_elem_size_overflow(rows) &&
+                                                          
!_should_expand_preagg_hash_tables())) {
+                                 SCOPED_TIMER(_streaming_agg_timer);
+                                 ret_flag = true;
+                             }
+                         }},
+               _agg_data->method_variant);
 
     return ret_flag;
 }
@@ -722,10 +721,10 @@ bool StreamingAggLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* pl
                               };
 
                               SCOPED_TIMER(_hash_table_emplace_timer);
-                              for (i = 0; i < num_rows; ++i) {
-                                  places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                       
creator_for_null_key);
-                              }
+                              lazy_emplace_batch(
+                                      agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                      [&](uint32_t row) { i = row; },
+                                      [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                               COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                               return true;
                           }
@@ -800,10 +799,9 @@ void StreamingAggLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
                              };
 
                              SCOPED_TIMER(_hash_table_emplace_timer);
-                             for (size_t i = 0; i < num_rows; ++i) {
-                                 places[i] = *agg_method.lazy_emplace(state, 
i, creator,
-                                                                      
creator_for_null_key);
-                             }
+                             lazy_emplace_batch(
+                                     agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                     [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                              COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                          }},
diff --git a/be/src/exec/pipeline/rec_cte_shared_state.h b/be/src/exec/pipeline/rec_cte_shared_state.h
index 466133288dd..4d9fae7608a 100644
--- a/be/src/exec/pipeline/rec_cte_shared_state.h
+++ b/be/src/exec/pipeline/rec_cte_shared_state.h
@@ -81,10 +81,9 @@ struct RecCTESharedState : public BasicSharedState {
                                      };
 
                                      SCOPED_TIMER(hash_table_emplace_timer);
-                                     for (; row < num_rows; ++row) {
-                                         agg_method.lazy_emplace(agg_state, 
row, creator,
-                                                                 
creator_for_null_key);
-                                     }
+                                     lazy_emplace_batch_void(agg_method, 
agg_state, num_rows,
+                                                             creator, 
creator_for_null_key,
+                                                             [&](uint32_t r) { 
row = r; });
                                      COUNTER_UPDATE(hash_table_input_counter, 
num_rows);
                                  }},
                        agg_data->method_variant);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to