This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 31a5e072e76 [refactor](pipelineX) Simplify set operation (#25502)
31a5e072e76 is described below
commit 31a5e072e76eb6f5956c1fedc135cc6e95d982d8
Author: Gabriel <[email protected]>
AuthorDate: Tue Oct 17 15:11:46 2023 +0800
[refactor](pipelineX) Simplify set operation (#25502)
---
be/src/pipeline/exec/set_probe_sink_operator.cpp | 6 +--
be/src/pipeline/exec/set_probe_sink_operator.h | 5 +-
be/src/pipeline/exec/set_sink_operator.cpp | 6 +--
be/src/pipeline/exec/set_sink_operator.h | 6 ++-
.../vec/common/hash_table/hash_table_set_build.h | 55 +++-------------------
.../vec/common/hash_table/hash_table_set_probe.h | 52 +++-----------------
be/src/vec/exec/vset_operation_node.cpp | 2 +-
be/src/vec/exec/vset_operation_node.h | 3 ++
8 files changed, 30 insertions(+), 105 deletions(-)
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 81c30d45d1e..aea9aff0e75 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -131,9 +131,9 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- vectorized::HashTableProbeX<HashTableCtxType,
is_intersect>
- process_hashtable_ctx(local_state, probe_rows);
- return
process_hashtable_ctx.mark_data_in_hashtable(local_state, arg);
+ vectorized::HashTableProbe<HashTableCtxType,
is_intersect>
+ process_hashtable_ctx(&local_state,
probe_rows);
+ return
process_hashtable_ctx.mark_data_in_hashtable(arg);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
}
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 2c2f1ce1c60..45176fd0099 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -31,7 +31,7 @@ class RuntimeState;
namespace vectorized {
class Block;
template <class HashTableContext, bool is_intersected>
-struct HashTableProbeX;
+struct HashTableProbe;
} // namespace vectorized
namespace pipeline {
@@ -81,11 +81,12 @@ public:
: Base(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ int64_t* valid_element_in_hash_tbl() { return
&_shared_state->valid_element_in_hash_tbl; }
private:
friend class SetProbeSinkOperatorX<is_intersect>;
template <class HashTableContext, bool is_intersected>
- friend struct vectorized::HashTableProbeX;
+ friend struct vectorized::HashTableProbe;
//record insert column id during probe
std::vector<uint16_t> _probe_column_inserted_id;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index 604729a4700..6725deffa14 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -116,9 +116,9 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- vectorized::HashTableBuildX<HashTableCtxType, is_intersect>
- hash_table_build_process(rows, raw_ptrs, offset,
state);
- static_cast<void>(hash_table_build_process(local_state,
arg));
+ vectorized::HashTableBuild<HashTableCtxType, is_intersect>
+ hash_table_build_process(&local_state, rows,
raw_ptrs, offset, state);
+ static_cast<void>(hash_table_build_process(arg,
local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
}
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 945ec06891c..5383b1b3a55 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -29,7 +29,7 @@ class ExecNode;
namespace vectorized {
template <class HashTableContext, bool is_intersected>
-struct HashTableBuildX;
+struct HashTableBuild;
}
namespace pipeline {
@@ -74,10 +74,12 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ int64_t* mem_used() { return &_shared_state->mem_used; };
+
private:
friend class SetSinkOperatorX<is_intersect>;
template <class HashTableContext, bool is_intersected>
- friend struct vectorized::HashTableBuildX;
+ friend struct vectorized::HashTableBuild;
RuntimeProfile::Counter* _build_timer; // time to build hash table
vectorized::MutableBlock _mutable_block;
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h
index ff1fec3ab1c..e3c1ed27b1f 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -23,13 +23,13 @@
namespace doris::vectorized {
template <class HashTableContext, bool is_intersect>
struct HashTableBuild {
- HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs,
- VSetOperationNode<is_intersect>* operation_node, uint8_t
offset,
+ template <typename Parent>
+ HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs,
uint8_t offset,
RuntimeState* state)
- : _rows(rows),
+ : _mem_used(parent->mem_used()),
+ _rows(rows),
_offset(offset),
_build_raw_ptrs(build_raw_ptrs),
- _operation_node(operation_node),
_state(state) {}
Status operator()(HashTableContext& hash_table_ctx, Arena& arena) {
@@ -39,7 +39,7 @@ struct HashTableBuild {
Defer defer {[&]() {
int64_t bucket_bytes =
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
- _operation_node->_mem_used += bucket_bytes - old_bucket_bytes;
+ *_mem_used += bucket_bytes - old_bucket_bytes;
}};
KeyGetter key_getter(_build_raw_ptrs);
@@ -62,54 +62,11 @@ struct HashTableBuild {
}
private:
+ int64_t* _mem_used;
const int _rows;
const uint8_t _offset;
ColumnRawPtrs& _build_raw_ptrs;
- VSetOperationNode<is_intersect>* _operation_node;
RuntimeState* _state;
};
-template <class HashTableContext, bool is_intersect>
-struct HashTableBuildX {
- HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset,
RuntimeState* state)
- : _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs),
_state(state) {}
-
- Status operator()(pipeline::SetSinkLocalState<is_intersect>& local_state,
- HashTableContext& hash_table_ctx) {
- using KeyGetter = typename HashTableContext::State;
- using Mapped = typename HashTableContext::Mapped;
- int64_t old_bucket_bytes =
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-
- Defer defer {[&]() {
- int64_t bucket_bytes =
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
- local_state._shared_state->mem_used += bucket_bytes -
old_bucket_bytes;
- }};
-
- KeyGetter key_getter(_build_raw_ptrs);
- hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows);
-
- size_t k = 0;
- auto creator = [&](const auto& ctor, auto& key, auto& origin) {
- HashTableContext::try_presis_key(key, origin, local_state._arena);
- ctor(key, Mapped {k, _offset});
- };
- auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset};
};
-
- for (; k < _rows; ++k) {
- if (k % CHECK_FRECUENCY == 0) {
- RETURN_IF_CANCELLED(_state);
- }
- hash_table_ctx.lazy_emplace(key_getter, k, creator,
creator_for_null_key);
- }
- return Status::OK();
- }
-
-private:
- const int _rows;
- const uint8_t _offset;
- ColumnRawPtrs& _build_raw_ptrs;
- RuntimeState* _state;
- std::vector<size_t> _build_side_hash_values;
-};
-
} // namespace doris::vectorized
diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h b/be/src/vec/common/hash_table/hash_table_set_probe.h
index 4a79b86a146..eb00cca8561 100644
--- a/be/src/vec/common/hash_table/hash_table_set_probe.h
+++ b/be/src/vec/common/hash_table/hash_table_set_probe.h
@@ -24,10 +24,11 @@ namespace doris::vectorized {
template <class HashTableContext, bool is_intersected>
struct HashTableProbe {
- HashTableProbe(VSetOperationNode<is_intersected>* operation_node, int
probe_rows)
- : _operation_node(operation_node),
+ template <typename Parent>
+ HashTableProbe(Parent* parent, int probe_rows)
+ : _valid_element_in_hash_tbl(parent->valid_element_in_hash_tbl()),
_probe_rows(probe_rows),
- _probe_raw_ptrs(operation_node->_probe_columns) {}
+ _probe_raw_ptrs(parent->_probe_columns) {}
Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
using KeyGetter = typename HashTableContext::State;
@@ -43,49 +44,9 @@ struct HashTableProbe {
if (!(it->visited)) {
it->visited = true;
if constexpr (is_intersected) { //intersected
- _operation_node->_valid_element_in_hash_tbl++;
+ (*_valid_element_in_hash_tbl)++;
} else {
- _operation_node->_valid_element_in_hash_tbl--;
//except
- }
- }
- }
- }
- } else {
- LOG(FATAL) << "Invalid RowRefListType!";
- }
- return Status::OK();
- }
-
-private:
- VSetOperationNode<is_intersected>* _operation_node;
- const size_t _probe_rows;
- ColumnRawPtrs& _probe_raw_ptrs;
- std::vector<StringRef> _probe_keys;
-};
-
-template <class HashTableContext, bool is_intersected>
-struct HashTableProbeX {
- HashTableProbeX(pipeline::SetProbeSinkLocalState<is_intersected>&
local_state, int probe_rows)
- : _probe_rows(probe_rows),
_probe_raw_ptrs(local_state._probe_columns) {}
-
- Status
mark_data_in_hashtable(pipeline::SetProbeSinkLocalState<is_intersected>&
local_state,
- HashTableContext& hash_table_ctx) {
- using KeyGetter = typename HashTableContext::State;
-
- KeyGetter key_getter(_probe_raw_ptrs);
- hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows);
-
- if constexpr (std::is_same_v<typename HashTableContext::Mapped,
RowRefListWithFlags>) {
- for (int probe_index = 0; probe_index < _probe_rows;
probe_index++) {
- auto find_result = hash_table_ctx.find(key_getter,
probe_index);
- if (find_result.is_found()) { //if found, marked visited
- auto it = find_result.get_mapped().begin();
- if (!(it->visited)) {
- it->visited = true;
- if constexpr (is_intersected) { //intersected
-
local_state._shared_state->valid_element_in_hash_tbl++;
- } else {
-
local_state._shared_state->valid_element_in_hash_tbl--; //except
+ (*_valid_element_in_hash_tbl)--; //except
}
}
}
@@ -97,6 +58,7 @@ struct HashTableProbeX {
}
private:
+ int64_t* _valid_element_in_hash_tbl;
const size_t _probe_rows;
ColumnRawPtrs& _probe_raw_ptrs;
std::vector<StringRef> _probe_keys;
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 9b15db67b3c..d284385b8ed 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -320,7 +320,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(
- rows, raw_ptrs, this, offset, state);
+ this, rows, raw_ptrs, offset, state);
st = hash_table_build_process(arg, _arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index dfd96430115..ff016469f49 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -73,6 +73,9 @@ public:
bool is_child_finished(int child_id) const;
+ int64_t* valid_element_in_hash_tbl() { return &_valid_element_in_hash_tbl;
}
+ int64_t* mem_used() { return &_mem_used; };
+
private:
void _finalize_probe(int child_id);
//Todo: In build process of hashtable, It's same as join node.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]