This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a3427cb822 [pipelineX](fix) Fix nested loop join operator (#24885)
a3427cb822 is described below

commit a3427cb822f356b368934e51dc92322070686686
Author: Gabriel <[email protected]>
AuthorDate: Tue Sep 26 13:27:34 2023 +0800

    [pipelineX](fix) Fix nested loop join operator (#24885)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp             |  5 ++++-
 be/src/pipeline/exec/hashjoin_build_sink.h               |  5 +----
 be/src/pipeline/exec/join_build_sink_operator.cpp        |  3 ---
 be/src/pipeline/exec/join_build_sink_operator.h          |  2 --
 be/src/pipeline/exec/nested_loop_join_build_operator.cpp |  8 +++++---
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp           |  8 ++++----
 be/src/vec/runtime/shared_hash_table_controller.cpp      |  5 +++--
 be/src/vec/runtime/shared_hash_table_controller.h        | 10 +++++-----
 8 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 92763f9f27..da262645bb 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -77,6 +77,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
             profile()->add_info_string("ShareHashTableEnabled", "false");
         }
     }
+    if (!_should_build_hash_table) {
+        _shared_hash_table_dependency->block_writing();
+        p._shared_hashtable_controller->append_dependency(p.id(), 
_shared_hash_table_dependency);
+    }
 
     _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
 
@@ -445,7 +449,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
-    SCOPED_TIMER(local_state._build_timer);
 
     // make one block for each 4 gigabytes
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 9c9572dca8..54848f7c86 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -50,7 +50,7 @@ class SharedHashTableDependency : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
     SharedHashTableDependency(int id) : WriteDependency(id, 
"SharedHashTableDependency") {}
-    ~SharedHashTableDependency() = default;
+    ~SharedHashTableDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
@@ -135,9 +135,6 @@ public:
 
     WriteDependency* wait_for_dependency(RuntimeState* state) override {
         CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
-        if (local_state._should_build_hash_table) {
-            return nullptr;
-        }
         return local_state._shared_hash_table_dependency->write_blocked_by();
     }
 
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 13cbcbcb03..fe5faf43a8 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -32,9 +32,6 @@ Status JoinBuildSinkLocalState<DependencyType, Derived>::init(RuntimeState* stat
 
     
PipelineXSinkLocalState<DependencyType>::profile()->add_info_string("JoinType",
                                                                         
to_string(p._join_op));
-    _build_get_next_timer =
-            ADD_TIMER(PipelineXSinkLocalState<DependencyType>::profile(), 
"BuildGetNextTime");
-    _build_timer = 
ADD_TIMER(PipelineXSinkLocalState<DependencyType>::profile(), "BuildTime");
     _build_rows_counter = 
ADD_COUNTER(PipelineXSinkLocalState<DependencyType>::profile(),
                                       "BuildRows", TUnit::UNIT);
 
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h
index 55553df462..a67c724f69 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -42,8 +42,6 @@ protected:
 
     bool _short_circuit_for_null_in_probe_side = false;
 
-    RuntimeProfile::Counter* _build_timer;
-    RuntimeProfile::Counter* _build_get_next_timer;
     RuntimeProfile::Counter* _build_rows_counter;
     RuntimeProfile::Counter* _push_down_timer;
     RuntimeProfile::Counter* _push_compute_timer;
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 6ee816db1b..f3b1c1ee32 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -35,10 +35,15 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     SCOPED_TIMER(profile()->total_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
+    _shared_state->join_op_variants = p._join_op_variants;
     _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
     for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, 
_filter_src_expr_ctxs[i]));
     }
+    for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
+                p._runtime_filter_descs[i], state->query_options()));
+    }
     return Status::OK();
 }
 
@@ -61,8 +66,6 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta
     std::vector<TExpr> filter_src_exprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                _runtime_filter_descs[i], state->query_options()));
     }
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, 
_filter_src_expr_ctxs));
     return Status::OK();
@@ -90,7 +93,6 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector
     CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
-    SCOPED_TIMER(local_state._build_timer);
     auto rows = block->rows();
     auto mem_usage = block->allocated_bytes();
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1d56bb2f63..a7bacdd96d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -69,8 +69,8 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
         auto& deps = get_downstream_dependency();
         std::vector<LocalSinkStateInfo> infos;
         for (auto& dep : deps) {
-            infos.push_back(LocalSinkStateInfo {_pipeline->pipeline_profile(),
-                                                local_params.sender_id, 
dep.get(), tsink});
+            infos.emplace_back(
+                    LocalSinkStateInfo {_parent_profile, 
local_params.sender_id, dep.get(), tsink});
         }
         RETURN_IF_ERROR(_sink->setup_local_states(state, infos));
     }
@@ -84,10 +84,10 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
         for (auto& dep : deps) {
             LocalStateInfo info {
                     op_idx == _operators.size() - 1
-                            ? _pipeline->pipeline_profile()
+                            ? _parent_profile
                             : state->get_local_state(_operators[op_idx + 
1]->id())->profile(),
                     scan_ranges, dep.get()};
-            infos.push_back(info);
+            infos.emplace_back(info);
         }
         RETURN_IF_ERROR(_operators[op_idx]->setup_local_states(state, infos));
     }
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 9d28983afb..524af272c1 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -34,6 +34,7 @@ void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int
     std::lock_guard<std::mutex> lock(_mutex);
     DCHECK(_builder_fragment_ids.find(node_id) == 
_builder_fragment_ids.cend());
     _builder_fragment_ids.insert({node_id, builder});
+    _dependencies.insert({node_id, {}});
 }
 
 bool SharedHashTableController::should_build_hash_table(const TUniqueId& 
fragment_instance_id,
@@ -70,7 +71,7 @@ void SharedHashTableController::signal(int my_node_id, Status status) {
         it->second->status = status;
         _shared_contexts.erase(it);
     }
-    for (auto& dep : _dependencies) {
+    for (auto& dep : _dependencies[my_node_id]) {
         dep->set_ready_for_write();
     }
     _cv.notify_all();
@@ -83,7 +84,7 @@ void SharedHashTableController::signal(int my_node_id) {
         it->second->signaled = true;
         _shared_contexts.erase(it);
     }
-    for (auto& dep : _dependencies) {
+    for (auto& dep : _dependencies[my_node_id]) {
         dep->set_ready_for_write();
     }
     _cv.notify_all();
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index e79ab97af5..fae87c3731 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -79,20 +79,20 @@ public:
     Status wait_for_signal(RuntimeState* state, const 
SharedHashTableContextPtr& context);
     bool should_build_hash_table(const TUniqueId& fragment_instance_id, int 
my_node_id);
     void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled 
= enabled; }
-    void 
append_dependency(std::shared_ptr<pipeline::SharedHashTableDependency> dep) {
+    void append_dependency(int node_id, 
std::shared_ptr<pipeline::SharedHashTableDependency> dep) {
         std::lock_guard<std::mutex> lock(_mutex);
-        _dependencies.push_back(dep);
+        _dependencies[node_id].push_back(dep);
     }
 
 private:
     bool _pipeline_engine_enabled = false;
     std::mutex _mutex;
+    // For pipelineX, we update all dependencies once hash table is built;
+    std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::SharedHashTableDependency>>>
+            _dependencies;
     std::condition_variable _cv;
     std::map<int /*node id*/, TUniqueId /*fragment instance id*/> 
_builder_fragment_ids;
     std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
-
-    // For pipelineX, we update all dependencies once hash table is built;
-    std::vector<std::shared_ptr<pipeline::SharedHashTableDependency>> 
_dependencies;
 };
 
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to