This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a3427cb822 [pipelineX](fix) Fix nested loop join operator (#24885)
a3427cb822 is described below
commit a3427cb822f356b368934e51dc92322070686686
Author: Gabriel <[email protected]>
AuthorDate: Tue Sep 26 13:27:34 2023 +0800
[pipelineX](fix) Fix nested loop join operator (#24885)
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 5 ++++-
be/src/pipeline/exec/hashjoin_build_sink.h | 5 +----
be/src/pipeline/exec/join_build_sink_operator.cpp | 3 ---
be/src/pipeline/exec/join_build_sink_operator.h | 2 --
be/src/pipeline/exec/nested_loop_join_build_operator.cpp | 8 +++++---
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 8 ++++----
be/src/vec/runtime/shared_hash_table_controller.cpp | 5 +++--
be/src/vec/runtime/shared_hash_table_controller.h | 10 +++++-----
8 files changed, 22 insertions(+), 24 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 92763f9f27..da262645bb 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -77,6 +77,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
profile()->add_info_string("ShareHashTableEnabled", "false");
}
}
+ if (!_should_build_hash_table) {
+ _shared_hash_table_dependency->block_writing();
+ p._shared_hashtable_controller->append_dependency(p.id(),
_shared_hash_table_dependency);
+ }
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -445,7 +449,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
- SCOPED_TIMER(local_state._build_timer);
// make one block for each 4 gigabytes
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 9c9572dca8..54848f7c86 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -50,7 +50,7 @@ class SharedHashTableDependency : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
SharedHashTableDependency(int id) : WriteDependency(id,
"SharedHashTableDependency") {}
- ~SharedHashTableDependency() = default;
+ ~SharedHashTableDependency() override = default;
void* shared_state() override { return nullptr; }
};
@@ -135,9 +135,6 @@ public:
WriteDependency* wait_for_dependency(RuntimeState* state) override {
CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
- if (local_state._should_build_hash_table) {
- return nullptr;
- }
return local_state._shared_hash_table_dependency->write_blocked_by();
}
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 13cbcbcb03..fe5faf43a8 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -32,9 +32,6 @@ Status JoinBuildSinkLocalState<DependencyType,
Derived>::init(RuntimeState* stat
PipelineXSinkLocalState<DependencyType>::profile()->add_info_string("JoinType",
to_string(p._join_op));
- _build_get_next_timer =
- ADD_TIMER(PipelineXSinkLocalState<DependencyType>::profile(),
"BuildGetNextTime");
- _build_timer =
ADD_TIMER(PipelineXSinkLocalState<DependencyType>::profile(), "BuildTime");
_build_rows_counter =
ADD_COUNTER(PipelineXSinkLocalState<DependencyType>::profile(),
"BuildRows", TUnit::UNIT);
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h
index 55553df462..a67c724f69 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -42,8 +42,6 @@ protected:
bool _short_circuit_for_null_in_probe_side = false;
- RuntimeProfile::Counter* _build_timer;
- RuntimeProfile::Counter* _build_get_next_timer;
RuntimeProfile::Counter* _build_rows_counter;
RuntimeProfile::Counter* _push_down_timer;
RuntimeProfile::Counter* _push_compute_timer;
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 6ee816db1b..f3b1c1ee32 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -35,10 +35,15 @@ Status
NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
+ _shared_state->join_op_variants = p._join_op_variants;
_filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state,
_filter_src_expr_ctxs[i]));
}
+ for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
+ p._runtime_filter_descs[i], state->query_options()));
+ }
return Status::OK();
}
@@ -61,8 +66,6 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const
TPlanNode& tnode, RuntimeSta
std::vector<TExpr> filter_src_exprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- _runtime_filter_descs[i], state->query_options()));
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs,
_filter_src_expr_ctxs));
return Status::OK();
@@ -90,7 +93,6 @@ Status
NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
- SCOPED_TIMER(local_state._build_timer);
auto rows = block->rows();
auto mem_usage = block->allocated_bytes();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1d56bb2f63..a7bacdd96d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -69,8 +69,8 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
auto& deps = get_downstream_dependency();
std::vector<LocalSinkStateInfo> infos;
for (auto& dep : deps) {
- infos.push_back(LocalSinkStateInfo {_pipeline->pipeline_profile(),
- local_params.sender_id,
dep.get(), tsink});
+ infos.emplace_back(
+ LocalSinkStateInfo {_parent_profile,
local_params.sender_id, dep.get(), tsink});
}
RETURN_IF_ERROR(_sink->setup_local_states(state, infos));
}
@@ -84,10 +84,10 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
for (auto& dep : deps) {
LocalStateInfo info {
op_idx == _operators.size() - 1
- ? _pipeline->pipeline_profile()
+ ? _parent_profile
: state->get_local_state(_operators[op_idx +
1]->id())->profile(),
scan_ranges, dep.get()};
- infos.push_back(info);
+ infos.emplace_back(info);
}
RETURN_IF_ERROR(_operators[op_idx]->setup_local_states(state, infos));
}
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 9d28983afb..524af272c1 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -34,6 +34,7 @@ void
SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int
std::lock_guard<std::mutex> lock(_mutex);
DCHECK(_builder_fragment_ids.find(node_id) ==
_builder_fragment_ids.cend());
_builder_fragment_ids.insert({node_id, builder});
+ _dependencies.insert({node_id, {}});
}
bool SharedHashTableController::should_build_hash_table(const TUniqueId&
fragment_instance_id,
@@ -70,7 +71,7 @@ void SharedHashTableController::signal(int my_node_id, Status
status) {
it->second->status = status;
_shared_contexts.erase(it);
}
- for (auto& dep : _dependencies) {
+ for (auto& dep : _dependencies[my_node_id]) {
dep->set_ready_for_write();
}
_cv.notify_all();
@@ -83,7 +84,7 @@ void SharedHashTableController::signal(int my_node_id) {
it->second->signaled = true;
_shared_contexts.erase(it);
}
- for (auto& dep : _dependencies) {
+ for (auto& dep : _dependencies[my_node_id]) {
dep->set_ready_for_write();
}
_cv.notify_all();
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index e79ab97af5..fae87c3731 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -79,20 +79,20 @@ public:
Status wait_for_signal(RuntimeState* state, const
SharedHashTableContextPtr& context);
bool should_build_hash_table(const TUniqueId& fragment_instance_id, int
my_node_id);
void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled
= enabled; }
- void
append_dependency(std::shared_ptr<pipeline::SharedHashTableDependency> dep) {
+ void append_dependency(int node_id,
std::shared_ptr<pipeline::SharedHashTableDependency> dep) {
std::lock_guard<std::mutex> lock(_mutex);
- _dependencies.push_back(dep);
+ _dependencies[node_id].push_back(dep);
}
private:
bool _pipeline_engine_enabled = false;
std::mutex _mutex;
+ // For pipelineX, we update all dependencies once hash table is built;
+ std::map<int /*node id*/,
std::vector<std::shared_ptr<pipeline::SharedHashTableDependency>>>
+ _dependencies;
std::condition_variable _cv;
std::map<int /*node id*/, TUniqueId /*fragment instance id*/>
_builder_fragment_ids;
std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
-
- // For pipelineX, we update all dependencies once hash table is built;
- std::vector<std::shared_ptr<pipeline::SharedHashTableDependency>>
_dependencies;
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]