This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 32381138a67 [Improvement](runtime-filter) build runtime filter before build hash table on join build probe (#29727)
32381138a67 is described below

commit 32381138a67aa72eed5f5bfafec66cfeecd6d078
Author: Pxl <[email protected]>
AuthorDate: Thu Jan 11 10:45:56 2024 +0800

    [Improvement](runtime-filter) build runtime filter before build hash table on join build probe (#29727)
    
    build runtime filter before build hash table on join build probe
---
 be/src/exprs/runtime_filter_slots.h          | 16 ++++-----
 be/src/pipeline/exec/hashjoin_build_sink.cpp | 24 +++----------
 be/src/pipeline/exec/hashjoin_build_sink.h   |  6 ++--
 be/src/vec/exec/join/vhash_join_node.cpp     | 22 ++----------
 be/src/vec/exec/join/vhash_join_node.h       | 52 ++++++++++++----------------
 5 files changed, 39 insertions(+), 81 deletions(-)

diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 495ac28e762..4859734a6a4 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -57,7 +57,7 @@ public:
                 throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
                                 filter_id);
             }
-            for (auto filter : filters) {
+            for (auto* filter : filters) {
                 filter->set_ignored("");
                 filter->signal();
             }
@@ -166,7 +166,7 @@ public:
         return Status::OK();
     }
 
-    void insert(const std::unordered_set<const vectorized::Block*>& datas) {
+    void insert(const vectorized::Block* block) {
         for (int i = 0; i < _build_expr_context.size(); ++i) {
             auto iter = _runtime_filters.find(i);
             if (iter == _runtime_filters.end()) {
@@ -174,18 +174,16 @@ public:
             }
 
             int result_column_id = 
_build_expr_context[i]->get_last_result_column_id();
-            for (const auto* it : datas) {
-                auto column = it->get_by_position(result_column_id).column;
-                for (auto* filter : iter->second) {
-                    filter->insert_batch(column, 1);
-                }
+            const auto& column = 
block->get_by_position(result_column_id).column;
+            for (auto* filter : iter->second) {
+                filter->insert_batch(column, 1);
             }
         }
     }
 
     bool ready_finish_publish() {
         for (auto& pair : _runtime_filters) {
-            for (auto filter : pair.second) {
+            for (auto* filter : pair.second) {
                 if (!filter->is_finish_rpc()) {
                     return false;
                 }
@@ -196,7 +194,7 @@ public:
 
     void finish_publish() {
         for (auto& pair : _runtime_filters) {
-            for (auto filter : pair.second) {
+            for (auto* filter : pair.second) {
                 static_cast<void>(filter->join_rpc());
             }
         }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 757673c70a8..f02e203c783 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -484,29 +484,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                 local_state._build_side_mutable_block.to_block());
         COUNTER_UPDATE(local_state._build_blocks_memory_usage,
                        (*local_state._shared_state->build_block).bytes());
-        RETURN_IF_ERROR(
-                local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
 
         const bool use_global_rf =
                 
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
-        auto ret = std::visit(
-                Overload {[&](std::monostate&) -> Status {
-                              LOG(FATAL) << "FATAL: uninited hash table";
-                              __builtin_unreachable();
-                          },
-                          [&](auto&& arg) -> Status {
-                              vectorized::ProcessRuntimeFilterBuild 
runtime_filter_build_process;
-                              return runtime_filter_build_process(state, arg, 
&local_state,
-                                                                  
use_global_rf);
-                          }},
-                *local_state._shared_state->hash_table_variants);
-        if (!ret.ok()) {
-            if (_shared_hashtable_controller) {
-                _shared_hash_table_context->status = ret;
-                _shared_hashtable_controller->signal(node_id());
-            }
-            return ret;
-        }
+        RETURN_IF_ERROR(vectorized::process_runtime_filter_build(
+                state, local_state._shared_state->build_block.get(), 
&local_state, use_global_rf));
+        RETURN_IF_ERROR(
+                local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 5ea504d488d..3c1b772b30a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -94,7 +94,10 @@ protected:
     friend class HashJoinBuildSinkOperatorX;
     template <class HashTableContext, typename Parent>
     friend struct vectorized::ProcessHashTableBuild;
-    friend struct vectorized::ProcessRuntimeFilterBuild;
+    template <typename Parent>
+    friend Status vectorized::process_runtime_filter_build(RuntimeState* state,
+                                                           vectorized::Block* 
block, Parent* parent,
+                                                           bool is_global);
 
     // build expr
     vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -107,7 +110,6 @@ protected:
     std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
     bool _has_set_need_null_map_for_build = false;
     bool _build_side_ignore_null = false;
-    std::unordered_set<const vectorized::Block*> _inserted_blocks;
     std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
     std::vector<int> _build_col_ids;
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index c65513c807c..94cb5be876f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -662,7 +662,7 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
     SCOPED_TIMER(_allocate_resource_timer);
     RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
-        if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+        if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
             RETURN_IF_ERROR(bf->init_with_fixed_length());
         }
     }
@@ -751,23 +751,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
         DCHECK(!_build_side_mutable_block.empty());
         _build_block = 
std::make_shared<Block>(_build_side_mutable_block.to_block());
         COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
+        RETURN_IF_ERROR(process_runtime_filter_build(state, 
_build_block.get(), this));
         RETURN_IF_ERROR(_process_build_block(state, *_build_block));
-        auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
-                                            LOG(FATAL) << "FATAL: uninited 
hash table";
-                                            __builtin_unreachable();
-                                        },
-                                        [&](auto&& arg) -> Status {
-                                            ProcessRuntimeFilterBuild 
runtime_filter_build_process;
-                                            return 
runtime_filter_build_process(state, arg, this);
-                                        }},
-                              *_hash_table_variants);
-        if (!ret.ok()) {
-            if (_shared_hashtable_controller) {
-                _shared_hash_table_context->status = ret;
-                _shared_hashtable_controller->signal(id());
-            }
-            return ret;
-        }
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
@@ -949,9 +934,6 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector<int>&
 Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
-    if (UNLIKELY(rows == 0)) {
-        return Status::OK();
-    }
     COUNTER_UPDATE(_build_rows_counter, rows);
 
     ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index be94dacdcae..b9b3d18dff7 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -74,33 +74,28 @@ template <int JoinOpType, typename Parent>
 struct ProcessHashTableProbe;
 class HashJoinNode;
 
-struct ProcessRuntimeFilterBuild {
-    template <class HashTableContext, typename Parent>
-    Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, 
Parent* parent,
-                      bool is_global = false) {
-        if (parent->runtime_filter_descs().empty()) {
-            return Status::OK();
-        }
-        parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
-                parent->_build_expr_ctxs, parent->runtime_filter_descs(), 
is_global);
-
-        RETURN_IF_ERROR(
-                parent->_runtime_filter_slots->init(state, 
hash_table_ctx.hash_table->size()));
+template <typename Parent>
+Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* 
parent,
+                                    bool is_global = false) {
+    if (parent->runtime_filter_descs().empty()) {
+        return Status::OK();
+    }
+    parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
+            parent->_build_expr_ctxs, parent->runtime_filter_descs(), 
is_global);
 
-        if (!parent->_runtime_filter_slots->empty() && 
!parent->_inserted_blocks.empty()) {
-            {
-                SCOPED_TIMER(parent->_runtime_filter_compute_timer);
-                
parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
-            }
-        }
-        {
-            SCOPED_TIMER(parent->_publish_runtime_filter_timer);
-            RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
-        }
+    RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows()));
 
-        return Status::OK();
+    if (!parent->_runtime_filter_slots->empty() && block->rows() > 1) {
+        SCOPED_TIMER(parent->_runtime_filter_compute_timer);
+        parent->_runtime_filter_slots->insert(block);
+    }
+    {
+        SCOPED_TIMER(parent->_publish_runtime_filter_timer);
+        RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
     }
-};
+
+    return Status::OK();
+}
 
 using ProfileCounter = RuntimeProfile::Counter;
 
@@ -129,10 +124,6 @@ struct ProcessHashTableBuild {
             }
         }
 
-        if (!_parent->runtime_filter_descs().empty()) {
-            _parent->_inserted_blocks.insert(&_acquired_block);
-        }
-
         SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, 
_batch_size,
                                                                       
*has_null_key);
@@ -414,10 +405,11 @@ private:
     template <int JoinOpType, typename Parent>
     friend struct ProcessHashTableProbe;
 
-    friend struct ProcessRuntimeFilterBuild;
+    template <typename Parent>
+    friend Status process_runtime_filter_build(RuntimeState* state, 
vectorized::Block* block,
+                                               Parent* parent, bool is_global);
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-    std::unordered_set<const Block*> _inserted_blocks;
 
     std::vector<IRuntimeFilter*> _runtime_filters;
     std::atomic_bool _probe_open_finish = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to