This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 61d810c157b [Improvement](agg) opt for StringHashTable's for_each
(#60941)
61d810c157b is described below
commit 61d810c157b3d0160d0a06612842860f286b4148
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 13 10:51:59 2026 +0800
[Improvement](agg) opt for StringHashTable's for_each (#60941)
This pull request refactors and optimizes the batch operations for hash
table insertion and lookup in the aggregation and distinct aggregation
pipelines. The main improvement is the introduction of more efficient
batch-processing methods for hash tables, especially for string-keyed
tables, which now benefit from better cache locality through sub-table
grouping. Several aggregation and distinct aggregation operators are
updated to use these new batch methods, resulting in cleaner and more
efficient code.
**Batch hash table operations and code modernization:**
* Replaced per-row hash table insertion and lookup loops in aggregation
and streaming aggregation operators (`aggregation_sink_operator.cpp`,
`aggregation_source_operator.cpp`, `streaming_aggregation_operator.cpp`,
`distinct_streaming_aggregation_operator.cpp`, and
`rec_cte_shared_state.h`) with new batch methods:
`vectorized::lazy_emplace_batch`, `vectorized::lazy_emplace_batch_void`,
and `vectorized::find_batch`, improving performance and code clarity.
[[1]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL568-R570)
[[2]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL653-R655)
[[3]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL679-R685)
[[4]](diffhunk://#diff-d45f483806d3e0d0230fcb32c6c3b5da4a3eee517ac7c8664c3e79338817b44dL590-R592)
[[5]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aL313-R315)
[[6]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L731-R734)
[[7]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L810-R812)
[[8]](diffhunk://#diff-ec838bdfeec4f39ddcf6bc242816257656407c19cec66f72c66a24db6a5080e7L80-R86)
* Updated the hash table method interface in `hash_map_context.h` by
removing unnecessary `ALWAYS_INLINE` annotations from `prefetch`, `find`,
and `lazy_emplace`, and by dropping the `is_string_hash_map()` guard so
that `find` and `lazy_emplace` now always prefetch, making the code easier
to maintain.
**String-keyed hash table optimizations:**
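* Introduced the `StringKeySubTableGroups` struct in
`hash_map_context.h`, which groups row indices by the sub-table each key
dispatches to in string-keyed hash tables (chosen by string length, with
any non-empty key ending in a zero byte routed to the generic table).
This enables batch processing by sub-table, improving cache locality and
performance.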
[[1]](diffhunk://#diff-b8623712a5a1728bb77cc67b6ee1bbf16ef2b842044f6f6bab64c3fc5c4575f3R290-R339)
[[2]](diffhunk://#diff-b8623712a5a1728bb77cc67b6ee1bbf16ef2b842044f6f6bab64c3fc5c4575f3R353-R355)
* Integrated sub-table grouping into the string hash table
initialization for aggregation (not join), so that batch operations can
efficiently process each sub-table group.
**Minor code cleanups:**
* Standardized lambda formatting and improved code consistency in
several places, such as the handling of creators for null keys and
exception handling in hash table operations.
[[1]](diffhunk://#diff-ec838bdfeec4f39ddcf6bc242816257656407c19cec66f72c66a24db6a5080e7L60-R61)
[[2]](diffhunk://#diff-df21df88282293d1bbe3223e0d95dcc5afc01da571e3a7c1d9dc9377a758d5daL668-R669)
These changes collectively improve the efficiency and maintainability of
the aggregation pipeline, especially for workloads involving string
keys.
---
be/src/exec/common/hash_table/hash_map_context.h | 369 ++++++++++++++++++++-
be/src/exec/common/hash_table/string_hash_table.h | 22 ++
be/src/exec/operator/aggregation_sink_operator.cpp | 32 +-
.../exec/operator/aggregation_source_operator.cpp | 7 +-
.../distinct_streaming_aggregation_operator.cpp | 6 +-
.../operator/streaming_aggregation_operator.cpp | 15 +-
be/src/exec/pipeline/rec_cte_shared_state.h | 7 +-
7 files changed, 412 insertions(+), 46 deletions(-)
diff --git a/be/src/exec/common/hash_table/hash_map_context.h
b/be/src/exec/common/hash_table/hash_map_context.h
index 41e718fbd41..ad0763162e1 100644
--- a/be/src/exec/common/hash_table/hash_map_context.h
+++ b/be/src/exec/common/hash_table/hash_map_context.h
@@ -112,7 +112,7 @@ struct MethodBaseInner {
}
template <bool read>
- ALWAYS_INLINE void prefetch(size_t i) {
+ void prefetch(size_t i) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
hash_table->template prefetch<read>(keys[i +
HASH_MAP_PREFETCH_DIST],
hash_values[i +
HASH_MAP_PREFETCH_DIST]);
@@ -120,19 +120,14 @@ struct MethodBaseInner {
}
template <typename State>
- ALWAYS_INLINE auto find(State& state, size_t i) {
- if constexpr (!is_string_hash_map()) {
- prefetch<true>(i);
- }
+ auto find(State& state, size_t i) {
+ prefetch<true>(i);
return state.find_key_with_hash(*hash_table, i, keys[i],
hash_values[i]);
}
template <typename State, typename F, typename FF>
- ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator,
- FF&& creator_for_null_key) {
- if constexpr (!is_string_hash_map()) {
- prefetch<false>(i);
- }
+ auto lazy_emplace(State& state, size_t i, F&& creator, FF&&
creator_for_null_key) {
+ prefetch<false>(i);
return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i],
creator,
creator_for_null_key);
}
@@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> {
}
};
+/// Sub-table group indices for StringHashTable batch operations.
+/// StringHashTable dispatches keys to 6 sub-tables by string length:
+/// group 0: empty strings (size == 0) → m0
+/// group 1: size <= 2 → m1
+/// group 2: size <= 4 → m2
+/// group 3: size <= 8 → m3
+/// group 4: size <= 16 → m4
+/// group 5: size > 16 or trailing zero → ms
+/// By pre-grouping row indices, we can process each sub-table in a batch,
+/// achieving better cache locality and enabling prefetch within each group.
+struct StringKeySubTableGroups {
+ static constexpr int NUM_GROUPS = 6;
+ // Row indices for each sub-table group
+ DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+
+ void build(const StringRef* keys, uint32_t num_rows) {
+ for (int g = 0; g < NUM_GROUPS; g++) {
+ group_row_indices[g].clear();
+ }
+ // First pass: count sizes for each group to reserve memory
+ uint32_t counts[NUM_GROUPS] = {};
+ for (uint32_t i = 0; i < num_rows; i++) {
+ counts[get_group(keys[i])]++;
+ }
+ for (int g = 0; g < NUM_GROUPS; g++) {
+ group_row_indices[g].reserve(counts[g]);
+ }
+ // Second pass: fill group indices
+ for (uint32_t i = 0; i < num_rows; i++) {
+ group_row_indices[get_group(keys[i])].push_back(i);
+ }
+ }
+
+ static ALWAYS_INLINE int get_group(const StringRef& key) {
+ const size_t sz = key.size;
+ if (sz == 0) {
+ return 0;
+ }
+ if (key.data[sz - 1] == 0) {
+ // Trailing zero: goes to the generic long-string table (ms)
+ return 5;
+ }
+ if (sz <= 2) return 1;
+ if (sz <= 4) return 2;
+ if (sz <= 8) return 3;
+ if (sz <= 16) return 4;
+ return 5;
+ }
+};
+
template <typename TData>
struct MethodStringNoCache : public MethodBase<TData> {
using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
// refresh each time probe
DorisVector<StringRef> _stored_keys;
+ // Sub-table groups for batch operations (only used for non-join
aggregation path)
+ StringKeySubTableGroups _sub_table_groups;
+
size_t serialized_keys_size(bool is_build) const override {
return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
: (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
Base::init_join_bucket_num(num_rows, bucket_size, null_map);
} else {
Base::init_hash_values(num_rows, null_map);
+ // Build sub-table groups for batch emplace/find (only for
aggregation, not join)
+ if constexpr (Base::is_string_hash_map()) {
+ _sub_table_groups.build(Base::keys, num_rows);
+ }
}
}
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
key_columns[0]->reserve(num_rows);
key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
}
+
+ const StringKeySubTableGroups& get_sub_table_groups() const { return
_sub_table_groups; }
+};
+
+/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
+template <typename HashMap>
+struct IsNullableStringHashMap : std::false_type {};
+
+template <typename Mapped, typename Allocator>
+struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped,
Allocator>>> : std::true_type {
};
+/// Helper: get the underlying StringHashTable from a hash table (handles
DataWithNullKey wrapper).
+template <typename HashMap>
+auto& get_string_hash_table(HashMap& data) {
+ return data;
+}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy
(lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+ if constexpr (GroupIdx == 0) {
+ return StringRef(origin); // copy — m0 needs non-const Key&
+ } else if constexpr (GroupIdx == 1) {
+ return to_string_key<StringKey2>(origin);
+ } else if constexpr (GroupIdx == 2) {
+ return to_string_key<StringKey4>(origin);
+ } else if constexpr (GroupIdx == 3) {
+ return to_string_key<StringKey8>(origin);
+ } else if constexpr (GroupIdx == 4) {
+ return to_string_key<StringKey16>(origin);
+ } else {
+ return StringRef(origin); // copy — ms uses StringRef as Key
+ }
+}
+
+/// Hash value to use for a given group. Group 0 (empty string) always uses
hash=0.
+template <int GroupIdx, typename HashValues>
+size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
+ if constexpr (GroupIdx == 0) {
+ return 0;
+ } else {
+ return hash_values[row];
+ }
+}
+
+/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at
most 1 element)
+/// does not benefit from prefetch.
+template <int GroupIdx>
+static constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set
per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename
HashMethodType, typename State,
+ typename HashMap, typename F, typename FF, typename PreHandler,
typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t
count,
+ HashMethodType& agg_method, State& state, HashMap&
hash_table,
+ F&& creator, FF&& creator_for_null_key,
PreHandler&& pre_handler,
+ ResultHandler&& result_handler) {
+ using Mapped = typename HashMethodType::Mapped;
+ for (size_t j = 0; j < count; j++) {
+ if constexpr (group_needs_prefetch<GroupIdx>) {
+ if (j + HASH_MAP_PREFETCH_DIST < count) {
+ submap.template prefetch<false>(
+ agg_method.hash_values[indices[j +
HASH_MAP_PREFETCH_DIST]]);
+ }
+ }
+ uint32_t row = indices[j];
+ pre_handler(row);
+ if constexpr (is_nullable) {
+ if (state.key_column->is_null_at(row)) {
+ bool has_null_key = hash_table.has_null_key_data();
+ hash_table.has_null_key_data() = true;
+ if (!has_null_key) {
+ std::forward<FF>(creator_for_null_key)(
+ hash_table.template get_null_key_data<Mapped>());
+ }
+ result_handler(row, hash_table.template
get_null_key_data<Mapped>());
+ continue;
+ }
+ }
+ auto origin = agg_method.keys[row];
+ auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+ typename Submap::LookupResult result;
+ if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+ // Groups 0,5: key and origin are the same StringRef
+ submap.lazy_emplace_with_origin(converted_key, converted_key,
result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ } else {
+ // Groups 1-4: converted_key differs from origin
+ submap.lazy_emplace_with_origin(converted_key, origin, result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ }
+ result_handler(row, result->get_second());
+ }
+}
+
+/// Process one sub-table group for emplace without result_handler (void
version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename
HashMethodType, typename State,
+ typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices,
size_t count,
+ HashMethodType& agg_method, State& state,
HashMap& hash_table,
+ F&& creator, FF&& creator_for_null_key,
PreHandler&& pre_handler) {
+ for (size_t j = 0; j < count; j++) {
+ if constexpr (group_needs_prefetch<GroupIdx>) {
+ if (j + HASH_MAP_PREFETCH_DIST < count) {
+ submap.template prefetch<false>(
+ agg_method.hash_values[indices[j +
HASH_MAP_PREFETCH_DIST]]);
+ }
+ }
+ uint32_t row = indices[j];
+ pre_handler(row);
+ if constexpr (is_nullable) {
+ if (state.key_column->is_null_at(row)) {
+ bool has_null_key = hash_table.has_null_key_data();
+ hash_table.has_null_key_data() = true;
+ if (!has_null_key) {
+ std::forward<FF>(creator_for_null_key)();
+ }
+ continue;
+ }
+ }
+ auto origin = agg_method.keys[row];
+ auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+ typename Submap::LookupResult result;
+ if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+ submap.lazy_emplace_with_origin(converted_key, converted_key,
result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ } else {
+ submap.lazy_emplace_with_origin(converted_key, origin, result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ }
+ }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename
HashMethodType, typename State,
+ typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+ HashMethodType& agg_method, State& state, HashMap&
hash_table,
+ ResultHandler&& result_handler) {
+ using Mapped = typename HashMethodType::Mapped;
+ using FindResult = typename
ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+ for (size_t j = 0; j < count; j++) {
+ if constexpr (group_needs_prefetch<GroupIdx>) {
+ if (j + HASH_MAP_PREFETCH_DIST < count) {
+ submap.template prefetch<true>(
+ agg_method.hash_values[indices[j +
HASH_MAP_PREFETCH_DIST]]);
+ }
+ }
+ uint32_t row = indices[j];
+ if constexpr (is_nullable) {
+ if (state.key_column->is_null_at(row)) {
+ if (hash_table.has_null_key_data()) {
+ FindResult res(&hash_table.template
get_null_key_data<Mapped>(), true);
+ result_handler(row, res);
+ } else {
+ FindResult res(nullptr, false);
+ result_handler(row, res);
+ }
+ continue;
+ }
+ }
+ auto converted_key =
convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+ auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row);
+ auto it = submap.find(converted_key, hash);
+ if (it) {
+ FindResult res(&it->get_second(), true);
+ result_handler(row, res);
+ } else {
+ FindResult res(nullptr, false);
+ result_handler(row, res);
+ }
+ }
+}
+
+/// Batch emplace helper: for StringHashMap, directly accesses sub-tables
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+/// pre_handler(row) is called before each emplace, allowing callers to set
per-row state
+/// (e.g., current row index used inside creator lambdas).
+/// result_handler(row_index, mapped) is called after each emplace.
+template <typename HashMethodType, typename State, typename F, typename FF,
typename PreHandler,
+ typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t
num_rows, F&& creator,
+ FF&& creator_for_null_key, PreHandler&& pre_handler,
+ ResultHandler&& result_handler) {
+ if constexpr (HashMethodType::is_string_hash_map()) {
+ using HashMap = typename HashMethodType::HashMapType;
+ constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+ auto& hash_table = *agg_method.hash_table;
+ auto& sht = get_string_hash_table(hash_table);
+ const auto& groups = agg_method.get_sub_table_groups();
+
+ sht.visit_submaps([&](auto group_idx, auto& submap) {
+ constexpr int G = decltype(group_idx)::value;
+ const auto& indices = groups.group_row_indices[G];
+ if (!indices.empty()) {
+ process_submap_emplace<G, is_nullable>(
+ submap, indices.data(), indices.size(), agg_method,
state, hash_table,
+ creator, creator_for_null_key, pre_handler,
result_handler);
+ }
+ });
+ } else {
+ // Standard per-row loop with ahead prefetch
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ agg_method.template prefetch<false>(i);
+ pre_handler(i);
+ result_handler(i, *state.lazy_emplace_key(*agg_method.hash_table,
i, agg_method.keys[i],
+
agg_method.hash_values[i], creator,
+ creator_for_null_key));
+ }
+ }
+}
+
+/// Convenience overload without pre_handler (uses no-op).
+template <typename HashMethodType, typename State, typename F, typename FF,
typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t
num_rows, F&& creator,
+ FF&& creator_for_null_key, ResultHandler&&
result_handler) {
+ lazy_emplace_batch(
+ agg_method, state, num_rows, std::forward<F>(creator),
+ std::forward<FF>(creator_for_null_key), [](uint32_t) {},
+ std::forward<ResultHandler>(result_handler));
+}
+
+/// Batch emplace helper (void version): like lazy_emplace_batch but ignores
the return value.
+/// pre_handler(row) is called before each emplace, allowing callers to update
captured state
+/// (e.g., the current row index used inside creator lambdas).
+template <typename HashMethodType, typename State, typename F, typename FF,
typename PreHandler>
+void lazy_emplace_batch_void(HashMethodType& agg_method, State& state,
uint32_t num_rows,
+ F&& creator, FF&& creator_for_null_key,
PreHandler&& pre_handler) {
+ if constexpr (HashMethodType::is_string_hash_map()) {
+ using HashMap = typename HashMethodType::HashMapType;
+ constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+ auto& hash_table = *agg_method.hash_table;
+ auto& sht = get_string_hash_table(hash_table);
+ const auto& groups = agg_method.get_sub_table_groups();
+
+ sht.visit_submaps([&](auto group_idx, auto& submap) {
+ constexpr int G = decltype(group_idx)::value;
+ const auto& indices = groups.group_row_indices[G];
+ if (!indices.empty()) {
+ process_submap_emplace_void<G, is_nullable>(submap,
indices.data(), indices.size(),
+ agg_method, state,
hash_table, creator,
+
creator_for_null_key, pre_handler);
+ }
+ });
+ } else {
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ agg_method.template prefetch<false>(i);
+ pre_handler(i);
+ state.lazy_emplace_key(*agg_method.hash_table, i,
agg_method.keys[i],
+ agg_method.hash_values[i], creator,
creator_for_null_key);
+ }
+ }
+}
+
+/// Batch find helper: for StringHashMap, directly accesses sub-tables
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+template <typename HashMethodType, typename State, typename ResultHandler>
+void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
+ ResultHandler&& result_handler) {
+ if constexpr (HashMethodType::is_string_hash_map()) {
+ using HashMap = typename HashMethodType::HashMapType;
+ constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+ auto& hash_table = *agg_method.hash_table;
+ auto& sht = get_string_hash_table(hash_table);
+ const auto& groups = agg_method.get_sub_table_groups();
+
+ sht.visit_submaps([&](auto group_idx, auto& submap) {
+ constexpr int G = decltype(group_idx)::value;
+ const auto& indices = groups.group_row_indices[G];
+ if (!indices.empty()) {
+ process_submap_find<G, is_nullable>(submap, indices.data(),
indices.size(),
+ agg_method, state,
hash_table, result_handler);
+ }
+ });
+ } else {
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ agg_method.template prefetch<true>(i);
+ auto find_result = state.find_key_with_hash(
+ *agg_method.hash_table, i, agg_method.keys[i],
agg_method.hash_values[i]);
+ result_handler(i, find_result);
+ }
+ }
+}
+
/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
template <typename FieldType, typename TData>
diff --git a/be/src/exec/common/hash_table/string_hash_table.h
b/be/src/exec/common/hash_table/string_hash_table.h
index a110291426f..5372f919739 100644
--- a/be/src/exec/common/hash_table/string_hash_table.h
+++ b/be/src/exec/common/hash_table/string_hash_table.h
@@ -679,6 +679,28 @@ public:
const_iterator cend() const { return end(); }
iterator end() { return iterator(this, true); }
+ /// Public accessors for sub-tables, enabling direct batch operations
+ /// that bypass dispatch() for better performance (no per-row branching).
+ T0& get_submap_m0() { return m0; }
+ T1& get_submap_m1() { return m1; }
+ T2& get_submap_m2() { return m2; }
+ T3& get_submap_m3() { return m3; }
+ T4& get_submap_m4() { return m4; }
+ Ts& get_submap_ms() { return ms; }
+
+ /// Visit each (group_index, submap) pair with a generic callable.
+ /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&)
+ /// The integral_constant enables compile-time group dispatch in the
lambda.
+ template <typename Func>
+ ALWAYS_INLINE void visit_submaps(Func&& func) {
+ func(std::integral_constant<int, 0> {}, m0);
+ func(std::integral_constant<int, 1> {}, m1);
+ func(std::integral_constant<int, 2> {}, m2);
+ func(std::integral_constant<int, 3> {}, m3);
+ func(std::integral_constant<int, 4> {}, m4);
+ func(std::integral_constant<int, 5> {}, ms);
+ }
+
bool add_elem_size_overflow(size_t add_size) const {
return m1.add_elem_size_overflow(add_size) ||
m2.add_elem_size_overflow(add_size) ||
m3.add_elem_size_overflow(add_size) ||
m4.add_elem_size_overflow(add_size) ||
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 9317d6c5540..133b91f4f1a 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -561,10 +561,9 @@ void
AggSinkLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state,
i, creator,
-
creator_for_null_key);
- }
+ lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
}},
@@ -644,10 +643,10 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* places,
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state,
i, creator,
-
creator_for_null_key);
- }
+ lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row) { i = row; },
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
return true;
}
@@ -669,15 +668,14 @@ void
AggSinkLocalState::_find_in_hash_table(AggregateDataPtr* places, ColumnRawP
agg_method.init_serialized_keys(key_columns,
num_rows);
/// For all rows.
- for (size_t i = 0; i < num_rows; ++i) {
- auto find_result = agg_method.find(state, i);
-
- if (find_result.is_found()) {
- places[i] = find_result.get_mapped();
- } else {
- places[i] = nullptr;
- }
- }
+ find_batch(agg_method, state, num_rows,
+ [&](uint32_t row, auto& find_result) {
+ if (find_result.is_found()) {
+ places[row] =
find_result.get_mapped();
+ } else {
+ places[row] = nullptr;
+ }
+ });
}},
_agg_data->method_variant);
}
diff --git a/be/src/exec/operator/aggregation_source_operator.cpp
b/be/src/exec/operator/aggregation_source_operator.cpp
index a47d66f96c3..9c6041e4dc4 100644
--- a/be/src/exec/operator/aggregation_source_operator.cpp
+++ b/be/src/exec/operator/aggregation_source_operator.cpp
@@ -578,10 +578,9 @@ void
AggLocalState::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRaw
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state, i,
creator,
-
creator_for_null_key);
- }
+ lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
index c3f4248b513..48a43ca4c39 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
@@ -307,9 +307,9 @@ void
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
auto creator_for_null_key = [&]() {
distinct_row.push_back(row); };
SCOPED_TIMER(_hash_table_emplace_timer);
- for (; row < num_rows; ++row) {
- agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
- }
+ lazy_emplace_batch_void(agg_method, state, num_rows,
creator,
+ creator_for_null_key,
+ [&](uint32_t r) { row = r;
});
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
}},
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp
b/be/src/exec/operator/streaming_aggregation_operator.cpp
index 9d3e338bf6d..4725e8ffb10 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -722,10 +722,10 @@ bool
StreamingAggLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* pl
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state,
i, creator,
-
creator_for_null_key);
- }
+ lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row) { i = row; },
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
return true;
}
@@ -800,10 +800,9 @@ void
StreamingAggLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state,
i, creator,
-
creator_for_null_key);
- }
+ lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
}},
diff --git a/be/src/exec/pipeline/rec_cte_shared_state.h
b/be/src/exec/pipeline/rec_cte_shared_state.h
index 466133288dd..4d9fae7608a 100644
--- a/be/src/exec/pipeline/rec_cte_shared_state.h
+++ b/be/src/exec/pipeline/rec_cte_shared_state.h
@@ -81,10 +81,9 @@ struct RecCTESharedState : public BasicSharedState {
};
SCOPED_TIMER(hash_table_emplace_timer);
- for (; row < num_rows; ++row) {
- agg_method.lazy_emplace(agg_state,
row, creator,
-
creator_for_null_key);
- }
+ lazy_emplace_batch_void(agg_method,
agg_state, num_rows,
+ creator,
creator_for_null_key,
+ [&](uint32_t r) {
row = r; });
COUNTER_UPDATE(hash_table_input_counter,
num_rows);
}},
agg_data->method_variant);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]