This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 32381138a67 [Improvement](runtime-filter) build runtime filter before
build hash table on join build probe (#29727)
32381138a67 is described below
commit 32381138a67aa72eed5f5bfafec66cfeecd6d078
Author: Pxl <[email protected]>
AuthorDate: Thu Jan 11 10:45:56 2024 +0800
[Improvement](runtime-filter) build runtime filter before build hash table
on join build probe (#29727)
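Build the runtime filter before building the hash table on the hash join build side. The filter is now populated directly from the build block instead of from blocks recorded during hash table construction.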
---
be/src/exprs/runtime_filter_slots.h | 16 ++++-----
be/src/pipeline/exec/hashjoin_build_sink.cpp | 24 +++----------
be/src/pipeline/exec/hashjoin_build_sink.h | 6 ++--
be/src/vec/exec/join/vhash_join_node.cpp | 22 ++----------
be/src/vec/exec/join/vhash_join_node.h | 52 ++++++++++++----------------
5 files changed, 39 insertions(+), 81 deletions(-)
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 495ac28e762..4859734a6a4 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -57,7 +57,7 @@ public:
throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty,
filter_id={}",
filter_id);
}
- for (auto filter : filters) {
+ for (auto* filter : filters) {
filter->set_ignored("");
filter->signal();
}
@@ -166,7 +166,7 @@ public:
return Status::OK();
}
- void insert(const std::unordered_set<const vectorized::Block*>& datas) {
+ void insert(const vectorized::Block* block) {
for (int i = 0; i < _build_expr_context.size(); ++i) {
auto iter = _runtime_filters.find(i);
if (iter == _runtime_filters.end()) {
@@ -174,18 +174,16 @@ public:
}
int result_column_id =
_build_expr_context[i]->get_last_result_column_id();
- for (const auto* it : datas) {
- auto column = it->get_by_position(result_column_id).column;
- for (auto* filter : iter->second) {
- filter->insert_batch(column, 1);
- }
+ const auto& column =
block->get_by_position(result_column_id).column;
+ for (auto* filter : iter->second) {
+ filter->insert_batch(column, 1);
}
}
}
bool ready_finish_publish() {
for (auto& pair : _runtime_filters) {
- for (auto filter : pair.second) {
+ for (auto* filter : pair.second) {
if (!filter->is_finish_rpc()) {
return false;
}
@@ -196,7 +194,7 @@ public:
void finish_publish() {
for (auto& pair : _runtime_filters) {
- for (auto filter : pair.second) {
+ for (auto* filter : pair.second) {
static_cast<void>(filter->join_rpc());
}
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 757673c70a8..f02e203c783 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -484,29 +484,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._build_side_mutable_block.to_block());
COUNTER_UPDATE(local_state._build_blocks_memory_usage,
(*local_state._shared_state->build_block).bytes());
- RETURN_IF_ERROR(
- local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
const bool use_global_rf =
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
- auto ret = std::visit(
- Overload {[&](std::monostate&) -> Status {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- },
- [&](auto&& arg) -> Status {
- vectorized::ProcessRuntimeFilterBuild
runtime_filter_build_process;
- return runtime_filter_build_process(state, arg,
&local_state,
-
use_global_rf);
- }},
- *local_state._shared_state->hash_table_variants);
- if (!ret.ok()) {
- if (_shared_hashtable_controller) {
- _shared_hash_table_context->status = ret;
- _shared_hashtable_controller->signal(node_id());
- }
- return ret;
- }
+ RETURN_IF_ERROR(vectorized::process_runtime_filter_build(
+ state, local_state._shared_state->build_block.get(),
&local_state, use_global_rf));
+ RETURN_IF_ERROR(
+ local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 5ea504d488d..3c1b772b30a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -94,7 +94,10 @@ protected:
friend class HashJoinBuildSinkOperatorX;
template <class HashTableContext, typename Parent>
friend struct vectorized::ProcessHashTableBuild;
- friend struct vectorized::ProcessRuntimeFilterBuild;
+ template <typename Parent>
+ friend Status vectorized::process_runtime_filter_build(RuntimeState* state,
+ vectorized::Block*
block, Parent* parent,
+ bool is_global);
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -107,7 +110,6 @@ protected:
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
bool _has_set_need_null_map_for_build = false;
bool _build_side_ignore_null = false;
- std::unordered_set<const vectorized::Block*> _inserted_blocks;
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
std::vector<int> _build_col_ids;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index c65513c807c..94cb5be876f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -662,7 +662,7 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState*
state) {
SCOPED_TIMER(_allocate_resource_timer);
RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
- if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+ if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
}
@@ -751,23 +751,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
DCHECK(!_build_side_mutable_block.empty());
_build_block =
std::make_shared<Block>(_build_side_mutable_block.to_block());
COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
+ RETURN_IF_ERROR(process_runtime_filter_build(state,
_build_block.get(), this));
RETURN_IF_ERROR(_process_build_block(state, *_build_block));
- auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
- LOG(FATAL) << "FATAL: uninited
hash table";
- __builtin_unreachable();
- },
- [&](auto&& arg) -> Status {
- ProcessRuntimeFilterBuild
runtime_filter_build_process;
- return
runtime_filter_build_process(state, arg, this);
- }},
- *_hash_table_variants);
- if (!ret.ok()) {
- if (_shared_hashtable_controller) {
- _shared_hash_table_context->status = ret;
- _shared_hashtable_controller->signal(id());
- }
- return ret;
- }
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
@@ -949,9 +934,6 @@ void HashJoinNode::_set_build_ignore_flag(Block& block,
const std::vector<int>&
Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
SCOPED_TIMER(_build_table_timer);
size_t rows = block.rows();
- if (UNLIKELY(rows == 0)) {
- return Status::OK();
- }
COUNTER_UPDATE(_build_rows_counter, rows);
ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index be94dacdcae..b9b3d18dff7 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -74,33 +74,28 @@ template <int JoinOpType, typename Parent>
struct ProcessHashTableProbe;
class HashJoinNode;
-struct ProcessRuntimeFilterBuild {
- template <class HashTableContext, typename Parent>
- Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx,
Parent* parent,
- bool is_global = false) {
- if (parent->runtime_filter_descs().empty()) {
- return Status::OK();
- }
- parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
- parent->_build_expr_ctxs, parent->runtime_filter_descs(),
is_global);
-
- RETURN_IF_ERROR(
- parent->_runtime_filter_slots->init(state,
hash_table_ctx.hash_table->size()));
+template <typename Parent>
+Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent*
parent,
+ bool is_global = false) {
+ if (parent->runtime_filter_descs().empty()) {
+ return Status::OK();
+ }
+ parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
+ parent->_build_expr_ctxs, parent->runtime_filter_descs(),
is_global);
- if (!parent->_runtime_filter_slots->empty() &&
!parent->_inserted_blocks.empty()) {
- {
- SCOPED_TIMER(parent->_runtime_filter_compute_timer);
-
parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
- }
- }
- {
- SCOPED_TIMER(parent->_publish_runtime_filter_timer);
- RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
- }
+ RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows()));
- return Status::OK();
+ if (!parent->_runtime_filter_slots->empty() && block->rows() > 1) {
+ SCOPED_TIMER(parent->_runtime_filter_compute_timer);
+ parent->_runtime_filter_slots->insert(block);
+ }
+ {
+ SCOPED_TIMER(parent->_publish_runtime_filter_timer);
+ RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
}
-};
+
+ return Status::OK();
+}
using ProfileCounter = RuntimeProfile::Counter;
@@ -129,10 +124,6 @@ struct ProcessHashTableBuild {
}
}
- if (!_parent->runtime_filter_descs().empty()) {
- _parent->_inserted_blocks.insert(&_acquired_block);
- }
-
SCOPED_TIMER(_parent->_build_table_insert_timer);
hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows,
_batch_size,
*has_null_key);
@@ -414,10 +405,11 @@ private:
template <int JoinOpType, typename Parent>
friend struct ProcessHashTableProbe;
- friend struct ProcessRuntimeFilterBuild;
+ template <typename Parent>
+ friend Status process_runtime_filter_build(RuntimeState* state,
vectorized::Block* block,
+ Parent* parent, bool is_global);
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
- std::unordered_set<const Block*> _inserted_blocks;
std::vector<IRuntimeFilter*> _runtime_filters;
std::atomic_bool _probe_open_finish = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]