This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch ckb_preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8069df515854ddbcd82aa5e71d610830376b4aaa Author: BiteTheDDDDt <[email protected]> AuthorDate: Mon Mar 2 10:56:48 2026 +0800 opt for StringHashTable's for_each update update --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 56 ++-- .../pipeline/exec/aggregation_source_operator.cpp | 7 +- .../distinct_streaming_aggregation_operator.cpp | 6 +- .../exec/streaming_aggregation_operator.cpp | 15 +- be/src/pipeline/rec_cte_shared_state.h | 62 ++-- be/src/vec/common/hash_table/hash_map_context.h | 369 ++++++++++++++++++++- be/src/vec/common/hash_table/string_hash_table.h | 22 ++ 7 files changed, 451 insertions(+), 86 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 3af2bfc94f2..bf254be9111 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -565,10 +565,9 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p }; SCOPED_TIMER(_hash_table_emplace_timer); - for (size_t i = 0; i < num_rows; ++i) { - places[i] = *agg_method.lazy_emplace(state, i, creator, - creator_for_null_key); - } + vectorized::lazy_emplace_batch( + agg_method, state, num_rows, creator, creator_for_null_key, + [&](uint32_t row, auto& mapped) { places[row] = mapped; }); COUNTER_UPDATE(_hash_table_input_counter, num_rows); }}, @@ -650,10 +649,10 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData }; SCOPED_TIMER(_hash_table_emplace_timer); - for (i = 0; i < num_rows; ++i) { - places[i] = *agg_method.lazy_emplace(state, i, creator, - creator_for_null_key); - } + vectorized::lazy_emplace_batch( + agg_method, state, num_rows, creator, creator_for_null_key, + [&](uint32_t row) { i = row; }, + [&](uint32_t row, auto& mapped) { places[row] = mapped; }); COUNTER_UPDATE(_hash_table_input_counter, num_rows); return true; } @@ -665,27 +664,26 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places, vectorized::ColumnRawPtrs& key_columns, uint32_t num_rows) { - std::visit(vectorized::Overload {[&](std::monostate& arg) -> void { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "uninited hash table"); - }, - [&](auto& agg_method) -> void { - using HashMethodType = std::decay_t<decltype(agg_method)>; - using AggState = typename HashMethodType::State; - AggState state(key_columns); - agg_method.init_serialized_keys(key_columns, num_rows); - - /// For all rows. - for (size_t i = 0; i < num_rows; ++i) { - auto find_result = agg_method.find(state, i); - - if (find_result.is_found()) { - places[i] = find_result.get_mapped(); - } else { - places[i] = nullptr; - } - } - }}, + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + }, + [&](auto& agg_method) -> void { + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + AggState state(key_columns); + agg_method.init_serialized_keys(key_columns, num_rows); + + /// For all rows. 
+ vectorized::find_batch(agg_method, state, num_rows, + [&](uint32_t row, auto& find_result) { + if (find_result.is_found()) { + places[row] = find_result.get_mapped(); + } else { + places[row] = nullptr; + } + }); + }}, _agg_data->method_variant); } diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 846bfdf1c12..4dfc9be8b62 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -587,10 +587,9 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place }; SCOPED_TIMER(_hash_table_emplace_timer); - for (size_t i = 0; i < num_rows; ++i) { - places[i] = *agg_method.lazy_emplace(state, i, creator, - creator_for_null_key); - } + vectorized::lazy_emplace_batch( + agg_method, state, num_rows, creator, creator_for_null_key, + [&](uint32_t row, auto& mapped) { places[row] = mapped; }); COUNTER_UPDATE(_hash_table_input_counter, num_rows); COUNTER_SET(_hash_table_memory_usage, diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 972fc9ba923..1d39488b6c9 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -310,9 +310,9 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct( auto creator_for_null_key = [&]() { distinct_row.push_back(row); }; SCOPED_TIMER(_hash_table_emplace_timer); - for (; row < num_rows; ++row) { - agg_method.lazy_emplace(state, row, creator, creator_for_null_key); - } + vectorized::lazy_emplace_batch_void(agg_method, state, num_rows, creator, + creator_for_null_key, + [&](uint32_t r) { row = r; }); COUNTER_UPDATE(_hash_table_input_counter, num_rows); }}, diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index fe9aef1662a..86461a8460c 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -728,10 +728,10 @@ bool StreamingAggLocalState::_emplace_into_hash_table_limit(vectorized::Aggregat }; SCOPED_TIMER(_hash_table_emplace_timer); - for (i = 0; i < num_rows; ++i) { - places[i] = *agg_method.lazy_emplace(state, i, creator, - creator_for_null_key); - } + vectorized::lazy_emplace_batch( + agg_method, state, num_rows, creator, creator_for_null_key, + [&](uint32_t row) { i = row; }, + [&](uint32_t row, auto& mapped) { places[row] = mapped; }); COUNTER_UPDATE(_hash_table_input_counter, num_rows); return true; } @@ -807,10 +807,9 @@ void StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP }; SCOPED_TIMER(_hash_table_emplace_timer); - for (size_t i = 0; i < num_rows; ++i) { - places[i] = *agg_method.lazy_emplace(state, i, creator, - creator_for_null_key); - } + vectorized::lazy_emplace_batch( + agg_method, state, num_rows, creator, creator_for_null_key, + [&](uint32_t row, auto& mapped) { places[row] = mapped; }); COUNTER_UPDATE(_hash_table_input_counter, num_rows); }}, diff --git a/be/src/pipeline/rec_cte_shared_state.h b/be/src/pipeline/rec_cte_shared_state.h index 8a57143b110..11f32a5993c 100644 --- a/be/src/pipeline/rec_cte_shared_state.h +++ b/be/src/pipeline/rec_cte_shared_state.h @@ -57,38 +57,36 @@ struct RecCTESharedState : public BasicSharedState { raw_columns.push_back(col.get()); } - std::visit(vectorized::Overload { - [&](std::monostate& arg) -> void { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "uninited hash table"); - }, - [&](auto& agg_method) -> void { - SCOPED_TIMER(hash_table_compute_timer); - using HashMethodType = std::decay_t<decltype(agg_method)>; - using AggState = typename HashMethodType::State; - - AggState agg_state(raw_columns); - agg_method.init_serialized_keys(raw_columns, 
num_rows); - distinct_row.clear(); - - size_t row = 0; - auto creator = [&](const auto& ctor, auto& key, auto& origin) { - HashMethodType::try_presis_key(key, origin, arena); - ctor(key); - distinct_row.push_back(row); - }; - auto creator_for_null_key = [&]() { - distinct_row.push_back(row); - }; - - SCOPED_TIMER(hash_table_emplace_timer); - for (; row < num_rows; ++row) { - agg_method.lazy_emplace(agg_state, row, creator, - creator_for_null_key); - } - COUNTER_UPDATE(hash_table_input_counter, num_rows); - }}, - agg_data->method_variant); + std::visit( + vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, + "uninited hash table"); + }, + [&](auto& agg_method) -> void { + SCOPED_TIMER(hash_table_compute_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + + AggState agg_state(raw_columns); + agg_method.init_serialized_keys(raw_columns, num_rows); + distinct_row.clear(); + + size_t row = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, arena); + ctor(key); + distinct_row.push_back(row); + }; + auto creator_for_null_key = [&]() { distinct_row.push_back(row); }; + + SCOPED_TIMER(hash_table_emplace_timer); + vectorized::lazy_emplace_batch_void(agg_method, agg_state, num_rows, + creator, creator_for_null_key, + [&](uint32_t r) { row = r; }); + COUNTER_UPDATE(hash_table_input_counter, num_rows); + }}, + agg_data->method_variant); if (distinct_row.size() == block.rows()) { blocks.emplace_back(std::move(block)); diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 95bc05a5f09..f4cfaa229ca 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -112,7 +112,7 @@ struct MethodBaseInner { } template <bool read> - ALWAYS_INLINE void prefetch(size_t i) { + void 
prefetch(size_t i) { if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) { hash_table->template prefetch<read>(keys[i + HASH_MAP_PREFETCH_DIST], hash_values[i + HASH_MAP_PREFETCH_DIST]); @@ -120,19 +120,14 @@ struct MethodBaseInner { } template <typename State> - ALWAYS_INLINE auto find(State& state, size_t i) { - if constexpr (!is_string_hash_map()) { - prefetch<true>(i); - } + auto find(State& state, size_t i) { + prefetch<true>(i); return state.find_key_with_hash(*hash_table, i, keys[i], hash_values[i]); } template <typename State, typename F, typename FF> - ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator, - FF&& creator_for_null_key) { - if constexpr (!is_string_hash_map()) { - prefetch<false>(i); - } + auto lazy_emplace(State& state, size_t i, F&& creator, FF&& creator_for_null_key) { + prefetch<false>(i); return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i], creator, creator_for_null_key); } @@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> { } }; +/// Sub-table group indices for StringHashTable batch operations. +/// StringHashTable dispatches keys to 6 sub-tables by string length: +/// group 0: empty strings (size == 0) → m0 +/// group 1: size <= 2 → m1 +/// group 2: size <= 4 → m2 +/// group 3: size <= 8 → m3 +/// group 4: size <= 16 → m4 +/// group 5: size > 16 or trailing zero → ms +/// By pre-grouping row indices, we can process each sub-table in a batch, +/// achieving better cache locality and enabling prefetch within each group. 
+struct StringKeySubTableGroups {
+    static constexpr int NUM_GROUPS = 6;
+    // Row indices for each sub-table group
+    DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+    // build(): bucket row ids by get_group(); ids inside each bucket stay in ascending order.
+    void build(const StringRef* keys, uint32_t num_rows) {
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].clear();
+        }
+        // First pass: count sizes for each group to reserve memory
+        uint32_t counts[NUM_GROUPS] = {};
+        for (uint32_t i = 0; i < num_rows; i++) {
+            counts[get_group(keys[i])]++;
+        }
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].reserve(counts[g]);
+        }
+        // Second pass: fill group indices
+        for (uint32_t i = 0; i < num_rows; i++) {
+            group_row_indices[get_group(keys[i])].push_back(i);
+        }
+    }
+    // Must stay in sync with StringHashTable::dispatch() routing (empty / trailing zero / size).
+    static ALWAYS_INLINE int get_group(const StringRef& key) {
+        const size_t sz = key.size;
+        if (sz == 0) {
+            return 0;
+        }
+        if (key.data[sz - 1] == 0) {
+            // Trailing zero: goes to the generic long-string table (ms)
+            return 5;
+        }
+        if (sz <= 2) return 1;
+        if (sz <= 4) return 2;
+        if (sz <= 8) return 3;
+        if (sz <= 16) return 4;
+        return 5;
+    }
+};
+
 template <typename TData>
 struct MethodStringNoCache : public MethodBase<TData> {
     using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
     // refresh each time probe
     DorisVector<StringRef> _stored_keys;
 
+    // Sub-table groups for batch operations (only used for non-join aggregation path)
+    StringKeySubTableGroups _sub_table_groups;
+
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ?
(_build_stored_keys.size() * sizeof(StringRef)) : (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
             Base::init_join_bucket_num(num_rows, bucket_size, null_map);
         } else {
             Base::init_hash_values(num_rows, null_map);
+            // Build sub-table groups for batch emplace/find (only for aggregation, not join)
+            if constexpr (Base::is_string_hash_map()) {
+                _sub_table_groups.build(Base::keys, num_rows);
+            }
         }
     }
 
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
         key_columns[0]->reserve(num_rows);
         key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
     }
+
+    const StringKeySubTableGroups& get_sub_table_groups() const { return _sub_table_groups; }
+};
+
+/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
+template <typename HashMap>
+struct IsNullableStringHashMap : std::false_type {};
+
+template <typename Mapped, typename Allocator>
+struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped, Allocator>>> : std::true_type {
 };
 
+/// Identity: relies on HashMap (incl. the DataWithNullKey wrapper) exposing visit_submaps().
+template <typename HashMap>
+auto& get_string_hash_table(HashMap& data) {
+    return data;
+}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy (lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+    if constexpr (GroupIdx == 0) {
+        return StringRef(origin); // copy — m0 needs non-const Key&
+    } else if constexpr (GroupIdx == 1) {
+        return to_string_key<StringKey2>(origin);
+    } else if constexpr (GroupIdx == 2) {
+        return to_string_key<StringKey4>(origin);
+    } else if constexpr (GroupIdx == 3) {
+        return to_string_key<StringKey8>(origin);
+    } else if constexpr (GroupIdx == 4) {
+        return to_string_key<StringKey16>(origin);
+    } else {
+        return StringRef(origin); // copy — ms uses StringRef as Key
+    }
+}
+
+/// Hash value to use for a given group. Group 0 (empty string) always uses hash=0.
+template <int GroupIdx, typename HashValues>
+size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
+    if constexpr (GroupIdx == 0) {
+        return 0;
+    } else {
+        return hash_values[row];
+    }
+}
+
+/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at most 1 element)
+/// does not benefit from prefetch. `inline` (not `static`): one entity shared by every TU.
+template <int GroupIdx>
+inline constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler, typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t count,
+                            HashMethodType& agg_method, State& state, HashMap& hash_table,
+                            F&& creator, FF&& creator_for_null_key, PreHandler&& pre_handler,
+                            ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    // Callables are reused for every row: call as lvalues, never std::forward.
+                    creator_for_null_key(hash_table.template get_null_key_data<Mapped>());
+                }
+                result_handler(row, hash_table.template get_null_key_data<Mapped>());
+                continue;
+            }
+        }
+        const auto& origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            // Groups 0,5: key and origin are the same StringRef
+            submap.lazy_emplace_with_origin(converted_key, converted_key, result,
+                                            hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            creator);
+        } else {
+            // Groups 1-4: converted_key differs from origin
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            creator);
+        }
+        result_handler(row, result->get_second());
+    }
+}
+
+/// Process one sub-table group for emplace without result_handler (void version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices, size_t count,
+                                 HashMethodType& agg_method, State& state, HashMap& hash_table,
+                                 F&& creator, FF&& creator_for_null_key, PreHandler&& pre_handler) {
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    creator_for_null_key(); // lvalue call: callables are reused across rows
+                }
+                continue;
+            }
+        }
+        const auto& origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            submap.lazy_emplace_with_origin(converted_key, converted_key, result,
+                                            hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            creator);
+        } else {
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            creator);
+        }
+    }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename HashMethodType, typename State,
+          typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+                         HashMethodType& agg_method, State& state, HashMap& hash_table,
+                         ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    using FindResult = typename ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<true>(
+                        agg_method.hash_values[indices[j + HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                if (hash_table.has_null_key_data()) {
+                    FindResult res(&hash_table.template get_null_key_data<Mapped>(), true);
+                    result_handler(row, res);
+                } else {
+                    FindResult res(nullptr, false);
+                    result_handler(row, res);
+                }
+                continue;
+            }
+        }
+        auto converted_key = convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+        auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row); // 0 for group 0
+        auto it = submap.find(converted_key, hash);
+        if (it) {
+            FindResult res(&it->get_second(), true);
+            result_handler(row, res);
+        } else {
+            FindResult res(nullptr, false);
+            result_handler(row, res);
+        }
+    }
+}
+
+/// Batch emplace helper: for StringHashMap, directly accesses sub-tables bypassing dispatch();
+/// rows come from the groups built by the last init_serialized_keys() (num_rows not consulted).
+/// Other hash maps use a plain per-row loop. pre_handler(row) runs before each emplace;
+/// for StringHashMap rows are visited grouped by sub-table, NOT in ascending row order.
+/// result_handler(row_index, mapped) is called after each emplace.
+template <typename HashMethodType, typename State, typename F, typename FF, typename PreHandler,
+          typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t num_rows, F&& creator,
+                        FF&& creator_for_null_key, PreHandler&& pre_handler,
+                        ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace<G, is_nullable>(
+                        submap, indices.data(), indices.size(), agg_method, state, hash_table,
+                        creator, creator_for_null_key, pre_handler, result_handler);
+            }
+        });
+    } else {
+        // Non-string maps: delegate to MethodBaseInner::lazy_emplace, i.e. the exact per-row
+        // call the operators made before batching existed. It already does the look-ahead
+        // prefetch, and a method type that customizes lazy_emplace is not silently bypassed
+        // by a hand-copied state.lazy_emplace_key(...) call.
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            pre_handler(i);
+            result_handler(i, *agg_method.lazy_emplace(state, i, creator, creator_for_null_key));
+        }
+    }
+}
+
+/// Convenience overload without pre_handler (uses no-op).
+template <typename HashMethodType, typename State, typename F, typename FF, typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t num_rows, F&& creator,
+                        FF&& creator_for_null_key, ResultHandler&& result_handler) {
+    lazy_emplace_batch(
+            agg_method, state, num_rows, std::forward<F>(creator),
+            std::forward<FF>(creator_for_null_key), [](uint32_t) {},
+            std::forward<ResultHandler>(result_handler));
+}
+
+/// Batch emplace helper (void version): like lazy_emplace_batch but ignores the return value.
+/// pre_handler(row) runs before each emplace so callers can publish the current row to creator
+/// lambdas. For StringHashMap, rows are visited grouped by sub-table, NOT in ascending order.
+template <typename HashMethodType, typename State, typename F, typename FF, typename PreHandler>
+void lazy_emplace_batch_void(HashMethodType& agg_method, State& state, uint32_t num_rows,
+                             F&& creator, FF&& creator_for_null_key, PreHandler&& pre_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_emplace_void<G, is_nullable>(submap, indices.data(), indices.size(),
+                                                            agg_method, state, hash_table, creator,
+                                                            creator_for_null_key, pre_handler);
+            }
+        });
+    } else {
+        // Delegate to MethodBaseInner::lazy_emplace (look-ahead prefetch + emplace): the same
+        // per-row call the operators made before batching, instead of a hand-copied duplicate.
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            pre_handler(i);
+            agg_method.lazy_emplace(state, i, creator, creator_for_null_key);
+        }
+    }
+}
+
+/// Batch find helper: for StringHashMap, directly accesses sub-tables bypassing dispatch()
+/// (rows visited grouped by sub-table, not in row order); otherwise a per-row find().
+template <typename HashMethodType, typename State, typename ResultHandler>
+void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
+                ResultHandler&& result_handler) {
+    if constexpr (HashMethodType::is_string_hash_map()) {
+        using HashMap = typename HashMethodType::HashMapType;
+        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+        auto& hash_table = *agg_method.hash_table;
+        auto& sht = get_string_hash_table(hash_table);
+        const auto& groups = agg_method.get_sub_table_groups();
+
+        sht.visit_submaps([&](auto group_idx, auto& submap) {
+            constexpr int G = decltype(group_idx)::value;
+            const auto& indices = groups.group_row_indices[G];
+            if (!indices.empty()) {
+                process_submap_find<G, is_nullable>(submap, indices.data(), indices.size(),
+                                                    agg_method, state, hash_table, result_handler);
+            }
+        });
+    } else {
+        // Delegate to MethodBaseInner::find (look-ahead prefetch + lookup): the same per-row
+        // call the operators made before batching, instead of a hand-copied duplicate.
+        for (uint32_t i = 0; i < num_rows; ++i) {
+            auto find_result = agg_method.find(state, i);
+            result_handler(i, find_result);
+        }
+    }
+}
+
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
 template <typename FieldType, typename TData>
diff --git a/be/src/vec/common/hash_table/string_hash_table.h b/be/src/vec/common/hash_table/string_hash_table.h
index 82e0542cb75..cdd0b0b8d79 100644
--- a/be/src/vec/common/hash_table/string_hash_table.h
+++ b/be/src/vec/common/hash_table/string_hash_table.h
@@ -673,6 +673,28 @@ public:
     const_iterator cend() const { return end(); }
     iterator end() { return iterator(this, true); }
 
+    /// Public accessors for sub-tables, enabling direct batch operations
+    /// that bypass dispatch() for better performance (no per-row branching).
+    T0& get_submap_m0() { return m0; }
+    T1& get_submap_m1() { return m1; }
+    T2& get_submap_m2() { return m2; }
+    T3& get_submap_m3() { return m3; }
+    T4& get_submap_m4() { return m4; }
+    Ts& get_submap_ms() { return ms; }
+
+    /// Visit each (group_index, submap) pair, in the fixed order m0, m1, m2, m3, m4, ms.
+    /// Indices 0..5 must match StringKeySubTableGroups::get_group() in hash_map_context.h.
+    /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&) (compile-time index).
+    template <typename Func>
+    ALWAYS_INLINE void visit_submaps(Func&& func) {
+        func(std::integral_constant<int, 0> {}, m0);
+        func(std::integral_constant<int, 1> {}, m1);
+        func(std::integral_constant<int, 2> {}, m2);
+        func(std::integral_constant<int, 3> {}, m3);
+        func(std::integral_constant<int, 4> {}, m4);
+        func(std::integral_constant<int, 5> {}, ms);
+    }
+
     bool add_elem_size_overflow(size_t add_size) const {
         return m1.add_elem_size_overflow(add_size) || m2.add_elem_size_overflow(add_size) ||
                m3.add_elem_size_overflow(add_size) || m4.add_elem_size_overflow(add_size) ||
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
