This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch ckb_preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8069df515854ddbcd82aa5e71d610830376b4aaa
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Mon Mar 2 10:56:48 2026 +0800

    opt for StringHashTable's for_each: pre-group rows by sub-table so batch emplace/find bypasses per-row dispatch() and can prefetch within each group
    
    update
    
    update
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  56 ++--
 .../pipeline/exec/aggregation_source_operator.cpp  |   7 +-
 .../distinct_streaming_aggregation_operator.cpp    |   6 +-
 .../exec/streaming_aggregation_operator.cpp        |  15 +-
 be/src/pipeline/rec_cte_shared_state.h             |  62 ++--
 be/src/vec/common/hash_table/hash_map_context.h    | 369 ++++++++++++++++++++-
 be/src/vec/common/hash_table/string_hash_table.h   |  22 ++
 7 files changed, 451 insertions(+), 86 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 3af2bfc94f2..bf254be9111 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -565,10 +565,9 @@ void 
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
                            };
 
                            SCOPED_TIMER(_hash_table_emplace_timer);
-                           for (size_t i = 0; i < num_rows; ++i) {
-                               places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                    
creator_for_null_key);
-                           }
+                           vectorized::lazy_emplace_batch(
+                                   agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                   [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                            COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                        }},
@@ -650,10 +649,10 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                             };
 
                             SCOPED_TIMER(_hash_table_emplace_timer);
-                            for (i = 0; i < num_rows; ++i) {
-                                places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                     
creator_for_null_key);
-                            }
+                            vectorized::lazy_emplace_batch(
+                                    agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                    [&](uint32_t row) { i = row; },
+                                    [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                             COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                             return true;
                         }
@@ -665,27 +664,26 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
 void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* 
places,
                                             vectorized::ColumnRawPtrs& 
key_columns,
                                             uint32_t num_rows) {
-    std::visit(vectorized::Overload {[&](std::monostate& arg) -> void {
-                                         throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
-                                                                "uninited hash 
table");
-                                     },
-                                     [&](auto& agg_method) -> void {
-                                         using HashMethodType = 
std::decay_t<decltype(agg_method)>;
-                                         using AggState = typename 
HashMethodType::State;
-                                         AggState state(key_columns);
-                                         
agg_method.init_serialized_keys(key_columns, num_rows);
-
-                                         /// For all rows.
-                                         for (size_t i = 0; i < num_rows; ++i) 
{
-                                             auto find_result = 
agg_method.find(state, i);
-
-                                             if (find_result.is_found()) {
-                                                 places[i] = 
find_result.get_mapped();
-                                             } else {
-                                                 places[i] = nullptr;
-                                             }
-                                         }
-                                     }},
+    std::visit(vectorized::Overload {
+                       [&](std::monostate& arg) -> void {
+                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                       },
+                       [&](auto& agg_method) -> void {
+                           using HashMethodType = 
std::decay_t<decltype(agg_method)>;
+                           using AggState = typename HashMethodType::State;
+                           AggState state(key_columns);
+                           agg_method.init_serialized_keys(key_columns, 
num_rows);
+
+                           /// For all rows.
+                           vectorized::find_batch(agg_method, state, num_rows,
+                                                  [&](uint32_t row, auto& 
find_result) {
+                                                      if 
(find_result.is_found()) {
+                                                          places[row] = 
find_result.get_mapped();
+                                                      } else {
+                                                          places[row] = 
nullptr;
+                                                      }
+                                                  });
+                       }},
                _agg_data->method_variant);
 }
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 846bfdf1c12..4dfc9be8b62 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -587,10 +587,9 @@ void 
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
                         };
 
                         SCOPED_TIMER(_hash_table_emplace_timer);
-                        for (size_t i = 0; i < num_rows; ++i) {
-                            places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                 
creator_for_null_key);
-                        }
+                        vectorized::lazy_emplace_batch(
+                                agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                [&](uint32_t row, auto& mapped) { places[row] 
= mapped; });
 
                         COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                         COUNTER_SET(_hash_table_memory_usage,
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 972fc9ba923..1d39488b6c9 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -310,9 +310,9 @@ void 
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
                         auto creator_for_null_key = [&]() { 
distinct_row.push_back(row); };
 
                         SCOPED_TIMER(_hash_table_emplace_timer);
-                        for (; row < num_rows; ++row) {
-                            agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
-                        }
+                        vectorized::lazy_emplace_batch_void(agg_method, state, 
num_rows, creator,
+                                                            
creator_for_null_key,
+                                                            [&](uint32_t r) { 
row = r; });
 
                         COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                     }},
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index fe9aef1662a..86461a8460c 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -728,10 +728,10 @@ bool 
StreamingAggLocalState::_emplace_into_hash_table_limit(vectorized::Aggregat
                             };
 
                             SCOPED_TIMER(_hash_table_emplace_timer);
-                            for (i = 0; i < num_rows; ++i) {
-                                places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                     
creator_for_null_key);
-                            }
+                            vectorized::lazy_emplace_batch(
+                                    agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                    [&](uint32_t row) { i = row; },
+                                    [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                             COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                             return true;
                         }
@@ -807,10 +807,9 @@ void 
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP
                            };
 
                            SCOPED_TIMER(_hash_table_emplace_timer);
-                           for (size_t i = 0; i < num_rows; ++i) {
-                               places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                    
creator_for_null_key);
-                           }
+                           vectorized::lazy_emplace_batch(
+                                   agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                   [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                            COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                        }},
diff --git a/be/src/pipeline/rec_cte_shared_state.h 
b/be/src/pipeline/rec_cte_shared_state.h
index 8a57143b110..11f32a5993c 100644
--- a/be/src/pipeline/rec_cte_shared_state.h
+++ b/be/src/pipeline/rec_cte_shared_state.h
@@ -57,38 +57,36 @@ struct RecCTESharedState : public BasicSharedState {
                 raw_columns.push_back(col.get());
             }
 
-            std::visit(vectorized::Overload {
-                               [&](std::monostate& arg) -> void {
-                                   throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
-                                                          "uninited hash 
table");
-                               },
-                               [&](auto& agg_method) -> void {
-                                   SCOPED_TIMER(hash_table_compute_timer);
-                                   using HashMethodType = 
std::decay_t<decltype(agg_method)>;
-                                   using AggState = typename 
HashMethodType::State;
-
-                                   AggState agg_state(raw_columns);
-                                   
agg_method.init_serialized_keys(raw_columns, num_rows);
-                                   distinct_row.clear();
-
-                                   size_t row = 0;
-                                   auto creator = [&](const auto& ctor, auto& 
key, auto& origin) {
-                                       HashMethodType::try_presis_key(key, 
origin, arena);
-                                       ctor(key);
-                                       distinct_row.push_back(row);
-                                   };
-                                   auto creator_for_null_key = [&]() {
-                                       distinct_row.push_back(row);
-                                   };
-
-                                   SCOPED_TIMER(hash_table_emplace_timer);
-                                   for (; row < num_rows; ++row) {
-                                       agg_method.lazy_emplace(agg_state, row, 
creator,
-                                                               
creator_for_null_key);
-                                   }
-                                   COUNTER_UPDATE(hash_table_input_counter, 
num_rows);
-                               }},
-                       agg_data->method_variant);
+            std::visit(
+                    vectorized::Overload {
+                            [&](std::monostate& arg) -> void {
+                                throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                       "uninited hash table");
+                            },
+                            [&](auto& agg_method) -> void {
+                                SCOPED_TIMER(hash_table_compute_timer);
+                                using HashMethodType = 
std::decay_t<decltype(agg_method)>;
+                                using AggState = typename 
HashMethodType::State;
+
+                                AggState agg_state(raw_columns);
+                                agg_method.init_serialized_keys(raw_columns, 
num_rows);
+                                distinct_row.clear();
+
+                                size_t row = 0;
+                                auto creator = [&](const auto& ctor, auto& 
key, auto& origin) {
+                                    HashMethodType::try_presis_key(key, 
origin, arena);
+                                    ctor(key);
+                                    distinct_row.push_back(row);
+                                };
+                                auto creator_for_null_key = [&]() { 
distinct_row.push_back(row); };
+
+                                SCOPED_TIMER(hash_table_emplace_timer);
+                                
vectorized::lazy_emplace_batch_void(agg_method, agg_state, num_rows,
+                                                                    creator, 
creator_for_null_key,
+                                                                    
[&](uint32_t r) { row = r; });
+                                COUNTER_UPDATE(hash_table_input_counter, 
num_rows);
+                            }},
+                    agg_data->method_variant);
 
             if (distinct_row.size() == block.rows()) {
                 blocks.emplace_back(std::move(block));
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 95bc05a5f09..f4cfaa229ca 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -112,7 +112,7 @@ struct MethodBaseInner {
     }
 
     template <bool read>
-    ALWAYS_INLINE void prefetch(size_t i) {
+    void prefetch(size_t i) {
         if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
             hash_table->template prefetch<read>(keys[i + 
HASH_MAP_PREFETCH_DIST],
                                                 hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
@@ -120,19 +120,14 @@ struct MethodBaseInner {
     }
 
     template <typename State>
-    ALWAYS_INLINE auto find(State& state, size_t i) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<true>(i);
-        }
+    auto find(State& state, size_t i) {
+        prefetch<true>(i);
         return state.find_key_with_hash(*hash_table, i, keys[i], 
hash_values[i]);
     }
 
     template <typename State, typename F, typename FF>
-    ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator,
-                                    FF&& creator_for_null_key) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<false>(i);
-        }
+    auto lazy_emplace(State& state, size_t i, F&& creator, FF&& 
creator_for_null_key) {
+        prefetch<false>(i);
         return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i], 
creator,
                                       creator_for_null_key);
     }
@@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 };
 
+/// Sub-table group indices for StringHashTable batch operations.
+/// StringHashTable dispatches keys to 6 sub-tables by string length:
+///   group 0: empty strings (size == 0) → m0
+///   group 1: size <= 2 → m1
+///   group 2: size <= 4 → m2
+///   group 3: size <= 8 → m3
+///   group 4: size <= 16 → m4
+///   group 5: size > 16 or trailing zero → ms
+/// By pre-grouping row indices, we can process each sub-table in a batch,
+/// achieving better cache locality and enabling prefetch within each group.
+struct StringKeySubTableGroups {
+    static constexpr int NUM_GROUPS = 6;
+    // Row indices for each sub-table group
+    DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+
+    void build(const StringRef* keys, uint32_t num_rows) {
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].clear();
+        }
+        // First pass: count sizes for each group to reserve memory
+        uint32_t counts[NUM_GROUPS] = {};
+        for (uint32_t i = 0; i < num_rows; i++) {
+            counts[get_group(keys[i])]++;
+        }
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].reserve(counts[g]);
+        }
+        // Second pass: fill group indices
+        for (uint32_t i = 0; i < num_rows; i++) {
+            group_row_indices[get_group(keys[i])].push_back(i);
+        }
+    }
+
+    static ALWAYS_INLINE int get_group(const StringRef& key) {
+        const size_t sz = key.size;
+        if (sz == 0) {
+            return 0;
+        }
+        if (key.data[sz - 1] == 0) {
+            // Trailing zero: goes to the generic long-string table (ms)
+            return 5;
+        }
+        if (sz <= 2) return 1;
+        if (sz <= 4) return 2;
+        if (sz <= 8) return 3;
+        if (sz <= 16) return 4;
+        return 5;
+    }
+};
+
 template <typename TData>
 struct MethodStringNoCache : public MethodBase<TData> {
     using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
     // refresh each time probe
     DorisVector<StringRef> _stored_keys;
 
+    // Sub-table groups for batch operations (only used for non-join 
aggregation path)
+    StringKeySubTableGroups _sub_table_groups;
+
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
                         : (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
             Base::init_join_bucket_num(num_rows, bucket_size, null_map);
         } else {
             Base::init_hash_values(num_rows, null_map);
+            // Build sub-table groups for batch emplace/find (only for 
aggregation, not join)
+            if constexpr (Base::is_string_hash_map()) {
+                _sub_table_groups.build(Base::keys, num_rows);
+            }
         }
     }
 
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
         key_columns[0]->reserve(num_rows);
         key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
     }
+
+    const StringKeySubTableGroups& get_sub_table_groups() const { return 
_sub_table_groups; }
+};
+
+/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
+template <typename HashMap>
+struct IsNullableStringHashMap : std::false_type {};
+
+template <typename Mapped, typename Allocator>
+struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped, 
Allocator>>> : std::true_type {
 };
 
+/// Helper: get the underlying StringHashTable from a hash table (handles 
DataWithNullKey wrapper).
+template <typename HashMap>
+auto& get_string_hash_table(HashMap& data) {
+    return data;
+}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy 
(lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+    if constexpr (GroupIdx == 0) {
+        return StringRef(origin); // copy — m0 needs non-const Key&
+    } else if constexpr (GroupIdx == 1) {
+        return to_string_key<StringKey2>(origin);
+    } else if constexpr (GroupIdx == 2) {
+        return to_string_key<StringKey4>(origin);
+    } else if constexpr (GroupIdx == 3) {
+        return to_string_key<StringKey8>(origin);
+    } else if constexpr (GroupIdx == 4) {
+        return to_string_key<StringKey16>(origin);
+    } else {
+        return StringRef(origin); // copy — ms uses StringRef as Key
+    }
+}
+
+/// Hash value to use for a given group. Group 0 (empty string) always uses 
hash=0.
+template <int GroupIdx, typename HashValues>
+size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
+    if constexpr (GroupIdx == 0) {
+        return 0;
+    } else {
+        return hash_values[row];
+    }
+}
+
+/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at 
most 1 element)
+/// does not benefit from prefetch.
+template <int GroupIdx>
+static constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler, 
typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t 
count,
+                            HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                            F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler,
+                            ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)(
+                            hash_table.template get_null_key_data<Mapped>());
+                }
+                result_handler(row, hash_table.template 
get_null_key_data<Mapped>());
+                continue;
+            }
+        }
+        const auto& origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            // Groups 0,5: key and origin are the same StringRef
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            // Groups 1-4: converted_key differs from origin
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+        result_handler(row, result->get_second());
+    }
+}
+
+/// Process one sub-table group for emplace without result_handler (void 
version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices, 
size_t count,
+                                 HashMethodType& agg_method, State& state, 
HashMap& hash_table,
+                                 F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)();
+                }
+                continue;
+            }
+        }
+        const auto& origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+    }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+                         HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                         ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    using FindResult = typename 
ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<true>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                if (hash_table.has_null_key_data()) {
+                    FindResult res(&hash_table.template 
get_null_key_data<Mapped>(), true);
+                    result_handler(row, res);
+                } else {
+                    FindResult res(nullptr, false);
+                    result_handler(row, res);
+                }
+                continue;
+            }
+        }
+        auto converted_key = 
convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+        auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row);
+        auto it = submap.find(converted_key, hash);
+        if (it) {
+            FindResult res(&it->get_second(), true);
+            result_handler(row, res);
+        } else {
+            FindResult res(nullptr, false);
+            result_handler(row, res);
+        }
+    }
+}
+
+/// Batch emplace helper: for StringHashMap, directly accesses sub-tables 
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+/// result_handler(row_index, mapped) is called after each emplace.
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename PreHandler,
+          typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t 
num_rows, F&& creator,
+                        FF&& creator_for_null_key, PreHandler&& pre_handler,
+                        ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace<G, is_nullable>(
+                        submap, indices.data(), indices.size(), agg_method, 
state, hash_table,
+                        creator, creator_for_null_key, pre_handler, 
result_handler);
+            }
+        });
+    } else {
+        // Standard per-row loop with ahead prefetch
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<false>(i);
+            pre_handler(i);
+            result_handler(i, *state.lazy_emplace_key(*agg_method.hash_table, 
i, agg_method.keys[i],
+                                                      
agg_method.hash_values[i], creator,
+                                                      creator_for_null_key));
+        }
+    }
+}
+
+/// Convenience overload without pre_handler (uses no-op).
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t 
num_rows, F&& creator,
+                        FF&& creator_for_null_key, ResultHandler&& 
result_handler) {
+    lazy_emplace_batch(
+            agg_method, state, num_rows, std::forward<F>(creator),
+            std::forward<FF>(creator_for_null_key), [](uint32_t) {},
+            std::forward<ResultHandler>(result_handler));
+}
+
+/// Batch emplace helper (void version): like lazy_emplace_batch but ignores 
the return value.
+/// pre_handler(row) is called before each emplace, allowing callers to update 
captured state
+/// (e.g., the current row index used inside creator lambdas).
+template <typename HashMethodType, typename State, typename F, typename FF, 
typename PreHandler>
+void lazy_emplace_batch_void(HashMethodType& agg_method, State& state, 
uint32_t num_rows,
+                             F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace_void<G, is_nullable>(submap, 
indices.data(), indices.size(),
+                                                            agg_method, state, 
hash_table, creator,
+                                                            
creator_for_null_key, pre_handler);
+            }
+        });
+    } else {
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<false>(i);
+            pre_handler(i);
+            state.lazy_emplace_key(*agg_method.hash_table, i, 
agg_method.keys[i],
+                                   agg_method.hash_values[i], creator, 
creator_for_null_key);
+        }
+    }
+}
+
+/// Batch find helper: for StringHashMap, directly accesses sub-tables 
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+template <typename HashMethodType, typename State, typename ResultHandler>
+void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
+                ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_find<G, is_nullable>(submap, indices.data(), 
indices.size(),
+                                                    agg_method, state, 
hash_table, result_handler);
+            }
+        });
+    } else {
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            agg_method.template prefetch<true>(i);
+            auto find_result = state.find_key_with_hash(
+                    *agg_method.hash_table, i, agg_method.keys[i], 
agg_method.hash_values[i]);
+            result_handler(i, find_result);
+        }
+    }
+}
+
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
 template <typename FieldType, typename TData>
diff --git a/be/src/vec/common/hash_table/string_hash_table.h 
b/be/src/vec/common/hash_table/string_hash_table.h
index 82e0542cb75..cdd0b0b8d79 100644
--- a/be/src/vec/common/hash_table/string_hash_table.h
+++ b/be/src/vec/common/hash_table/string_hash_table.h
@@ -673,6 +673,28 @@ public:
     const_iterator cend() const { return end(); }
     iterator end() { return iterator(this, true); }
 
+    /// Public accessors for sub-tables, enabling direct batch operations
+    /// that bypass dispatch() for better performance (no per-row branching).
+    T0& get_submap_m0() { return m0; }
+    T1& get_submap_m1() { return m1; }
+    T2& get_submap_m2() { return m2; }
+    T3& get_submap_m3() { return m3; }
+    T4& get_submap_m4() { return m4; }
+    Ts& get_submap_ms() { return ms; }
+
+    /// Visit each (group_index, submap) pair with a generic callable.
+    /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&)
+    /// The integral_constant enables compile-time group dispatch in the 
lambda.
+    template <typename Func>
+    ALWAYS_INLINE void visit_submaps(Func&& func) {
+        func(std::integral_constant<int, 0> {}, m0);
+        func(std::integral_constant<int, 1> {}, m1);
+        func(std::integral_constant<int, 2> {}, m2);
+        func(std::integral_constant<int, 3> {}, m3);
+        func(std::integral_constant<int, 4> {}, m4);
+        func(std::integral_constant<int, 5> {}, ms);
+    }
+
     bool add_elem_size_overflow(size_t add_size) const {
         return m1.add_elem_size_overflow(add_size) || 
m2.add_elem_size_overflow(add_size) ||
                m3.add_elem_size_overflow(add_size) || 
m4.add_elem_size_overflow(add_size) ||


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to