This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28aad017d1c [refactor](minor) delete unused shared hash table construction (#35413)
28aad017d1c is described below
commit 28aad017d1c093e1ad4fda956ea9f94ed8561cfb
Author: Gabriel <[email protected]>
AuthorDate: Mon May 27 14:40:08 2024 +0800
[refactor](minor) delete unused shared hash table construction (#35413)
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +-
be/src/runtime/fragment_mgr.cpp | 80 ----------
be/src/runtime/fragment_mgr.h | 10 --
be/src/vec/exec/join/vhash_join_node.cpp | 176 ++++++---------------
be/src/vec/exec/join/vhash_join_node.h | 14 --
.../vec/runtime/shared_hash_table_controller.cpp | 55 +------
be/src/vec/runtime/shared_hash_table_controller.h | 9 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 6 -
gensrc/thrift/PaloInternalService.thrift | 2 +
9 files changed, 56 insertions(+), 300 deletions(-)
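Reviewer context: after this commit the shared-hash-table path exists only under the pipeline engine, where the builder is fixed up front (task_idx == 0 in hashjoin_build_sink.cpp) and consumers synchronize through pipeline Dependency objects instead of a condition variable. A minimal, self-contained C++ sketch of the surviving controller contract follows; the Dependency stub and the simplified signatures are illustrative assumptions, not the actual Doris classes:

    #include <map>
    #include <memory>
    #include <mutex>
    #include <vector>

    // Illustrative stand-ins for the real Doris types (assumptions).
    struct TUniqueId {
        long hi = 0;
        long lo = 0;
    };
    struct Dependency {
        void set_ready() { ready = true; }
        bool ready = false;
    };

    // Sketch: the first hash-join build task registers itself as the builder,
    // consumer tasks append Dependency objects, and the builder wakes them all
    // via signal() once the shared hash table is materialized.
    class SharedHashTableControllerSketch {
    public:
        void set_builder_and_consumers(TUniqueId builder, int node_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            _builder_fragment_ids.emplace(node_id, builder);
        }
        void append_dependency(int node_id, std::shared_ptr<Dependency> dep) {
            std::lock_guard<std::mutex> lock(_mutex);
            _dependencies[node_id].push_back(std::move(dep)); // created lazily
        }
        void signal(int node_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            for (auto& dep : _dependencies[node_id]) {
                dep->set_ready(); // consumers wait on their Dependency, no condvar
            }
        }

    private:
        std::mutex _mutex;
        std::map<int, TUniqueId> _builder_fragment_ids;
        std::map<int, std::vector<std::shared_ptr<Dependency>>> _dependencies;
    };

The non-pipeline arbitration (should_build_hash_table) and the blocking wait (wait_for_signal) are what the hunks below delete.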
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 90aeb7070b6..7c486fd4b45 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -57,8 +57,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_should_build_hash_table = info.task_idx == 0;
if (_should_build_hash_table) {
profile()->add_info_string("ShareHashTableEnabled", "true");
- CHECK(p._shared_hashtable_controller->should_build_hash_table(
- state->fragment_instance_id(), p.node_id()));
+ p._shared_hashtable_controller->set_builder_and_consumers(
+ state->fragment_instance_id(), p.node_id());
}
} else {
profile()->add_info_string("ShareHashTableEnabled", "false");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index cd54edd5f54..8534638f681 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -674,7 +674,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->set_rsc_info = true;
}
-    query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
_set_scan_concurrency(params, query_ctx.get());
    const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
@@ -851,7 +850,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
params.query_options.enable_pipeline_x_engine) ||
(params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine));
- _setup_shared_hashtable_for_broadcast_join(params, query_ctx.get());
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
@@ -1440,84 +1438,6 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
return merge_status;
}
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
-                                                             QueryContext* query_ctx) {
-    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
- !params.query_options.enable_share_hash_table_for_broadcast_join) {
- return;
- }
-
- if (!params.__isset.fragment || !params.fragment.__isset.plan ||
- params.fragment.plan.nodes.empty()) {
- return;
- }
- for (auto& node : params.fragment.plan.nodes) {
- if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
- !node.hash_join_node.__isset.is_broadcast_join ||
- !node.hash_join_node.is_broadcast_join) {
- continue;
- }
-
- if (params.build_hash_table_for_broadcast_join) {
-            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
- params.params.fragment_instance_id, node.node_id);
- }
- }
-}
-
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
-        const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
-        QueryContext* query_ctx) {
-    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
- !params.query_options.enable_share_hash_table_for_broadcast_join) {
- return;
- }
-
- if (!params.__isset.fragment || !params.fragment.__isset.plan ||
- params.fragment.plan.nodes.empty()) {
- return;
- }
- for (auto& node : params.fragment.plan.nodes) {
- if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
- !node.hash_join_node.__isset.is_broadcast_join ||
- !node.hash_join_node.is_broadcast_join) {
- continue;
- }
-
- if (local_params.build_hash_table_for_broadcast_join) {
-            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
- local_params.fragment_instance_id, node.node_id);
- }
- }
-}
-
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
-                                                             QueryContext* query_ctx) {
-    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
- !params.query_options.enable_share_hash_table_for_broadcast_join) {
- return;
- }
-
- if (!params.__isset.fragment || !params.fragment.__isset.plan ||
- params.fragment.plan.nodes.empty()) {
- return;
- }
- for (auto& node : params.fragment.plan.nodes) {
- if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
- !node.hash_join_node.__isset.is_broadcast_join ||
- !node.hash_join_node.is_broadcast_join) {
- continue;
- }
-
- for (auto& local_param : params.local_params) {
- if (local_param.build_hash_table_for_broadcast_join) {
-                query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
- local_param.fragment_instance_id, node.node_id);
- }
- }
- }
-}
-
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 717b6813abc..c8298ee67b7 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -162,16 +162,6 @@ private:
template <typename Param>
void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
-    void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
-                                                    QueryContext* query_ctx);
-
-    void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
-                                                    const TPipelineInstanceParams& local_params,
- QueryContext* query_ctx);
-
-    void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
- QueryContext* query_ctx);
-
template <typename Params>
    Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
std::shared_ptr<QueryContext>& query_ctx);
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 9fec942d161..bee0c67c4b3 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -218,19 +218,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
- _should_build_hash_table = true;
if (_is_broadcast_join) {
runtime_profile()->add_info_string("BroadcastJoin", "true");
- if (state->enable_share_hash_table_for_broadcast_join()) {
-            runtime_profile()->add_info_string("ShareHashTableEnabled", "true");
-            _shared_hashtable_controller =
-                    state->get_query_ctx()->get_shared_hash_table_controller();
-            _shared_hash_table_context = _shared_hashtable_controller->get_context(id());
-            _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table(
-                    state->fragment_instance_id(), id());
-        } else {
-            runtime_profile()->add_info_string("ShareHashTableEnabled", "false");
- }
}
    _memory_usage_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
@@ -245,8 +234,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
"ProbeKeyArena", TUnit::BYTES, "MemoryUsage");
// Build phase
- auto* record_profile =
-            _should_build_hash_table ? _build_phase_profile : faker_runtime_profile();
+ auto* record_profile = _build_phase_profile;
_build_get_next_timer = ADD_TIMER(record_profile, "BuildGetNextTime");
_build_timer = ADD_TIMER(record_profile, "BuildTime");
    _build_rows_counter = ADD_COUNTER(record_profile, "BuildRows", TUnit::UNIT);
@@ -691,31 +679,26 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
SCOPED_TIMER(_build_get_next_timer);
RETURN_IF_ERROR(child(1)->open(state));
}
- if (_should_build_hash_table) {
- bool eos = false;
- Block block;
-        // If eos or have already met a null value using short-circuit strategy, we do not need to pull
-        // data from data.
-        while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) &&
-               (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) {
- release_block_memory(block, 1);
- RETURN_IF_CANCELLED(state);
- {
- SCOPED_TIMER(_build_get_next_timer);
- RETURN_IF_ERROR(child(1)->get_next_after_projects(
- state, &block, &eos,
-                        std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
-                                          ExecNode::get_next,
-                                  _children[1], std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3)));
- }
- RETURN_IF_ERROR(sink(state, &block, eos));
+ bool eos = false;
+ Block block;
+    // If eos or have already met a null value using short-circuit strategy, we do not need to pull
+    // data from data.
+    while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) &&
+           (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) {
+ release_block_memory(block, 1);
+ RETURN_IF_CANCELLED(state);
+ {
+ SCOPED_TIMER(_build_get_next_timer);
+ RETURN_IF_ERROR(child(1)->get_next_after_projects(
+ state, &block, &eos,
+                    std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
+                                      ExecNode::get_next,
+                              _children[1], std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3)));
}
- RETURN_IF_ERROR(child(1)->close(state));
- } else {
- RETURN_IF_ERROR(child(1)->close(state));
- RETURN_IF_ERROR(sink(state, nullptr, true));
+ RETURN_IF_ERROR(sink(state, &block, eos));
}
+ RETURN_IF_ERROR(child(1)->close(state));
return Status::OK();
}
@@ -723,38 +706,36 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
- if (_should_build_hash_table) {
-        // If eos or have already met a null value using short-circuit strategy, we do not need to pull
- // data from probe side.
- _build_side_mem_used += in_block->allocated_bytes();
-
- if (_build_side_mutable_block.empty()) {
- auto tmp_build_block =
-                    VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc());
-            tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
-            _build_col_ids.resize(_build_expr_ctxs.size());
-            RETURN_IF_ERROR(_do_evaluate(tmp_build_block, _build_expr_ctxs, *_build_expr_call_timer,
-                                         _build_col_ids));
-            _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block);
- }
+    // If eos or have already met a null value using short-circuit strategy, we do not need to pull
+ // data from probe side.
+ _build_side_mem_used += in_block->allocated_bytes();
+
+ if (_build_side_mutable_block.empty()) {
+ auto tmp_build_block =
+                VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc());
+        tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
+        _build_col_ids.resize(_build_expr_ctxs.size());
+        RETURN_IF_ERROR(_do_evaluate(tmp_build_block, _build_expr_ctxs, *_build_expr_call_timer,
+                                     _build_col_ids));
+        _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block);
+ }
- if (in_block->rows() != 0) {
- std::vector<int> res_col_ids(_build_expr_ctxs.size());
-            RETURN_IF_ERROR(_do_evaluate(*in_block, _build_expr_ctxs, *_build_expr_call_timer,
- res_col_ids));
-
- SCOPED_TIMER(_build_side_merge_block_timer);
-            RETURN_IF_ERROR(_build_side_mutable_block.merge_ignore_overflow(*in_block));
- if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) {
- return Status::NotSupported(
- "Hash join do not support build table rows"
- " over:" +
- std::to_string(JOIN_BUILD_SIZE_LIMIT));
- }
+ if (in_block->rows() != 0) {
+ std::vector<int> res_col_ids(_build_expr_ctxs.size());
+ RETURN_IF_ERROR(
+                _do_evaluate(*in_block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids));
+
+ SCOPED_TIMER(_build_side_merge_block_timer);
+        RETURN_IF_ERROR(_build_side_mutable_block.merge_ignore_overflow(*in_block));
+ if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) {
+ return Status::NotSupported(
+ "Hash join do not support build table rows"
+ " over:" +
+ std::to_string(JOIN_BUILD_SIZE_LIMIT));
}
}
- if (_should_build_hash_table && eos) {
+ if (eos) {
DCHECK(!_build_side_mutable_block.empty());
        _build_block = std::make_shared<Block>(_build_side_mutable_block.to_block());
COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
@@ -762,65 +743,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, runtime_filters());
RETURN_IF_ERROR(_process_build_block(state, *_build_block));
        RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this));
- if (_shared_hashtable_controller) {
- _shared_hash_table_context->status = Status::OK();
- // arena will be shared with other instances.
- _shared_hash_table_context->arena = _arena;
- _shared_hash_table_context->block = _build_block;
-            _shared_hash_table_context->hash_table_variants = _hash_table_variants;
- _shared_hash_table_context->short_circuit_for_null_in_probe_side =
- _has_null_in_build_side;
- if (_runtime_filter_slots) {
-                _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
- }
- _shared_hashtable_controller->signal(id());
- }
- } else if (!_should_build_hash_table) {
- DCHECK(_shared_hashtable_controller != nullptr);
- DCHECK(_shared_hash_table_context != nullptr);
-        auto* wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime");
- SCOPED_TIMER(wait_timer);
- RETURN_IF_ERROR(
-                _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context));
-
- _build_phase_profile->add_info_string(
- "SharedHashTableFrom",
-                print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
-        _has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side;
- std::visit(
- [](auto&& dst, auto&& src) {
-                    if constexpr (!std::is_same_v<std::monostate, std::decay_t<decltype(dst)>> &&
-                                  std::is_same_v<std::decay_t<decltype(src)>,
-                                                 std::decay_t<decltype(dst)>>) {
- dst.hash_table = src.hash_table;
- }
- },
- *_hash_table_variants,
- *std::static_pointer_cast<HashTableVariants>(
- _shared_hash_table_context->hash_table_variants));
- _build_block = _shared_hash_table_context->block;
-
- if (!_shared_hash_table_context->runtime_filters.empty()) {
- auto ret = std::visit(
- Overload {[&](std::monostate&) -> Status {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- },
- [&](auto&& arg) -> Status {
- if (_runtime_filters.empty()) {
- return Status::OK();
- }
-                              _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
- _build_expr_ctxs, _runtime_filters);
-
-                              RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
- _shared_hash_table_context));
-                              RETURN_IF_ERROR(_runtime_filter_slots->publish(true));
- return Status::OK();
- }},
- *_hash_table_variants);
- RETURN_IF_ERROR(ret);
- }
}
if (eos) {
@@ -1104,12 +1026,7 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) {
return results;
}
-HashJoinNode::~HashJoinNode() {
- if (_shared_hashtable_controller && _should_build_hash_table) {
- // signal at here is abnormal
-        _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor"));
- }
-}
+HashJoinNode::~HashJoinNode() = default;
void HashJoinNode::_release_mem() {
_arena = nullptr;
@@ -1118,7 +1035,6 @@ void HashJoinNode::_release_mem() {
_null_map_column = nullptr;
_tuple_is_null_left_flag_column = nullptr;
_tuple_is_null_right_flag_column = nullptr;
- _shared_hash_table_context = nullptr;
_probe_block.clear();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 20245be06c2..5a10e691d06 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -46,7 +46,6 @@
#include "vec/core/types.h"
#include "vec/exec/join/join_op.h" // IWYU pragma: keep
#include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/shared_hash_table_controller.h"
#include "vjoin_node_base.h"
template <typename T>
@@ -217,15 +216,6 @@ public:
    void debug_string(int indentation_level, std::stringstream* out) const override;
- bool can_sink_write() const {
- if (_should_build_hash_table) {
- return true;
- }
-        return _shared_hash_table_context && _shared_hash_table_context->signaled;
- }
-
- bool should_build_hash_table() const { return _should_build_hash_table; }
-
bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
bool is_right_semi_anti() const { return _is_right_semi_anti; }
bool is_outer_join() const { return _is_outer_join; }
@@ -346,8 +336,6 @@ private:
bool _build_side_ignore_null = false;
bool _is_broadcast_join = false;
- bool _should_build_hash_table = true;
- std::shared_ptr<SharedHashTableController> _shared_hashtable_controller;
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
std::vector<SlotId> _hash_output_slot_ids;
@@ -358,8 +346,6 @@ private:
int64_t _build_side_last_mem_used = 0;
MutableBlock _build_side_mutable_block;
- SharedHashTableContextPtr _shared_hash_table_context = nullptr;
-
Status _materialize_build_side(RuntimeState* state) override;
Status _process_build_block(RuntimeState* state, Block& block);
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 4b1203d4822..a416ba6349e 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -25,35 +25,13 @@
#include "pipeline/exec/hashjoin_build_sink.h"
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) {
// Only need to set builder and consumers with pipeline engine enabled.
- DCHECK(_pipeline_engine_enabled);
std::lock_guard<std::mutex> lock(_mutex);
    DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend());
_builder_fragment_ids.insert({node_id, builder});
- _dependencies.insert({node_id, {}});
- _finish_dependencies.insert({node_id, {}});
-}
-
-bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id,
- int my_node_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- auto it = _builder_fragment_ids.find(my_node_id);
- if (_pipeline_engine_enabled) {
- if (it != _builder_fragment_ids.cend()) {
- return it->second == fragment_instance_id;
- }
- return false;
- }
-
- if (it == _builder_fragment_ids.cend()) {
- _builder_fragment_ids.insert({my_node_id, fragment_instance_id});
- return true;
- }
- return false;
}
SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) {
@@ -64,20 +42,6 @@ SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id)
return _shared_contexts[my_node_id];
}
-void SharedHashTableController::signal(int my_node_id, Status status) {
- std::lock_guard<std::mutex> lock(_mutex);
- auto it = _shared_contexts.find(my_node_id);
- if (it != _shared_contexts.cend()) {
- it->second->signaled = true;
- it->second->status = status;
- _shared_contexts.erase(it);
- }
- for (auto& dep : _dependencies[my_node_id]) {
- dep->set_ready();
- }
- _cv.notify_all();
-}
-
void SharedHashTableController::signal(int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
auto it = _shared_contexts.find(my_node_id);
@@ -108,19 +72,4 @@ TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_nod
return it->second;
}
-Status SharedHashTableController::wait_for_signal(RuntimeState* state,
-                                                  const SharedHashTableContextPtr& context) {
- std::unique_lock<std::mutex> lock(_mutex);
- // maybe builder signaled before other instances waiting,
- // so here need to check value of `signaled`
- while (!context->signaled) {
- _cv.wait_for(lock, std::chrono::milliseconds(400),
- [&]() { return context->signaled.load(); });
- // return if the instances is cancelled(eg. query timeout)
- RETURN_IF_CANCELLED(state);
- }
- return context->status;
-}
-
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index 8fe46b97b85..35f4e9334ea 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -78,19 +78,18 @@ public:
SharedHashTableContextPtr get_context(int my_node_id);
void signal(int my_node_id);
void signal_finish(int my_node_id);
- void signal(int my_node_id, Status status);
-    Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context);
-    bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id);
-    void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; }
    void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> finish_dep) {
std::lock_guard<std::mutex> lock(_mutex);
+ if (!_dependencies.contains(node_id)) {
+ _dependencies.insert({node_id, {}});
+ _finish_dependencies.insert({node_id, {}});
+ }
_dependencies[node_id].push_back(dep);
_finish_dependencies[node_id].push_back(finish_dep);
}
private:
- bool _pipeline_engine_enabled = false;
std::mutex _mutex;
// For pipelineX, we update all dependencies once hash table is built;
    std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
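Design note on the append_dependency() hunk above: the per-node vectors in _dependencies and _finish_dependencies were previously created eagerly by set_builder_and_consumers(); with that method reduced to recording the builder id, append_dependency() now creates them lazily on first use, presumably so registration order between builder and consumers no longer matters.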
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef071e78d39..265d3afdb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1678,7 +1678,6 @@ public class Coordinator implements CoordInterface {
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
} else {
destHosts.put(param.host, param);
- param.buildHashTableForBroadcastJoin = true;
                TPlanFragmentDestination dest = new TPlanFragmentDestination();
param.recvrId = params.destinations.size();
dest.fragment_instance_id = param.instanceId;
@@ -1802,7 +1801,6 @@ public class Coordinator implements CoordInterface {
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
} else {
destHosts.put(param.host, param);
- param.buildHashTableForBroadcastJoin = true;
                TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
param.recvrId = params.destinations.size();
@@ -3689,7 +3687,6 @@ public class Coordinator implements CoordInterface {
params.setFragment(fragment.toThrift());
params.setDescTbl(descTable);
params.setParams(new TPlanFragmentExecParams());
-        params.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
params.params.setQueryId(queryId);
params.params.setFragmentInstanceId(instanceExecParam.instanceId);
@@ -3848,7 +3845,6 @@ public class Coordinator implements CoordInterface {
params.getLocalParams().size());
            TPipelineInstanceParams localParams = new TPipelineInstanceParams();
-            localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
localParams.setPerNodeScanRanges(scanRanges);
localParams.setPerNodeSharedScans(perNodeSharedScans);
@@ -4002,8 +3998,6 @@ public class Coordinator implements CoordInterface {
FragmentExecParams fragmentExecParams;
- boolean buildHashTableForBroadcastJoin = false;
-
int recvrId = -1;
List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 20e2dbe5e6e..586d40b8648 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -501,6 +501,7 @@ struct TExecPlanFragmentParams {
// Otherwise, the fragment will start executing directly on the BE side.
20: optional bool need_wait_execution_trigger = false;
+ // deprecated
21: optional bool build_hash_table_for_broadcast_join = false;
22: optional list<Types.TUniqueId> instances_sharing_hash_table;
@@ -705,6 +706,7 @@ struct TExportStatusResult {
struct TPipelineInstanceParams {
1: required Types.TUniqueId fragment_instance_id
+ // deprecated
2: optional bool build_hash_table_for_broadcast_join = false;
  3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
4: optional i32 sender_id
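Note: both build_hash_table_for_broadcast_join fields are kept and merely marked deprecated rather than deleted, presumably because removing a Thrift field id would break wire compatibility with peers that still set it during a rolling upgrade.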
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]