This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 89fbda1dacc [refactor](join) Refine broadcast controller (#49556)
89fbda1dacc is described below
commit 89fbda1dacc1510327ab559e9460bda20b7c64dd
Author: Gabriel <[email protected]>
AuthorDate: Mon Mar 31 11:15:49 2025 +0800
[refactor](join) Refine broadcast controller (#49556)
The broadcast controller can be removed on the pipelineX engine, since we now use
operators and fragments to manage the global state shared between instances.
---
be/src/pipeline/dependency.cpp | 10 ++
be/src/pipeline/dependency.h | 51 ++++----
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 128 +++++++++------------
be/src/pipeline/exec/hashjoin_build_sink.h | 22 ++--
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 44 ++++++-
be/src/pipeline/exec/hashjoin_probe_operator.h | 5 +-
be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 3 +-
.../pipeline/exec/memory_scratch_sink_operator.cpp | 3 +-
be/src/pipeline/exec/operator.cpp | 30 ++++-
be/src/pipeline/exec/operator.h | 8 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 3 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 6 +-
be/src/pipeline/exec/result_sink_operator.cpp | 3 +-
be/src/pipeline/pipeline_fragment_context.cpp | 65 ++++++-----
be/src/pipeline/pipeline_fragment_context.h | 18 ++-
be/src/pipeline/pipeline_task.cpp | 26 ++---
be/src/pipeline/pipeline_task.h | 11 +-
be/src/runtime/fragment_mgr.cpp | 1 -
be/src/runtime/query_context.cpp | 2 -
be/src/runtime/query_context.h | 6 -
be/src/runtime_filter/runtime_filter.h | 2 +
be/src/runtime_filter/runtime_filter_producer.h | 10 --
.../runtime_filter_producer_helper.cpp | 12 +-
.../runtime_filter_producer_helper.h | 7 +-
.../vec/runtime/shared_hash_table_controller.cpp | 70 -----------
be/src/vec/runtime/shared_hash_table_controller.h | 96 ----------------
be/test/olap/wal/wal_manager_test.cpp | 9 +-
be/test/pipeline/local_exchanger_test.cpp | 12 +-
be/test/pipeline/operator/agg_operator_test.cpp | 4 +-
...istinct_streaming_aggregation_operator_test.cpp | 2 +-
.../operator/exchange_sink_operator_test.cpp | 2 +-
.../operator/exchange_source_operator_test.cpp | 2 +-
.../local_merge_sort_source_operator_test.cpp | 2 +-
.../partitioned_aggregation_sink_operator_test.cpp | 14 +--
...artitioned_aggregation_source_operator_test.cpp | 18 +--
.../partitioned_aggregation_test_helper.cpp | 7 +-
.../partitioned_hash_join_probe_operator_test.cpp | 5 +-
.../partitioned_hash_join_sink_operator_test.cpp | 8 +-
.../operator/partitioned_hash_join_test_helper.cpp | 7 +-
be/test/pipeline/operator/repeat_operator_test.cpp | 2 +-
be/test/pipeline/operator/set_operator_test.cpp | 6 +-
be/test/pipeline/operator/sort_operator_test.cpp | 4 +-
.../operator/spill_sort_sink_operator_test.cpp | 10 +-
.../operator/spill_sort_source_operator_test.cpp | 10 +-
.../pipeline/operator/spill_sort_test_helper.cpp | 7 +-
be/test/pipeline/operator/union_operator_test.cpp | 6 +-
be/test/pipeline/pipeline_test.cpp | 16 +--
.../runtime_filter_producer_helper_test.cpp | 19 ++-
be/test/vec/exec/vfile_scanner_exception_test.cpp | 9 +-
50 files changed, 368 insertions(+), 457 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 76b950f63a2..276d4d0a0c1 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -33,6 +33,7 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
+
Dependency* BasicSharedState::create_source_dependency(int operator_id, int
node_id,
const std::string&
name) {
source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id,
name + "_DEPENDENCY"));
@@ -40,6 +41,15 @@ Dependency* BasicSharedState::create_source_dependency(int
operator_id, int node
return source_deps.back().get();
}
+void BasicSharedState::create_source_dependencies(int num_sources, int
operator_id, int node_id,
+ const std::string& name) {
+ source_deps.resize(num_sources, nullptr);
+ for (auto& source_dep : source_deps) {
+ source_dep = std::make_shared<Dependency>(operator_id, node_id, name +
"_DEPENDENCY");
+ source_dep->set_shared_state(this);
+ }
+}
+
Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id,
const std::string& name) {
sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name +
"_DEPENDENCY", true));
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index e31274a8791..0bf0e404253 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -84,8 +84,13 @@ struct BasicSharedState {
virtual ~BasicSharedState() = default;
Dependency* create_source_dependency(int operator_id, int node_id, const
std::string& name);
-
+ void create_source_dependencies(int num_sources, int operator_id, int
node_id,
+ const std::string& name);
Dependency* create_sink_dependency(int dest_id, int node_id, const
std::string& name);
+ std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
+ DCHECK_LT(channel_id, source_deps.size());
+ return {source_deps[channel_id]};
+ }
};
class Dependency : public std::enable_shared_from_this<Dependency> {
@@ -110,11 +115,10 @@ public:
[[nodiscard]] Dependency* is_blocked_by(PipelineTask* task = nullptr);
// Notify downstream pipeline tasks this dependency is ready.
virtual void set_ready();
- void set_ready_to_read() {
- DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
- _shared_state->source_deps.front()->set_ready();
+ void set_ready_to_read(int channel_id = 0) {
+ DCHECK_LT(channel_id, _shared_state->source_deps.size()) <<
debug_string();
+ _shared_state->source_deps[channel_id]->set_ready();
}
-
void set_ready_to_write() {
DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->set_ready();
@@ -593,19 +597,32 @@ struct JoinSharedState : public BasicSharedState {
struct HashJoinSharedState : public JoinSharedState {
ENABLE_FACTORY_CREATOR(HashJoinSharedState)
- // mark the join column whether support null eq
- std::vector<bool> is_null_safe_eq_join;
-
- // mark the build hash table whether it needs to store null value
- std::vector<bool> serialize_null_into_key;
+ HashJoinSharedState() {
+
hash_table_variant_vector.push_back(std::make_shared<JoinDataVariants>());
+ }
+ HashJoinSharedState(int num_instances) {
+ source_deps.resize(num_instances, nullptr);
+ hash_table_variant_vector.resize(num_instances, nullptr);
+ for (int i = 0; i < num_instances; i++) {
+ hash_table_variant_vector[i] =
std::make_shared<JoinDataVariants>();
+ }
+ }
std::shared_ptr<vectorized::Arena> arena =
std::make_shared<vectorized::Arena>();
- // maybe share hash table with other fragment instances
- std::shared_ptr<JoinDataVariants> hash_table_variants =
std::make_shared<JoinDataVariants>();
const std::vector<TupleDescriptor*> build_side_child_desc;
size_t build_exprs_size = 0;
std::shared_ptr<vectorized::Block> build_block;
std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
+
+ // Used by shared hash table
+ // For probe operator, hash table in _hash_table_variants is read-only if
visited flags are not
+ // used. (visited flags will be used only in right / full outer join).
+ //
+ // For broadcast join, although hash table is read-only, some states in
`_hash_table_variants`
+ // could still be written. For example, serialized keys will be
written into contiguous
+ // memory in `_hash_table_variants`. So before execution, we should use a
local _hash_table_variants
+ // which has a shared hash table in it.
+ std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector;
};
struct PartitionedHashJoinSharedState
@@ -750,13 +767,6 @@ public:
std::atomic<size_t> _buffer_mem_limit =
config::local_exchange_buffer_mem_limit;
// We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
- void create_dependencies(int local_exchange_id) {
- for (auto& source_dep : source_deps) {
- source_dep = std::make_shared<Dependency>(local_exchange_id,
local_exchange_id,
-
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
- source_dep->set_shared_state(this);
- }
- }
void sub_running_sink_operators();
void sub_running_source_operators();
void _set_always_ready() {
@@ -770,9 +780,6 @@ public:
}
}
- std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
- return {source_deps[channel_id]};
- }
Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; }
void set_ready_to_read(int channel_id) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 44f1b00f889..b105985e11a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -267,7 +267,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink, const
std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TUniqueId>& fragment_instance_ids)
- : DataSinkOperatorX(operator_id, sink.dest_node_id, 0),
+ : DataSinkOperatorX(operator_id, sink.dest_node_id,
std::numeric_limits<int>::max()),
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 15153d1df40..56a6a1a3784 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -43,11 +43,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
+ _task_idx = info.task_idx;
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
- _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
- _shared_state->serialize_null_into_key = p._serialize_null_into_key;
_build_expr_ctxs.resize(p._build_expr_ctxs.size());
for (size_t i = 0; i < _build_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._build_expr_ctxs[i]->clone(state,
_build_expr_ctxs[i]));
@@ -56,24 +55,20 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
_should_build_hash_table = true;
profile()->add_info_string("BroadcastJoin",
std::to_string(p._is_broadcast_join));
- if (p._is_broadcast_join) {
- if (state->enable_share_hash_table_for_broadcast_join()) {
- _should_build_hash_table = info.task_idx == 0;
- if (_should_build_hash_table) {
- p._shared_hashtable_controller->set_builder_and_consumers(
- state->fragment_instance_id(), p.node_id());
- }
- }
+ if (p._use_shared_hash_table) {
+ _should_build_hash_table = info.task_idx == 0;
}
profile()->add_info_string("BuildShareHashTable",
std::to_string(_should_build_hash_table));
- profile()->add_info_string("ShareHashTableEnabled",
-
std::to_string(state->enable_share_hash_table_for_broadcast_join()));
+ profile()->add_info_string("ShareHashTableEnabled",
std::to_string(p._use_shared_hash_table));
if (!_should_build_hash_table) {
_dependency->block();
_finish_dependency->block();
- p._shared_hashtable_controller->append_dependency(p.node_id(),
-
_dependency->shared_from_this(),
-
_finish_dependency->shared_from_this());
+ {
+ std::lock_guard<std::mutex> guard(p._mutex);
+ p._finish_dependencies.push_back(_finish_dependency);
+ }
+ } else {
+ _dependency->set_ready();
}
_build_blocks_memory_usage =
@@ -187,7 +182,7 @@ size_t
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
raw_ptrs,
block.rows(), true, true,
bucket_size);
}},
- _shared_state->hash_table_variants->method_variant);
+
_shared_state->hash_table_variant_vector.front()->method_variant);
}
}
return size_to_reserve;
@@ -197,7 +192,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
if (_closed) {
return Status::OK();
}
- auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
+ auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (!_should_build_hash_table) {
return;
@@ -211,12 +206,19 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
if (_shared_state->build_block) {
// release the memory of unused column in probe stage
- _shared_state->build_block->clear_column_mem_not_keep(
- p._should_keep_column_flags,
bool(p._shared_hashtable_controller));
+
_shared_state->build_block->clear_column_mem_not_keep(p._should_keep_column_flags,
+
p._use_shared_hash_table);
}
- if (p._shared_hashtable_controller) {
- p._shared_hashtable_controller->signal_finish(p.node_id());
+ if (p._use_shared_hash_table) {
+ std::unique_lock(p._mutex);
+ p._signaled = true;
+ for (auto& dep : _shared_state->sink_deps) {
+ dep->set_ready();
+ }
+ for (auto& dep : p._finish_dependencies) {
+ dep->set_ready();
+ }
}
}};
@@ -226,11 +228,11 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
try {
RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
- state, _shared_state->build_block.get(),
p._shared_hash_table_context));
+ state, _shared_state->build_block.get(),
p._use_shared_hash_table,
+ p._runtime_filters));
} catch (Exception& e) {
- bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
-
p._shared_hashtable_controller &&
-
!p._shared_hash_table_context->signaled;
+ bool blocked_by_shared_hash_table_signal =
+ !_should_build_hash_table && p._use_shared_hash_table &&
!p._signaled;
return Status::InternalError(
"rf process meet error: {}, wake_up_early: {},
should_build_hash_table: "
@@ -309,15 +311,18 @@ std::vector<uint16_t>
HashJoinBuildSinkLocalState::_convert_block_to_null(
Status HashJoinBuildSinkLocalState::_extract_join_column(
vectorized::Block& block, vectorized::ColumnUInt8::MutablePtr&
null_map,
vectorized::ColumnRawPtrs& raw_ptrs, const std::vector<int>&
res_col_ids) {
+ DCHECK(_should_build_hash_table);
auto& shared_state = *_shared_state;
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
const auto* column =
block.get_by_position(res_col_ids[i]).column.get();
- if (!column->is_nullable() && shared_state.serialize_null_into_key[i])
{
+ if (!column->is_nullable() &&
+
_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i]) {
_key_columns_holder.emplace_back(
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
raw_ptrs[i] = _key_columns_holder.back().get();
} else if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column);
- !shared_state.serialize_null_into_key[i] && nullable) {
+
!_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i] &&
+ nullable) {
// update nulllmap and split nested out of ColumnNullable when
serialize_null_into_key is false and column is nullable
const auto& col_nested = nullable->get_nested_column();
const auto& col_nullmap = nullable->get_null_map_data();
@@ -333,6 +338,7 @@ Status HashJoinBuildSinkLocalState::_extract_join_column(
Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
vectorized::Block&
block) {
+ DCHECK(_should_build_hash_table);
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
SCOPED_TIMER(_build_table_timer);
size_t rows = block.rows();
@@ -399,7 +405,8 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
arg.serialized_keys_size(true)));
return st;
}},
- _shared_state->hash_table_variants->method_variant,
_shared_state->join_op_variants,
+ _shared_state->hash_table_variant_vector.front()->method_variant,
+ _shared_state->join_op_variants,
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
vectorized::make_bool_variant((p._have_other_join_conjunct)));
return st;
@@ -407,6 +414,7 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
void HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap(
vectorized::Block& block, const std::vector<int>& res_col_ids) {
+ DCHECK(_should_build_hash_table);
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
if (p._short_circuit_for_null_in_build_side) {
_build_side_has_external_nullmap = true;
@@ -414,7 +422,7 @@ void
HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap(
}
for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
const auto* column =
block.get_by_position(res_col_ids[i]).column.get();
- if (column->is_nullable() &&
!_shared_state->serialize_null_into_key[i]) {
+ if (column->is_nullable() && !p._serialize_null_into_key[i]) {
_build_side_has_external_nullmap = true;
return;
}
@@ -438,8 +446,10 @@ Status
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
if (_build_expr_ctxs.size() == 1) {
p._should_keep_hash_key_column = true;
}
- return
init_hash_method<JoinDataVariants>(_shared_state->hash_table_variants.get(),
data_types,
- true);
+ return init_hash_method<JoinDataVariants>(
+ _shared_state->hash_table_variant_vector[p._use_shared_hash_table
? _task_idx : 0]
+ .get(),
+ data_types, true);
}
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int
operator_id,
@@ -514,13 +524,8 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* st
Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state));
- if (_is_broadcast_join) {
- if (state->enable_share_hash_table_for_broadcast_join()) {
- _shared_hashtable_controller =
- state->get_query_ctx()->get_shared_hash_table_controller();
- _shared_hash_table_context =
_shared_hashtable_controller->get_context(node_id());
- }
- }
+ _use_shared_hash_table =
+ _is_broadcast_join &&
state->enable_share_hash_table_for_broadcast_join();
auto init_keep_column_flags = [&](auto& tuple_descs, auto&
output_slot_flags) {
for (const auto& tuple_desc : tuple_descs) {
for (const auto& slot_desc : tuple_desc->slots()) {
@@ -593,39 +598,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
RETURN_IF_ERROR(
local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
- if (_shared_hashtable_controller) {
- _shared_hash_table_context->status = Status::OK();
- // arena will be shared with other instances.
- _shared_hash_table_context->arena =
local_state._shared_state->arena;
- _shared_hash_table_context->hash_table_variants =
- local_state._shared_state->hash_table_variants;
- _shared_hash_table_context->short_circuit_for_null_in_probe_side =
- local_state._shared_state->_has_null_in_build_side;
- _shared_hash_table_context->block =
local_state._shared_state->build_block;
- _shared_hash_table_context->build_indexes_null =
- local_state._shared_state->build_indexes_null;
- }
+ local_state.init_short_circuit_for_probe();
} else if (!local_state._should_build_hash_table) {
- DCHECK(_shared_hashtable_controller != nullptr);
- DCHECK(_shared_hash_table_context != nullptr);
// the instance which is not build hash table, it's should wait the
signal of hash table build finished.
// but if it's running and signaled == false, maybe the source
operator have closed caused by some short circuit
// return eof will make task marked as wake_up_early
// todo: remove signaled after we can guarantee that wake up eraly is
always set accurately
- if (!_shared_hash_table_context->signaled ||
state->get_task()->wake_up_early()) {
+ if (!_signaled || state->get_task()->wake_up_early()) {
return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
}
- if (!_shared_hash_table_context->status.ok()) {
- return _shared_hash_table_context->status;
- }
-
- local_state.profile()->add_info_string(
- "SharedHashTableFrom",
- print_id(
-
_shared_hashtable_controller->get_builder_fragment_instance_id(node_id())));
- local_state._shared_state->_has_null_in_build_side =
-
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
+ DCHECK_LE(local_state._task_idx,
+ local_state._shared_state->hash_table_variant_vector.size());
std::visit(
[](auto&& dst, auto&& src) {
if constexpr (!std::is_same_v<std::monostate,
std::decay_t<decltype(dst)>> &&
@@ -634,20 +618,18 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
dst.hash_table = src.hash_table;
}
},
- local_state._shared_state->hash_table_variants->method_variant,
- std::static_pointer_cast<JoinDataVariants>(
- _shared_hash_table_context->hash_table_variants)
- ->method_variant);
-
- local_state._shared_state->build_block =
_shared_hash_table_context->block;
- local_state._shared_state->build_indexes_null =
- _shared_hash_table_context->build_indexes_null;
+
local_state._shared_state->hash_table_variant_vector[local_state._task_idx]
+ ->method_variant,
+
local_state._shared_state->hash_table_variant_vector.front()->method_variant);
}
if (eos) {
local_state._eos = true;
- local_state.init_short_circuit_for_probe();
- local_state._dependency->set_ready_to_read();
+ // If a shared hash table is used, states are shared by all tasks.
+ // Sink and source have an n-n relationship if a shared hash table is used,
otherwise a 1-1 relationship.
+ // So we should notify the `_task_idx` source task if a shared hash
table is used.
+ local_state._dependency->set_ready_to_read(_use_shared_hash_table ?
local_state._task_idx
+ : 0);
}
return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 7ac62160bbd..ca743069726 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -54,8 +54,6 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
- Status disable_runtime_filters(RuntimeState* state);
-
[[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState*
state, bool eos);
protected:
@@ -82,6 +80,7 @@ protected:
size_t _evaluate_mem_usage = 0;
size_t _build_side_rows = 0;
+ int _task_idx;
vectorized::MutableBlock _build_side_mutable_block;
std::shared_ptr<RuntimeFilterProducerHelper>
_runtime_filter_producer_helper;
@@ -154,6 +153,7 @@ public:
return _join_distribution != TJoinDistributionType::BROADCAST &&
_join_distribution != TJoinDistributionType::NONE;
}
+ std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
private:
friend class HashJoinBuildSinkLocalState;
@@ -168,9 +168,6 @@ private:
std::vector<bool> _is_null_safe_eq_join;
bool _is_broadcast_join = false;
- std::shared_ptr<vectorized::SharedHashTableController>
_shared_hashtable_controller;
-
- vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
const std::vector<TExpr> _partition_exprs;
std::vector<SlotId> _hash_output_slot_ids;
@@ -179,6 +176,12 @@ private:
// if build side has variant column and need output variant column
// need to finalize variant column to speed up the join op
bool _need_finalize_variant_column = false;
+
+ bool _use_shared_hash_table = false;
+ std::atomic<bool> _signaled = false;
+ std::mutex _mutex;
+ std::vector<std::shared_ptr<pipeline::Dependency>> _finish_dependencies;
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters;
};
template <class HashTableContext>
@@ -226,8 +229,13 @@ struct ProcessHashTableBuild {
with_other_conjuncts) {
// null aware join with other conjuncts
keep_null_key = true;
- } else if (_parent->_shared_state->is_null_safe_eq_join.size() == 1 &&
- _parent->_shared_state->is_null_safe_eq_join[0]) {
+ } else if (_parent->parent()
+ ->cast<HashJoinBuildSinkOperatorX>()
+ .is_null_safe_eq_join()
+ .size() == 1 &&
+ _parent->parent()
+ ->cast<HashJoinBuildSinkOperatorX>()
+ .is_null_safe_eq_join()[0]) {
// single null safe eq
keep_null_key = true;
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 9d8f3544ba2..ab52f01fa5b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -38,6 +38,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info)
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
+ _task_idx = info.task_idx;
auto& p = _parent->cast<HashJoinProbeOperatorX>();
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
@@ -124,7 +125,8 @@ bool
HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block,
const std::vector<int>&
res_col_ids) {
for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
const auto* column =
block.get_by_position(res_col_ids[i]).column.get();
- if (column->is_nullable() &&
!_shared_state->serialize_null_into_key[i]) {
+ if (column->is_nullable() &&
+
!_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
return true;
}
}
@@ -239,7 +241,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
st = Status::InternalError("uninited hash table
probe");
}
},
- local_state._shared_state->hash_table_variants->method_variant,
+ local_state._shared_state->hash_table_variant_vector.size() ==
1
+ ?
local_state._shared_state->hash_table_variant_vector[0]->method_variant
+ : local_state._shared_state
+
->hash_table_variant_vector[local_state._task_idx]
+ ->method_variant,
*local_state._process_hashtable_ctx_variants);
} else if (local_state._probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op !=
TJoinOp::LEFT_OUTER_JOIN)) {
@@ -258,7 +264,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
st = Status::InternalError("uninited hash table
probe");
}
},
-
local_state._shared_state->hash_table_variants->method_variant,
+
local_state._shared_state->hash_table_variant_vector.size() == 1
+ ?
local_state._shared_state->hash_table_variant_vector[0]
+ ->method_variant
+ : local_state._shared_state
+
->hash_table_variant_vector[local_state._task_idx]
+ ->method_variant,
*local_state._process_hashtable_ctx_variants);
} else {
*eos = true;
@@ -311,12 +322,14 @@ Status
HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
auto& shared_state = *_shared_state;
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
const auto* column =
block.get_by_position(res_col_ids[i]).column.get();
- if (!column->is_nullable() && shared_state.serialize_null_into_key[i])
{
+ if (!column->is_nullable() &&
+
_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
_key_columns_holder.emplace_back(
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
_probe_columns[i] = _key_columns_holder.back().get();
} else if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column);
- nullable && !shared_state.serialize_null_into_key[i]) {
+ nullable &&
+
!_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
// update nulllmap and split nested out of ColumnNullable when
serialize_null_into_key is false and column is nullable
const auto& col_nested = nullable->get_nested_column();
const auto& col_nullmap = nullable->get_null_map_data();
@@ -429,6 +442,27 @@ Status HashJoinProbeOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left,
ctx));
_probe_expr_ctxs.push_back(ctx);
+
+ /// null safe equal means null = null is true, the operator in SQL
should be: <=>.
+ const bool is_null_safe_equal =
+ eq_join_conjunct.__isset.opcode &&
+ (eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL) &&
+ // For a null safe equal join, FE may generate a plan that
+ // both sides of the conjunct are not nullable, we just treat it
+ // as a normal equal join conjunct.
+ (eq_join_conjunct.right.nodes[0].is_nullable ||
+ eq_join_conjunct.left.nodes[0].is_nullable);
+
+ if (eq_join_conjuncts.size() == 1) {
+ // single column key serialize method must use nullmap to
represent null instead of serializing null into key
+ _serialize_null_into_key.emplace_back(false);
+ } else if (is_null_safe_equal) {
+ // use serialize null into key to represent multi column null value
+ _serialize_null_into_key.emplace_back(true);
+ } else {
+ // on normal conditions, because null!=null, it can be expressed
directly with nullmap.
+ _serialize_null_into_key.emplace_back(false);
+ }
}
if (tnode.hash_join_node.__isset.other_join_conjuncts &&
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index eaa0b9376ca..3914fc0d58d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -106,7 +106,7 @@ private:
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
std::make_unique<HashTableCtxVariants>();
- ssize_t _estimated_mem_in_push = -1;
+ int _task_idx;
RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
@@ -142,6 +142,7 @@ public:
_partition_exprs)
:
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}
+ bool is_broadcast_join() const { return _is_broadcast_join; }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
@@ -172,6 +173,8 @@ private:
vectorized::VExprContextSPtrs _other_join_conjuncts;
vectorized::VExprContextSPtrs _mark_join_conjuncts;
+ // mark the build hash table whether it needs to store null value
+ std::vector<bool> _serialize_null_into_key;
// probe expr
vectorized::VExprContextSPtrs _probe_expr_ctxs;
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index d0abe6aa0d2..c213616e25c 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -28,7 +28,8 @@ namespace doris::pipeline {
#include "common/compile_check_begin.h"
JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc,
int operator_id,
const std::vector<TExpr>&
t_output_expr)
- : DataSinkOperatorX(operator_id, 0, 0),
+ : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+ std::numeric_limits<int>::max()),
_row_desc(row_desc),
_t_output_expr(t_output_expr) {}
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 86afd607432..fe67703bfe9 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -66,7 +66,8 @@ Status MemoryScratchSinkLocalState::close(RuntimeState*
state, Status exec_statu
MemoryScratchSinkOperatorX::MemoryScratchSinkOperatorX(const RowDescriptor&
row_desc,
int operator_id,
const
std::vector<TExpr>& t_output_expr)
- : DataSinkOperatorX(operator_id, 0, 0),
+ : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+ std::numeric_limits<int>::max()),
_row_desc(row_desc),
_t_output_expr(t_output_expr) {}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index a82207c92c1..1f0d40069ed 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -470,14 +470,18 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false,
nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg,
FakeSharedState>;
if constexpr (!is_fake_shared) {
- if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedStateArg>) {
- DCHECK(info.le_state_map.find(_parent->operator_id()) !=
info.le_state_map.end());
- _shared_state =
info.le_state_map.at(_parent->operator_id()).first.get();
+ if (info.shared_state_map.find(_parent->operator_id()) !=
info.shared_state_map.end()) {
+ _shared_state = info.shared_state_map.at(_parent->operator_id())
+ .first.get()
+ ->template cast<SharedStateArg>();
_dependency =
_shared_state->get_dep_by_channel_id(info.task_idx).front().get();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
} else if (info.shared_state) {
+ if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedStateArg>) {
+ DCHECK(false);
+ }
// For UnionSourceOperator without children, there is no shared
state.
_shared_state = info.shared_state->template cast<SharedStateArg>();
@@ -485,6 +489,10 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_parent->operator_id(), _parent->node_id(),
_parent->get_name());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
+ } else {
+ if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedStateArg>) {
+ DCHECK(false);
+ }
}
}
@@ -543,11 +551,21 @@ Status
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_wait_for_finish_dependency_timer = ADD_TIMER(_profile,
"PendingFinishDependency");
constexpr auto is_fake_shared = std::is_same_v<SharedState,
FakeSharedState>;
if constexpr (!is_fake_shared) {
- if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
- DCHECK(info.le_state_map.find(_parent->dests_id().front()) !=
info.le_state_map.end());
- _dependency =
info.le_state_map.at(_parent->dests_id().front()).second.get();
+ if (info.shared_state_map.find(_parent->dests_id().front()) !=
+ info.shared_state_map.end()) {
+ if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedState>) {
+
DCHECK(info.shared_state_map.at(_parent->dests_id().front()).second.size() ==
1);
+ }
+ _dependency = info.shared_state_map.at(_parent->dests_id().front())
+
.second[std::is_same_v<LocalExchangeSharedState, SharedState>
+ ? 0
+ : info.task_idx]
+ .get();
_shared_state = _dependency->shared_state()->template
cast<SharedState>();
} else {
+ if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedState>) {
+ DCHECK(false);
+ }
_shared_state = info.shared_state->template cast<SharedState>();
_dependency = _shared_state->create_sink_dependency(
_parent->dests_id().front(), _parent->node_id(),
_parent->get_name());
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index fa8849640b6..ad982e509f2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -69,8 +69,8 @@ struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
const std::vector<TScanRangeParams>& scan_ranges;
BasicSharedState* shared_state;
- const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
- std::shared_ptr<Dependency>>>& le_state_map;
+ const std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+ std::vector<std::shared_ptr<Dependency>>>>&
shared_state_map;
const int task_idx;
};
@@ -80,8 +80,8 @@ struct LocalSinkStateInfo {
RuntimeProfile* parent_profile = nullptr;
const int sender_id;
BasicSharedState* shared_state;
- const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
- std::shared_ptr<Dependency>>>& le_state_map;
+ const std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+ std::vector<std::shared_ptr<Dependency>>>>&
shared_state_map;
const TDataSink& tsink;
};
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index c6a024d99a3..79b74d4c313 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -180,7 +180,8 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
if (auto* tmp_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state()) {
inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
}
- _shared_state->inner_shared_state->hash_table_variants.reset();
+
DCHECK_EQ(_shared_state->inner_shared_state->hash_table_variant_vector.size(),
1);
+
_shared_state->inner_shared_state->hash_table_variant_vector.front().reset();
if (inner_sink_state) {
COUNTER_UPDATE(_memory_used_counter,
-(inner_sink_state->_hash_table_memory_usage->value() +
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index de2e2922b26..0e2667c752f 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -36,7 +36,8 @@
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent
ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const
RowDescriptor& row_desc,
const std::vector<TExpr>&
t_output_expr)
- : DataSinkOperatorX(operator_id, 0, 0),
+ : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+ std::numeric_limits<int>::max()),
_row_desc(row_desc),
_t_output_expr(t_output_expr) {}
@@ -44,7 +45,8 @@ ResultFileSinkOperatorX::ResultFileSinkOperatorX(
int operator_id, const RowDescriptor& row_desc, const TResultFileSink&
sink,
const std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
- : DataSinkOperatorX(operator_id, 0, 0),
+ : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+ std::numeric_limits<int>::max()),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_dests(destinations),
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 9f3647b93fb..6ad81a2673f 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -101,7 +101,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor&
row_desc,
const std::vector<TExpr>&
t_output_expr,
const TResultSink& sink)
- : DataSinkOperatorX(operator_id, 0, 0),
+ : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(),
+ std::numeric_limits<int>::max()),
_sink_type(!sink.__isset.type || sink.type ==
TResultSinkType::MYSQL_PROTOCAL
? TResultSinkType::MYSQL_PROTOCAL
: sink.type),
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index e62addd95ad..b782474a3a8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -159,7 +159,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
_runtime_state.reset();
_runtime_filter_states.clear();
_runtime_filter_mgr_map.clear();
- _op_id_to_le_state.clear();
+ _op_id_to_shared_state.clear();
}
bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -381,23 +381,26 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
std::make_unique<RuntimeFilterMgr>(request.query_id,
_runtime_filter_states[i],
_query_ctx->query_mem_tracker(), false);
std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
- auto get_local_exchange_state = [&](PipelinePtr pipeline)
- -> std::map<int,
std::pair<std::shared_ptr<LocalExchangeSharedState>,
- std::shared_ptr<Dependency>>> {
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
- std::shared_ptr<Dependency>>>
- le_state_map;
- auto source_id = pipeline->operators().front()->operator_id();
- if (auto iter = _op_id_to_le_state.find(source_id); iter !=
_op_id_to_le_state.end()) {
- le_state_map.insert({source_id, iter->second});
+ auto get_shared_state = [&](PipelinePtr pipeline)
+ -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+
std::vector<std::shared_ptr<Dependency>>>> {
+ std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+ std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ for (auto& op : pipeline->operators()) {
+ auto source_id = op->operator_id();
+ if (auto iter = _op_id_to_shared_state.find(source_id);
+ iter != _op_id_to_shared_state.end()) {
+ shared_state_map.insert({source_id, iter->second});
+ }
}
for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
- if (auto iter = _op_id_to_le_state.find(sink_to_source_id);
- iter != _op_id_to_le_state.end()) {
- le_state_map.insert({sink_to_source_id, iter->second});
+ if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
+ iter != _op_id_to_shared_state.end()) {
+ shared_state_map.insert({sink_to_source_id, iter->second});
}
}
- return le_state_map;
+ return shared_state_map;
};
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
@@ -449,10 +452,9 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
- auto task = std::make_unique<PipelineTask>(pipeline,
cur_task_id,
-
task_runtime_state.get(), this,
-
pipeline_id_to_profile[pip_idx].get(),
-
get_local_exchange_state(pipeline), i);
+ auto task = std::make_unique<PipelineTask>(
+ pipeline, cur_task_id, task_runtime_state.get(), this,
+ pipeline_id_to_profile[pip_idx].get(),
get_shared_state(pipeline), i);
pipeline->incr_created_tasks(i, task.get());
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
@@ -552,7 +554,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFrag
}
}
_pipeline_parent_map.clear();
- _op_id_to_le_state.clear();
+ _op_id_to_shared_state.clear();
return Status::OK();
}
@@ -821,11 +823,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
return Status::InternalError("Unsupported local exchange type : " +
std::to_string((int)data_distribution.distribution_type));
}
- auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
-
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
- sink_dep->set_shared_state(shared_state.get());
- shared_state->sink_deps.push_back(sink_dep);
- _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
+ shared_state->create_source_dependencies(_num_instances,
local_exchange_id, local_exchange_id,
+ "LOCAL_EXCHANGE_OPERATOR");
+ shared_state->create_sink_dependency(sink_id, local_exchange_id,
"LOCAL_EXCHANGE_SINK");
+ _op_id_to_shared_state.insert({local_exchange_id, {shared_state,
shared_state->sink_deps}});
// 3. Set two pipelines' operator list. For example, split pipeline [Scan
- AggSink] to
// pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource
- AggSink].
@@ -848,8 +849,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
operators.insert(operators.begin(), source_op);
- shared_state->create_dependencies(local_exchange_id);
-
// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
std::vector<PipelineId> edges_with_source;
@@ -1431,6 +1430,20 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
+ if (is_broadcast_join &&
_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+ std::shared_ptr<HashJoinSharedState> shared_state =
+ HashJoinSharedState::create_shared(_num_instances);
+ for (int i = 0; i < _num_instances; i++) {
+ auto sink_dep =
std::make_shared<Dependency>(op->operator_id(), op->node_id(),
+
"HASH_JOIN_BUILD_DEPENDENCY");
+ sink_dep->set_shared_state(shared_state.get());
+ shared_state->sink_deps.push_back(sink_dep);
+ }
+ shared_state->create_source_dependencies(_num_instances,
op->operator_id(),
+ op->node_id(),
"HASH_JOIN_PROBE");
+ _op_id_to_shared_state.insert(
+ {op->operator_id(), {shared_state,
shared_state->sink_deps}});
+ }
_require_bucket_distribution =
_require_bucket_distribution ||
op->require_data_distribution();
break;
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 992f86bc76d..587a1989fa7 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -279,8 +279,22 @@ private:
int _operator_id = 0;
int _sink_operator_id = 0;
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
- _op_id_to_le_state;
+ /**
+ * Some states are shared by tasks in different instances (e.g. local
exchange, broadcast join).
+ *
+ * local exchange sink 0 -> -> local
exchange source 0
+ * LocalExchangeSharedState
+ * local exchange sink 1 -> -> local
exchange source 1
+ *
+ * hash join build sink 0 -> -> hash join
probe source 0
+ * HashJoinSharedState
+ * hash join build sink 1 -> -> hash join
probe source 1
+ *
+ * So we should keep states here.
+ */
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ _op_id_to_shared_state;
std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 0db8a26efca..bfcfa03070e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -53,13 +53,13 @@ class RuntimeState;
namespace doris::pipeline {
-PipelineTask::PipelineTask(
- PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
- PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile,
- std::map<int,
- std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
- le_state_map,
- int task_idx)
+PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id,
RuntimeState* state,
+ PipelineFragmentContext* fragment_context,
+ RuntimeProfile* parent_profile,
+ std::map<int,
std::pair<std::shared_ptr<BasicSharedState>,
+
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map,
+ int task_idx)
: _index(task_id),
_pipeline(pipeline),
_opened(false),
@@ -70,7 +70,7 @@ PipelineTask::PipelineTask(
_source(_operators.front().get()),
_root(_operators.back().get()),
_sink(pipeline->sink_shared_pointer()),
- _le_state_map(std::move(le_state_map)),
+ _shared_state_map(std::move(shared_state_map)),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()),
_memory_sufficient_dependency(
@@ -96,9 +96,9 @@ Status PipelineTask::prepare(const
std::vector<TScanRangeParams>& scan_range, co
});
{
// set sink local state
- LocalSinkStateInfo info {_task_idx, _task_profile.get(),
- sender_id, get_sink_shared_state().get(),
- _le_state_map, tsink};
+ LocalSinkStateInfo info {_task_idx, _task_profile.get(),
+ sender_id,
get_sink_shared_state().get(),
+ _shared_state_map, tsink};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
}
@@ -108,7 +108,7 @@ Status PipelineTask::prepare(const
std::vector<TScanRangeParams>& scan_range, co
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
LocalStateInfo info {parent_profile, _scan_ranges,
get_op_shared_state(op->operator_id()),
- _le_state_map, _task_idx};
+ _shared_state_map, _task_idx};
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
}
@@ -604,7 +604,7 @@ Status PipelineTask::finalize() {
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
_sink_shared_state.reset();
_op_shared_states.clear();
- _le_state_map.clear();
+ _shared_state_map.clear();
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 0c39f09c90b..a4fd3b4eac9 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -48,9 +48,9 @@ class PipelineTask {
public:
PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile,
- std::map<int,
std::pair<std::shared_ptr<LocalExchangeSharedState>,
- std::shared_ptr<Dependency>>>
- le_state_map,
+ std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map,
int task_idx);
Status prepare(const std::vector<TScanRangeParams>& scan_range, const int
sender_id,
@@ -284,8 +284,9 @@ private:
std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
std::shared_ptr<BasicSharedState> _sink_shared_state;
std::vector<TScanRangeParams> _scan_ranges;
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
- _le_state_map;
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ _shared_state_map;
int _task_idx;
bool _dry_run = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9b0cbb4665c..233175005d4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -90,7 +90,6 @@
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
-#include "vec/runtime/shared_hash_table_controller.h"
namespace doris {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index fb6eee2bfd6..76fd17e7d94 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -125,7 +125,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv*
exec_env,
_init_resource_context();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
_query_watcher.start();
- _shared_hash_table_controller.reset(new
vectorized::SharedHashTableController());
_execution_dependency =
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency",
false);
_memory_sufficient_dependency =
@@ -264,7 +263,6 @@ QueryContext::~QueryContext() {
#endif
_runtime_filter_mgr.reset();
_execution_dependency.reset();
- _shared_hash_table_controller.reset();
_runtime_predicates.clear();
file_scan_range_params_map.clear();
obj_pool.clear();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 84128875176..53f008438ec 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -41,7 +41,6 @@
#include "util/hash_util.hpp"
#include "util/threadpool.h"
#include "vec/exec/scan/scanner_scheduler.h"
-#include "vec/runtime/shared_hash_table_controller.h"
#include "workload_group/workload_group.h"
namespace doris {
@@ -185,10 +184,6 @@ public:
void set_ready_to_execute_only();
- std::shared_ptr<vectorized::SharedHashTableController>
get_shared_hash_table_controller() {
- return _shared_hash_table_controller;
- }
-
bool has_runtime_predicate(int source_node_id) {
return _runtime_predicates.contains(source_node_id);
}
@@ -414,7 +409,6 @@ private:
void _init_resource_context();
void _init_query_mem_tracker();
- std::shared_ptr<vectorized::SharedHashTableController>
_shared_hash_table_controller;
std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
diff --git a/be/src/runtime_filter/runtime_filter.h
b/be/src/runtime_filter/runtime_filter.h
index 0a66bc02ba0..06b12a69835 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -82,6 +82,8 @@ public:
}
virtual std::string debug_string() const = 0;
+ std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; }
+ void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper
= wrapper; }
protected:
RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc*
desc)
diff --git a/be/src/runtime_filter/runtime_filter_producer.h
b/be/src/runtime_filter/runtime_filter_producer.h
index bfb9239a0b4..7a56d97189d 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -22,7 +22,6 @@
#include "pipeline/dependency.h"
#include "runtime/query_context.h"
#include "runtime_filter/runtime_filter.h"
-#include "vec/runtime/shared_hash_table_controller.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -109,15 +108,6 @@ public:
}
}
- void copy_to_shared_context(const vectorized::SharedHashTableContextPtr&
context) {
- DCHECK(!context->runtime_filters.contains(_wrapper->filter_id()));
- context->runtime_filters[_wrapper->filter_id()] = _wrapper;
- }
- void copy_from_shared_context(const vectorized::SharedHashTableContextPtr&
context) {
- DCHECK(context->runtime_filters.contains(_wrapper->filter_id()));
- _wrapper = context->runtime_filters[_wrapper->filter_id()];
- }
-
bool set_state(State state) {
std::unique_lock<std::mutex> l(_mtx);
if (_rf_state == State::PUBLISHED ||
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 24861b61b92..b9f3caca364 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -85,8 +85,8 @@ Status RuntimeFilterProducerHelper::_publish(RuntimeState*
state) {
}
Status RuntimeFilterProducerHelper::process(
- RuntimeState* state, const vectorized::Block* block,
- const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) {
+ RuntimeState* state, const vectorized::Block* block, bool
use_shared_table,
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>>& runtime_filters)
{
if (_skip_runtime_filters_process) {
return Status::OK();
}
@@ -106,12 +106,14 @@ Status RuntimeFilterProducerHelper::process(
}
for (const auto& filter : _producers) {
- if (shared_hash_table_ctx && !wake_up_early) {
+ if (use_shared_table && !wake_up_early) {
DCHECK(_is_broadcast_join);
if (_should_build_hash_table) {
- filter->copy_to_shared_context(shared_hash_table_ctx);
+
DCHECK(!runtime_filters.contains(filter->wrapper()->filter_id()));
+ runtime_filters[filter->wrapper()->filter_id()] =
filter->wrapper();
} else {
- filter->copy_from_shared_context(shared_hash_table_ctx);
+
DCHECK(runtime_filters.contains(filter->wrapper()->filter_id()));
+
filter->set_wrapper(runtime_filters[filter->wrapper()->filter_id()]);
}
}
filter->set_wrapper_state_and_ready_to_publish(wrapper_state);
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index 3a703c9f591..fc8944ea3cb 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -25,13 +25,12 @@
#include "runtime_filter/runtime_filter_producer.h"
#include "vec/core/block.h" // IWYU pragma: keep
#include "vec/exprs/vexpr_context.h"
-#include "vec/runtime/shared_hash_table_controller.h"
namespace doris {
#include "common/compile_check_begin.h"
// this class used in hash join node
/**
- * init -> (skip_runtime_filters ->) send_filter_size -> process
+ * init -> (skip_process ->) send_filter_size -> (share_filters ->) process
*/
class RuntimeFilterProducerHelper {
public:
@@ -64,8 +63,8 @@ public:
MOCK_FUNCTION Status skip_process(RuntimeState* state);
// build rf's predicate and publish rf
- Status process(RuntimeState* state, const vectorized::Block* block,
- const vectorized::SharedHashTableContextPtr&
shared_hash_table_ctx);
+ Status process(RuntimeState* state, const vectorized::Block* block, bool
use_shared_table,
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>>&
runtime_filters);
protected:
virtual void _init_expr(const vectorized::VExprContextSPtrs&
build_expr_ctxs,
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp
b/be/src/vec/runtime/shared_hash_table_controller.cpp
deleted file mode 100644
index 286f32bb38b..00000000000
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "shared_hash_table_controller.h"
-
-#include <glog/logging.h>
-#include <runtime/runtime_state.h>
-// IWYU pragma: no_include <bits/chrono.h>
-#include <chrono> // IWYU pragma: keep
-#include <utility>
-
-#include "pipeline/exec/hashjoin_build_sink.h"
-
-namespace doris::vectorized {
-#include "common/compile_check_begin.h"
-
-void SharedHashTableController::set_builder_and_consumers(TUniqueId builder,
int node_id) {
- // Only need to set builder and consumers with pipeline engine enabled.
- std::lock_guard<std::mutex> lock(_mutex);
- DCHECK(_builder_fragment_ids.find(node_id) ==
_builder_fragment_ids.cend());
- _builder_fragment_ids.insert({node_id, builder});
-}
-
-SharedHashTableContextPtr SharedHashTableController::get_context(int
my_node_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- if (!_shared_contexts.contains(my_node_id)) {
- _shared_contexts.insert({my_node_id,
std::make_shared<SharedHashTableContext>()});
- }
- return _shared_contexts[my_node_id];
-}
-
-void SharedHashTableController::signal_finish(int my_node_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- auto it = _shared_contexts.find(my_node_id);
- if (it != _shared_contexts.cend()) {
- it->second->signaled = true;
- _shared_contexts.erase(it);
- }
- for (auto& dep : _dependencies[my_node_id]) {
- dep->set_ready();
- }
- for (auto& dep : _finish_dependencies[my_node_id]) {
- dep->set_ready();
- }
-}
-
-TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int
my_node_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- auto it = _builder_fragment_ids.find(my_node_id);
- if (it == _builder_fragment_ids.cend()) {
- return TUniqueId {};
- }
- return it->second;
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
deleted file mode 100644
index 51f4cfda3b8..00000000000
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/Types_types.h>
-
-#include <condition_variable>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include "common/status.h"
-#include "runtime_filter/runtime_filter_definitions.h"
-#include "runtime_filter/runtime_filter_wrapper.h"
-#include "vec/core/block.h"
-
-namespace doris {
-#include "common/compile_check_begin.h"
-
-class RuntimeState;
-class MinMaxFuncBase;
-class HybridSetBase;
-class BloomFilterFuncBase;
-class BitmapFilterFuncBase;
-
-namespace pipeline {
-class Dependency;
-}
-namespace vectorized {
-
-class Arena;
-
-struct SharedHashTableContext {
- SharedHashTableContext()
- : hash_table_variants(nullptr),
block(std::make_shared<vectorized::Block>()) {}
-
- Status status;
- std::shared_ptr<Arena> arena;
- std::shared_ptr<void> hash_table_variants;
- std::shared_ptr<Block> block;
- std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
- std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
- std::atomic<bool> signaled = false;
- bool short_circuit_for_null_in_probe_side = false;
-};
-
-using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
-
-class SharedHashTableController {
-public:
- /// set hash table builder's fragment instance id and consumers' fragment
instance id
- void set_builder_and_consumers(TUniqueId builder, int node_id);
- TUniqueId get_builder_fragment_instance_id(int my_node_id);
- SharedHashTableContextPtr get_context(int my_node_id);
- void signal_finish(int my_node_id);
- void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency>
dep,
- std::shared_ptr<pipeline::Dependency> finish_dep) {
- std::lock_guard<std::mutex> lock(_mutex);
- if (!_dependencies.contains(node_id)) {
- _dependencies.insert({node_id, {}});
- _finish_dependencies.insert({node_id, {}});
- }
- _dependencies[node_id].push_back(dep);
- _finish_dependencies[node_id].push_back(finish_dep);
- }
-
-private:
- std::mutex _mutex;
- // For pipelineX, we update all dependencies once hash table is built;
- std::map<int /*node id*/,
std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
- std::map<int /*node id*/,
std::vector<std::shared_ptr<pipeline::Dependency>>>
- _finish_dependencies;
- std::map<int /*node id*/, TUniqueId /*fragment instance id*/>
_builder_fragment_ids;
- std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
-};
-
-} // namespace vectorized
-} // namespace doris
-
-#include "common/compile_check_end.h"
diff --git a/be/test/olap/wal/wal_manager_test.cpp
b/be/test/olap/wal/wal_manager_test.cpp
index 459a61d4b76..2229ee73cf7 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -282,10 +282,11 @@ void WalManagerTest::init() {
auto local_state =
pipeline::FileScanLocalState::create_unique(&_runtime_state,
_scan_node.get());
std::vector<TScanRangeParams> scan_ranges;
- std::map<int,
std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>,
- std::shared_ptr<pipeline::Dependency>>>
- le_state_map;
- pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr,
le_state_map, 0};
+ pipeline::LocalStateInfo info {.parent_profile = &_global_profile,
+ .scan_ranges = scan_ranges,
+ .shared_state = nullptr,
+ .shared_state_map = {},
+ .task_idx = 0};
WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init
local_state");
_runtime_state.emplace_local_state(_scan_node->operator_id(),
std::move(local_state));
diff --git a/be/test/pipeline/local_exchanger_test.cpp
b/be/test/pipeline/local_exchanger_test.cpp
index 9a6c4acb9f3..d3a9b0e2d5d 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -95,7 +95,7 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) {
auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
- shared_state->create_dependencies(0);
+ shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
for (size_t i = 0; i < num_sink; i++) {
@@ -338,7 +338,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) {
auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
- shared_state->create_dependencies(0);
+ shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
auto* exchanger = (PassthroughExchanger*)shared_state->exchanger.get();
for (size_t i = 0; i < num_sink; i++) {
@@ -532,7 +532,7 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) {
auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
- shared_state->create_dependencies(0);
+ shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
auto* exchanger = (PassToOneExchanger*)shared_state->exchanger.get();
for (size_t i = 0; i < num_sink; i++) {
@@ -734,7 +734,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) {
auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
- shared_state->create_dependencies(0);
+ shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
auto* exchanger = (BroadcastExchanger*)shared_state->exchanger.get();
for (size_t i = 0; i < num_sink; i++) {
@@ -931,7 +931,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) {
auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
- shared_state->create_dependencies(0);
+ shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
auto* exchanger =
(AdaptivePassthroughExchanger*)shared_state->exchanger.get();
for (size_t i = 0; i < num_sink; i++) {
@@ -1140,7 +1140,7 @@ TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) {
auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
- shared_state->create_dependencies(0);
+ shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
diff --git a/be/test/pipeline/operator/agg_operator_test.cpp
b/be/test/pipeline/operator/agg_operator_test.cpp
index de9af15c1ba..1cb002c85d9 100644
--- a/be/test/pipeline/operator/agg_operator_test.cpp
+++ b/be/test/pipeline/operator/agg_operator_test.cpp
@@ -42,7 +42,7 @@ auto static
init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op,
.parent_profile = &ctx.profile,
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(local_state->init(&ctx.state, info).ok());
ctx.state.emplace_sink_local_state(0, std::move(local_state));
@@ -54,7 +54,7 @@ auto static
init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op,
LocalStateInfo info {.parent_profile = &ctx.profile,
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(local_state->init(&ctx.state, info).ok());
diff --git
a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
index c1e92272739..1377ee03328 100644
--- a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
+++ b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
@@ -57,7 +57,7 @@ struct DistinctStreamingAggOperatorTest : public
::testing::Test {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = nullptr,
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
index 945eab84ec5..aa3fc6f7877 100644
--- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
@@ -92,7 +92,7 @@ auto create_exchange_sink(std::vector<ChannelInfo>
channel_info) {
.parent_profile = &ctx->profile,
.sender_id = 0,
.shared_state = nullptr,
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(local_state->init(&ctx->state, info).ok());
ctx->state.emplace_sink_local_state(0, std::move(local_state));
diff --git a/be/test/pipeline/operator/exchange_source_operator_test.cpp
b/be/test/pipeline/operator/exchange_source_operator_test.cpp
index db71bc6982a..709921ac48c 100644
--- a/be/test/pipeline/operator/exchange_source_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_source_operator_test.cpp
@@ -88,7 +88,7 @@ struct ExchangeSourceOperatorXTest : public ::testing::Test {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = nullptr,
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
auto st = local_state->init(state.get(), info);
state->resize_op_id_to_local_state(-100);
diff --git
a/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
b/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
index 6264c1c52ab..ee653a950cf 100644
--- a/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
+++ b/be/test/pipeline/operator/local_merge_sort_source_operator_test.cpp
@@ -66,7 +66,7 @@ struct LocalMergeSOrtSourceOperatorTest : public
testing::Test {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = shared_states[i].get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = i};
EXPECT_TRUE(local_state->init(runtime_states[i].get(), info));
runtime_states[i]->resize_op_id_to_local_state(-100);
diff --git
a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
index ba24d77d5d7..c4b04260b4c 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -62,7 +62,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, Init) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -101,7 +101,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, Sink) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -152,7 +152,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest,
SinkWithEmptyEOS) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -204,7 +204,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest,
SinkWithSpill) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -274,7 +274,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest,
SinkWithSpillAndEmptyEOS) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -341,7 +341,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest,
SinkWithSpillLargeData) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -424,7 +424,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest,
SinkWithSpilError) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
diff --git
a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
index 8daff47ead5..67d49eb24c3 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
@@ -68,7 +68,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, Init) {
.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0,
};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -113,7 +113,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest,
GetBlockEmpty) {
.parent_profile =
_helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -128,7 +128,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest,
GetBlockEmpty) {
.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0,
};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -184,7 +184,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
.parent_profile =
_helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -218,7 +218,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0,
};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -276,7 +276,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest,
GetBlockWithSpill) {
.parent_profile =
_helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -323,7 +323,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest,
GetBlockWithSpill) {
.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0,
};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -389,7 +389,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest,
GetBlockWithSpillError) {
.parent_profile =
_helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink()};
st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
@@ -436,7 +436,7 @@ TEST_F(PartitionedAggregationSourceOperatorTest,
GetBlockWithSpillError) {
.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0,
};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
index 0d2a653c6c3..c8480cbb2ec 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -181,10 +181,11 @@ PartitionedAggregationTestHelper::create_operators() {
EXPECT_TRUE(sink_operator->set_child(child_operator));
// Setup task and state
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0,
runtime_state.get(), nullptr,
- nullptr, le_state_map, 0);
+ nullptr, shared_state_map, 0);
runtime_state->set_task(pipeline_task.get());
return {std::move(source_operator), std::move(sink_operator)};
}
diff --git
a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index 9f2c3b48790..b9b1a1329df 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -67,9 +67,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) {
TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) {
auto [probe_operator, sink_operator] = _helper.create_operators();
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
-
auto local_state = PartitionedHashJoinProbeLocalState::create_shared(
_helper.runtime_state.get(), probe_operator.get());
@@ -77,7 +74,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) {
LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = le_state_map,
+ .shared_state_map = {},
.task_idx = 0};
auto st = local_state->init(_helper.runtime_state.get(), info);
ASSERT_TRUE(st) << "init failed: " << st.to_string();
diff --git
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index 29cb355b0a0..a9dae776a2d 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -118,12 +118,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest,
InitLocalState) {
ASSERT_TRUE(st.ok()) << "Prepare failed: " << st.to_string();
RuntimeProfile runtime_profile("test");
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
TDataSink t_sink;
LocalSinkStateInfo info {.parent_profile = &runtime_profile,
.shared_state = shared_state.get(),
- .le_state_map = le_state_map,
+ .shared_state_map = {},
.tsink = t_sink};
st = local_state->init(_helper.runtime_state.get(), info);
ASSERT_TRUE(st) << "init failed: " << st.to_string();
@@ -222,12 +220,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest,
SinkEosAndSpill) {
auto shared_state = std::make_shared<MockPartitionedHashJoinSharedState>();
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
LocalSinkStateInfo sink_info {.task_idx = 0,
.parent_profile =
_helper.runtime_profile.get(),
.shared_state = shared_state.get(),
- .le_state_map = le_state_map,
+ .shared_state_map = {},
.tsink = TDataSink()};
auto st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
ASSERT_TRUE(st.ok()) << "Setup local state failed: " << st.to_string();
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
index 001bcd8e224..7d15e80fa67 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
@@ -154,10 +154,11 @@ PartitionedHashJoinTestHelper::create_operators() {
sink_operator->set_inner_operators(inner_sink_operator,
inner_probe_operator);
// Setup task and state
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
pipeline_task = std::make_shared<PipelineTask>(probe_pipeline, 0,
runtime_state.get(), nullptr,
- nullptr, le_state_map, 0);
+ nullptr, shared_state_map, 0);
runtime_state->set_task(pipeline_task.get());
return {probe_operator, sink_operator};
}
diff --git a/be/test/pipeline/operator/repeat_operator_test.cpp
b/be/test/pipeline/operator/repeat_operator_test.cpp
index a1c120e95f3..e49c7c452df 100644
--- a/be/test/pipeline/operator/repeat_operator_test.cpp
+++ b/be/test/pipeline/operator/repeat_operator_test.cpp
@@ -50,7 +50,7 @@ struct RepeatOperatorTest : public ::testing::Test {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = nullptr,
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
diff --git a/be/test/pipeline/operator/set_operator_test.cpp
b/be/test/pipeline/operator/set_operator_test.cpp
index 78bc2c02a36..62675144ffa 100644
--- a/be/test/pipeline/operator/set_operator_test.cpp
+++ b/be/test/pipeline/operator/set_operator_test.cpp
@@ -78,7 +78,7 @@ struct SetOperatorTest : public ::testing::Test {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = shared_state_sptr.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(source_local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
@@ -91,7 +91,7 @@ struct SetOperatorTest : public ::testing::Test {
.parent_profile = &profile,
.sender_id = 0,
.shared_state = shared_state_sptr.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(sink_local_state->init(state.get(), info));
state->emplace_sink_local_state(sink_op->operator_id(),
@@ -112,7 +112,7 @@ struct SetOperatorTest : public ::testing::Test {
.parent_profile = &profile,
.sender_id = 0,
.shared_state = shared_state_sptr.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(probe_sink_local_state[i]->init(states[i].get(),
info));
states[i]->emplace_sink_local_state(probe_sink_ops[i]->operator_id(),
diff --git a/be/test/pipeline/operator/sort_operator_test.cpp
b/be/test/pipeline/operator/sort_operator_test.cpp
index 1867f31dbf1..cd1a4c35d85 100644
--- a/be/test/pipeline/operator/sort_operator_test.cpp
+++ b/be/test/pipeline/operator/sort_operator_test.cpp
@@ -90,7 +90,7 @@ struct SortOperatorTest : public ::testing::Test {
.parent_profile = &profile,
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(sink_local_state_uptr->init(state.get(), info).ok());
state->emplace_sink_local_state(0,
std::move(sink_local_state_uptr));
@@ -102,7 +102,7 @@ struct SortOperatorTest : public ::testing::Test {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok());
diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
index 75101ae4edb..ac5da984389 100644
--- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
@@ -60,7 +60,7 @@ TEST_F(SpillSortSinkOperatorTest, Basic) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = {}};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -112,7 +112,7 @@ TEST_F(SpillSortSinkOperatorTest, Sink) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = {}};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -209,7 +209,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = {}};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -287,7 +287,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill2) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = {}};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -349,7 +349,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpillError) {
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = {}};
st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
diff --git a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
index aa9340a7d9b..87e5c9d1bf5 100644
--- a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
@@ -62,7 +62,7 @@ TEST_F(SpillSortSourceOperatorTest, Basic) {
LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -109,7 +109,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlock) {
LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -186,7 +186,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) {
LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -332,7 +332,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) {
LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
@@ -482,7 +482,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError)
{
LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp
b/be/test/pipeline/operator/spill_sort_test_helper.cpp
index ca93291d273..0e291343737 100644
--- a/be/test/pipeline/operator/spill_sort_test_helper.cpp
+++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp
@@ -159,10 +159,11 @@ SpillSortTestHelper::create_operators() {
EXPECT_TRUE(sink_operator->set_child(child_operator));
// Setup task and state
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0,
runtime_state.get(), nullptr,
- nullptr, le_state_map, 0);
+ nullptr, shared_state_map, 0);
runtime_state->set_task(pipeline_task.get());
return {std::move(source_operator), std::move(sink_operator)};
}
diff --git a/be/test/pipeline/operator/union_operator_test.cpp
b/be/test/pipeline/operator/union_operator_test.cpp
index d655b7bf4ed..2c57d10b89e 100644
--- a/be/test/pipeline/operator/union_operator_test.cpp
+++ b/be/test/pipeline/operator/union_operator_test.cpp
@@ -96,7 +96,7 @@ TEST_F(UnionOperatorTest, test_all_const_expr) {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = nullptr,
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(source_local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
@@ -190,7 +190,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(source_local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
@@ -209,7 +209,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
.parent_profile = &profile,
.sender_id = 0,
.shared_state = shared_state.get(),
- .le_state_map = {},
+ .shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info));
sink_state[i]->resize_op_id_to_local_state(-100);
diff --git a/be/test/pipeline/pipeline_test.cpp
b/be/test/pipeline/pipeline_test.cpp
index 337b214b62f..1f9d345087a 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -272,12 +272,12 @@ TEST_F(PipelineTest, HAPPY_PATH) {
_pipeline_profiles[cur_pipe->id()] =
std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(cur_pipe->id()));
- std::map<int,
- std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
- le_state_map;
+ std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+ std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
auto task = std::make_unique<PipelineTask>(
cur_pipe, task_id, local_runtime_state.get(),
_context.back().get(),
- _pipeline_profiles[cur_pipe->id()].get(), le_state_map, task_id);
+ _pipeline_profiles[cur_pipe->id()].get(), shared_state_map, task_id);
cur_pipe->incr_created_tasks(task_id, task.get());
local_runtime_state->set_task(task.get());
task->set_task_queue(_task_queue.get());
@@ -936,12 +936,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
std::static_pointer_cast<TaskExecutionContext>(_context.back()));
local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get());
_runtime_filter_mgrs[j]->_state->set_state(local_runtime_state.get());
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
- std::shared_ptr<Dependency>>>
- le_state_map;
+ std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+ std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
auto task = std::make_unique<PipelineTask>(
_pipelines[i], task_id, local_runtime_state.get(),
_context.back().get(),
- _pipeline_profiles[_pipelines[i]->id()].get(), le_state_map, j);
+ _pipeline_profiles[_pipelines[i]->id()].get(), shared_state_map, j);
_pipelines[i]->incr_created_tasks(j, task.get());
local_runtime_state->set_task(task.get());
task->set_task_queue(_task_queue.get());
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
index af48faffcf4..63f10a69838 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -76,9 +76,9 @@ TEST_F(RuntimeFilterProducerHelperTest, basic) {
column->insert(2);
block.insert({std::move(column),
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
- vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+ helper.process(_runtime_states[0].get(), &block, false, runtime_filters));
}
TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
@@ -101,10 +101,10 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
column->insert(2);
block.insert({std::move(column),
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
- vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
_tasks[0]->set_wake_up_early();
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+ helper.process(_runtime_states[0].get(), &block, false, runtime_filters));
}
TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
@@ -132,9 +132,9 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
column->insert(2);
block.insert({std::move(column),
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
- vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+ helper.process(_runtime_states[0].get(), &block, false, runtime_filters));
}
TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
@@ -156,16 +156,15 @@ TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
column->insert(2);
block.insert({std::move(column),
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
- vectorized::SharedHashTableContextPtr shared_hash_table_ctx =
- std::make_shared<vectorized::SharedHashTableContext>();
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+ helper.process(_runtime_states[0].get(), &block, true, runtime_filters));
auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
helper2.init(_runtime_states[1].get(), build_expr_ctxs,
runtime_filter_descs));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- helper2.process(_runtime_states[1].get(), &block, shared_hash_table_ctx));
+ helper2.process(_runtime_states[1].get(), &block, true, runtime_filters));
}
} // namespace doris
diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp
b/be/test/vec/exec/vfile_scanner_exception_test.cpp
index 0a0dee944da..8fa37c26278 100644
--- a/be/test/vec/exec/vfile_scanner_exception_test.cpp
+++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp
@@ -252,10 +252,11 @@ void VfileScannerExceptionTest::init() {
auto local_state =
pipeline::FileScanLocalState::create_unique(&_runtime_state,
_scan_node.get());
std::vector<TScanRangeParams> scan_ranges;
- std::map<int, std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>,
- std::shared_ptr<pipeline::Dependency>>>
- le_state_map;
- pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr, le_state_map, 0};
+ pipeline::LocalStateInfo info {.parent_profile = &_global_profile,
+ .scan_ranges = scan_ranges,
+ .shared_state = nullptr,
+ .shared_state_map = {},
+ .task_idx = 0};
WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init
local_state");
_runtime_state.emplace_local_state(_scan_node->operator_id(),
std::move(local_state));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]