This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 842a5b8e24 [refactor](agg) Abstract the hash operation into a method"
(#11399)
842a5b8e24 is described below
commit 842a5b8e243f631d02d20cde68f2b6ed2d069567
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Aug 2 17:27:19 2022 +0800
[refactor](agg) Abstract the hash operation into a method" (#11399)
---
be/src/vec/exec/vaggregation_node.cpp | 271 +++++++++-------------------------
be/src/vec/exec/vaggregation_node.h | 3 +
2 files changed, 76 insertions(+), 198 deletions(-)
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 6de36c6d1e..1209750203 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -706,6 +706,76 @@ bool AggregationNode::_should_expand_preagg_hash_tables() {
_agg_data._aggregated_method_variant);
}
+void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places,
ColumnRawPtrs& key_columns,
+ const size_t num_rows) {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using HashTableType = std::decay_t<decltype(agg_method.data)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns, _probe_key_sz, nullptr);
+
+ _pre_serialize_key_if_need(state, agg_method, key_columns,
num_rows);
+
+ std::vector<size_t> hash_values;
+
+ if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+ if (hash_values.size() < num_rows)
hash_values.resize(num_rows);
+ if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+ AggState>::value) {
+ for (size_t i = 0; i < num_rows; ++i) {
+ hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
+ }
+ } else {
+ for (size_t i = 0; i < num_rows; ++i) {
+ hash_values[i] =
+
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+ }
+ }
+ }
+
+ /// For all rows.
+ for (size_t i = 0; i < num_rows; ++i) {
+ AggregateDataPtr aggregate_data = nullptr;
+
+ auto emplace_result = [&]() {
+ if constexpr
(HashTableTraits<HashTableType>::is_phmap) {
+ if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows))
{
+ if constexpr
(HashTableTraits<HashTableType>::is_parallel_phmap) {
+
agg_method.data.prefetch_by_key(state.get_key_holder(
+ i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
+ } else
+ agg_method.data.prefetch_by_hash(
+ hash_values[i +
HASH_MAP_PREFETCH_DIST]);
+ }
+
+ return state.emplace_key(agg_method.data,
hash_values[i], i,
+ _agg_arena_pool);
+ } else {
+ return state.emplace_key(agg_method.data, i,
_agg_arena_pool);
+ }
+ }();
+
+ /// If a new key is inserted, initialize the states of the
aggregate functions, and possibly something related to the key.
+ if (emplace_result.is_inserted()) {
+ /// exception-safety - if you can not allocate memory
or create states, then destructors will not be called.
+ emplace_result.set_mapped(nullptr);
+
+ aggregate_data = _agg_arena_pool.aligned_alloc(
+ _total_size_of_aggregate_states,
_align_aggregate_states);
+ _create_agg_status(aggregate_data);
+
+ emplace_result.set_mapped(aggregate_data);
+ } else
+ aggregate_data = emplace_result.get_mapped();
+
+ places[i] = aggregate_data;
+ assert(places[i] != nullptr);
+ }
+ },
+ _agg_data._aggregated_method_variant);
+}
+
Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block*
in_block,
doris::vectorized::Block*
out_block) {
SCOPED_TIMER(_build_timer);
@@ -816,73 +886,7 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
_agg_data._aggregated_method_variant);
if (!ret_flag) {
- std::visit(
- [&](auto&& agg_method) -> void {
- using HashMethodType = std::decay_t<decltype(agg_method)>;
- using HashTableType =
std::decay_t<decltype(agg_method.data)>;
- using AggState = typename HashMethodType::State;
- AggState state(key_columns, _probe_key_sz, nullptr);
-
- _pre_serialize_key_if_need(state, agg_method, key_columns,
rows);
-
- std::vector<size_t> hash_values;
-
- if constexpr (HashTableTraits<HashTableType>::is_phmap) {
- if (hash_values.size() < rows)
hash_values.resize(rows);
- if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
- AggState>::value) {
- for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
- }
- } else {
- for (size_t i = 0; i < rows; ++i) {
- hash_values[i] = agg_method.data.hash(
- state.get_key_holder(i,
_agg_arena_pool));
- }
- }
- }
-
- /// For all rows.
- for (size_t i = 0; i < rows; ++i) {
- AggregateDataPtr aggregate_data = nullptr;
-
- auto emplace_result = [&]() {
- if constexpr
(HashTableTraits<HashTableType>::is_phmap) {
- if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows))
{
- if constexpr (HashTableTraits<
-
HashTableType>::is_parallel_phmap) {
-
agg_method.data.prefetch_by_key(state.get_key_holder(
- i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
- } else
- agg_method.data.prefetch_by_hash(
- hash_values[i +
HASH_MAP_PREFETCH_DIST]);
- }
-
- return state.emplace_key(agg_method.data,
hash_values[i], i,
- _agg_arena_pool);
- } else {
- return state.emplace_key(agg_method.data, i,
_agg_arena_pool);
- }
- }();
-
- /// If a new key is inserted, initialize the states of
the aggregate functions, and possibly something related to the key.
- if (emplace_result.is_inserted()) {
- /// exception-safety - if you can not allocate
memory or create states, then destructors will not be called.
- emplace_result.set_mapped(nullptr);
-
- aggregate_data = _agg_arena_pool.aligned_alloc(
- _total_size_of_aggregate_states,
_align_aggregate_states);
- _create_agg_status(aggregate_data);
-
- emplace_result.set_mapped(aggregate_data);
- } else
- aggregate_data = emplace_result.get_mapped();
-
- places[i] = aggregate_data;
- assert(places[i] != nullptr);
- }
- },
- _agg_data._aggregated_method_variant);
+ _emplace_into_hash_table(places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block,
_offsets_of_aggregate_states[i],
@@ -914,72 +918,7 @@ Status
AggregationNode::_execute_with_serialized_key(Block* block) {
int rows = block->rows();
PODArray<AggregateDataPtr> places(rows);
- std::visit(
- [&](auto&& agg_method) -> void {
- using HashMethodType = std::decay_t<decltype(agg_method)>;
- using HashTableType = std::decay_t<decltype(agg_method.data)>;
- using AggState = typename HashMethodType::State;
- AggState state(key_columns, _probe_key_sz, nullptr);
-
- _pre_serialize_key_if_need(state, agg_method, key_columns,
rows);
-
- std::vector<size_t> hash_values;
-
- if constexpr (HashTableTraits<HashTableType>::is_phmap) {
- if (hash_values.size() < rows) hash_values.resize(rows);
- if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
- AggState>::value) {
- for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
- }
- } else {
- for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
-
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
- }
- }
- }
-
- /// For all rows.
- for (size_t i = 0; i < rows; ++i) {
- AggregateDataPtr aggregate_data = nullptr;
-
- auto emplace_result = [&]() {
- if constexpr
(HashTableTraits<HashTableType>::is_phmap) {
- if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
- if constexpr
(HashTableTraits<HashTableType>::is_parallel_phmap) {
-
agg_method.data.prefetch_by_key(state.get_key_holder(
- i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
- } else
- agg_method.data.prefetch_by_hash(
- hash_values[i +
HASH_MAP_PREFETCH_DIST]);
- }
-
- return state.emplace_key(agg_method.data,
hash_values[i], i,
- _agg_arena_pool);
- } else {
- return state.emplace_key(agg_method.data, i,
_agg_arena_pool);
- }
- }();
-
- /// If a new key is inserted, initialize the states of the
aggregate functions, and possibly something related to the key.
- if (emplace_result.is_inserted()) {
- /// exception-safety - if you can not allocate memory
or create states, then destructors will not be called.
- emplace_result.set_mapped(nullptr);
-
- aggregate_data = _agg_arena_pool.aligned_alloc(
- _total_size_of_aggregate_states,
_align_aggregate_states);
- _create_agg_status(aggregate_data);
-
- emplace_result.set_mapped(aggregate_data);
- } else
- aggregate_data = emplace_result.get_mapped();
-
- places[i] = aggregate_data;
- assert(places[i] != nullptr);
- }
- },
- _agg_data._aggregated_method_variant);
+ _emplace_into_hash_table(places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
@@ -1188,71 +1127,7 @@ Status
AggregationNode::_merge_with_serialized_key(Block* block) {
int rows = block->rows();
PODArray<AggregateDataPtr> places(rows);
- std::visit(
- [&](auto&& agg_method) -> void {
- using HashMethodType = std::decay_t<decltype(agg_method)>;
- using HashTableType = std::decay_t<decltype(agg_method.data)>;
- using AggState = typename HashMethodType::State;
- AggState state(key_columns, _probe_key_sz, nullptr);
-
- _pre_serialize_key_if_need(state, agg_method, key_columns,
rows);
-
- std::vector<size_t> hash_values;
-
- if constexpr (HashTableTraits<HashTableType>::is_phmap) {
- if (hash_values.size() < rows) hash_values.resize(rows);
- if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
- AggState>::value) {
- for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
- }
- } else {
- for (size_t i = 0; i < rows; ++i) {
- hash_values[i] =
-
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
- }
- }
- }
-
- /// For all rows.
- for (size_t i = 0; i < rows; ++i) {
- AggregateDataPtr aggregate_data = nullptr;
-
- auto emplace_result = [&]() {
- if constexpr
(HashTableTraits<HashTableType>::is_phmap) {
- if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
- if constexpr
(HashTableTraits<HashTableType>::is_parallel_phmap) {
-
agg_method.data.prefetch_by_key(state.get_key_holder(
- i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
- } else
- agg_method.data.prefetch_by_hash(
- hash_values[i +
HASH_MAP_PREFETCH_DIST]);
- }
- return state.emplace_key(agg_method.data,
hash_values[i], i,
- _agg_arena_pool);
- } else {
- return state.emplace_key(agg_method.data, i,
_agg_arena_pool);
- }
- }();
-
- /// If a new key is inserted, initialize the states of the
aggregate functions, and possibly something related to the key.
- if (emplace_result.is_inserted()) {
- /// exception-safety - if you can not allocate memory
or create states, then destructors will not be called.
- emplace_result.set_mapped(nullptr);
-
- aggregate_data = _agg_arena_pool.aligned_alloc(
- _total_size_of_aggregate_states,
_align_aggregate_states);
- _create_agg_status(aggregate_data);
-
- emplace_result.set_mapped(aggregate_data);
- } else
- aggregate_data = emplace_result.get_mapped();
-
- places[i] = aggregate_data;
- assert(places[i] != nullptr);
- }
- },
- _agg_data._aggregated_method_variant);
+ _emplace_into_hash_table(places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 478cd563ca..ba7cf24b70 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -710,6 +710,9 @@ private:
}
}
+ void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs&
key_columns,
+ const size_t num_rows);
+
void release_tracker();
using vectorized_execute = std::function<Status(Block* block)>;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org