This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 276625d17d4 [pipelineX](refactor) refine relationship between shared state and dependency (#30294)
276625d17d4 is described below
commit 276625d17d4b5e189250d15b8b8865b0a19819b8
Author: Gabriel <[email protected]>
AuthorDate: Wed Jan 24 17:00:48 2024 +0800
[pipelineX](refactor) refine relationship between shared state and dependency (#30294)
---
be/src/pipeline/exec/aggregation_sink_operator.h | 14 ++--
be/src/pipeline/exec/aggregation_source_operator.h | 2 +-
be/src/pipeline/exec/analytic_sink_operator.h | 4 -
be/src/pipeline/exec/analytic_source_operator.h | 4 +-
.../pipeline/exec/multi_cast_data_stream_sink.cpp | 11 ---
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 3 +-
.../exec/multi_cast_data_stream_source.cpp | 7 --
.../pipeline/exec/multi_cast_data_stream_source.h | 1 -
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 32 ++-----
be/src/pipeline/exec/multi_cast_data_streamer.h | 13 +--
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 2 +-
be/src/pipeline/exec/set_source_operator.cpp | 9 +-
be/src/pipeline/exec/union_source_operator.cpp | 28 ++-----
be/src/pipeline/exec/union_source_operator.h | 1 -
be/src/pipeline/pipeline_x/dependency.cpp | 6 +-
be/src/pipeline/pipeline_x/dependency.h | 97 +++++-----------------
.../local_exchange_source_operator.cpp | 14 +---
.../local_exchange_source_operator.h | 1 -
be/src/pipeline/pipeline_x/operator.cpp | 76 +++++++++++------
be/src/pipeline/pipeline_x/operator.h | 23 +++--
.../pipeline_x/pipeline_x_fragment_context.cpp | 12 ++-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 20 +++--
be/src/pipeline/pipeline_x/pipeline_x_task.h | 26 +++++-
24 files changed, 164 insertions(+), 244 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index bf41fe30536..a7b30d46117 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -53,9 +53,8 @@ public:
~AggSinkDependency() override = default;
void set_ready() override {
- std::shared_ptr<BasicSharedState> shared_state = _shared_state;
- if (shared_state && _is_streaming_agg_state(shared_state)) {
- if
(((SharedState*)shared_state.get())->data_queue->has_enough_space_to_push()) {
+ if (_shared_state && _is_streaming_agg_state(_shared_state)) {
+ if
(((SharedState*)_shared_state)->data_queue->has_enough_space_to_push()) {
Dependency::set_ready();
}
} else {
@@ -64,9 +63,8 @@ public:
}
void block() override {
- std::shared_ptr<BasicSharedState> shared_state = _shared_state;
- if (_is_streaming_agg_state(shared_state)) {
- if
(!((SharedState*)shared_state.get())->data_queue->has_enough_space_to_push()) {
+ if (_is_streaming_agg_state(_shared_state)) {
+ if
(!((SharedState*)_shared_state)->data_queue->has_enough_space_to_push()) {
Dependency::block();
}
} else {
@@ -75,8 +73,8 @@ public:
}
private:
- static bool _is_streaming_agg_state(const
std::shared_ptr<BasicSharedState>& shared_state) {
- return ((SharedState*)shared_state.get())->data_queue != nullptr;
+ static bool _is_streaming_agg_state(const BasicSharedState* shared_state) {
+ return ((SharedState*)shared_state)->data_queue != nullptr;
}
};
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index a7125c42ff6..c03aefcc04b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -63,7 +63,7 @@ public:
private:
bool _is_streaming_agg_state() {
- return ((SharedState*)Dependency::_shared_state.get())->data_queue !=
nullptr;
+ return ((SharedState*)Dependency::_shared_state)->data_queue !=
nullptr;
}
};
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 0214b22c006..064d68f189a 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -63,10 +63,6 @@ public:
: PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
- Status close(RuntimeState* state, Status exec_status) override {
- _shared_state->release_sink_dep();
- return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state,
exec_status);
- }
private:
friend class AnalyticSinkOperatorX;
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index dc86bc95062..f4e2f10a719 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -84,9 +84,7 @@ private:
if (need_more_input) {
_dependency->block();
_dependency->set_ready_to_write();
- if (!_shared_state->sink_released_flag) {
- _shared_state->sink_dep->set_ready();
- }
+ _shared_state->sink_dep->set_ready();
} else {
_dependency->set_block_to_write();
_dependency->set_ready();
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index 8a45634027f..6b5506be269 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -35,15 +35,4 @@ std::string MultiCastDataStreamSinkLocalState::name_suffix()
{
return id_name;
}
-Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
- auto multi_cast_data_streamer =
static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)
-
->create_multi_cast_data_streamer();
- auto& deps = info.dependencys;
- for (auto dep : deps) {
-
((MultiCastSinkDependency*)dep.get())->set_shared_state(multi_cast_data_streamer);
- }
- RETURN_IF_ERROR(Base::init(state, info));
- return Status::OK();
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index 965605fd3c7..84a75720a66 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -59,7 +59,6 @@ class MultiCastDataStreamSinkLocalState final
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
using Base = PipelineXSinkLocalState<MultiCastSinkDependency>;
using Parent = MultiCastDataStreamSinkOperatorX;
- Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
std::string name_suffix() override;
private:
@@ -113,7 +112,7 @@ private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
- int _cast_sender_count;
+ const int _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
};
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index d360e2eb5dd..6ac06ee5f10 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -152,13 +152,6 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
return Status::OK();
}
-Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
- _shared_state->multi_cast_data_streamer.released_dependency(
- _parent->cast<Parent>()._consumer_id);
- RETURN_IF_ERROR(Base::close(state));
- return Status::OK();
-}
-
Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block*
block,
SourceState&
source_state) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 73c506f9cb8..5dae2be31f7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -117,7 +117,6 @@ public:
RETURN_IF_ERROR(_acquire_runtime_filter());
return Status::OK();
}
- Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;
RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); }
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index f3e44731aef..175a21469b8 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -104,37 +104,15 @@ void MultiCastDataStreamer::_set_ready_for_read(int
sender_idx) {
if (_dependencies.empty()) {
return;
}
- if (_dependencies_release_flag[sender_idx]) {
- return;
- }
- {
- std::unique_lock<std::mutex> lc(_release_lock);
- if (_dependencies_release_flag[sender_idx]) {
- return;
- }
- auto* dep = _dependencies[sender_idx];
- DCHECK(dep);
- dep->set_ready();
- }
+ auto* dep = _dependencies[sender_idx];
+ DCHECK(dep);
+ dep->set_ready();
}
void MultiCastDataStreamer::_set_ready_for_read() {
- size_t i = 0;
for (auto* dep : _dependencies) {
- if (_dependencies_release_flag[i]) {
- i++;
- continue;
- }
- {
- std::unique_lock<std::mutex> lc(_release_lock);
- if (_dependencies_release_flag[i]) {
- i++;
- continue;
- }
- DCHECK(dep);
- dep->set_ready();
- i++;
- }
+ DCHECK(dep);
+ dep->set_ready();
}
}
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 7f221d622c0..5e4179e0cad 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -38,14 +38,10 @@ public:
bool with_dependencies = false)
: _row_desc(row_desc),
_profile(pool->add(new
RuntimeProfile("MultiCastDataStreamSink"))),
- _cast_sender_count(cast_sender_count),
- _dependencies_release_flag(cast_sender_count) {
+ _cast_sender_count(cast_sender_count) {
_sender_pos_to_read.resize(cast_sender_count,
_multi_cast_blocks.end());
if (with_dependencies) {
_dependencies.resize(cast_sender_count, nullptr);
- for (size_t i = 0; i < cast_sender_count; i++) {
- _dependencies_release_flag[i] = false;
- }
}
_peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
@@ -83,11 +79,6 @@ public:
_block_reading(sender_idx);
}
- void released_dependency(int sender_idx) {
- std::unique_lock<std::mutex> lc(_release_lock);
- _dependencies_release_flag[sender_idx] = true;
- }
-
private:
void _set_ready_for_read(int sender_idx);
void _set_ready_for_read();
@@ -106,8 +97,6 @@ private:
RuntimeProfile::Counter* _process_rows = nullptr;
RuntimeProfile::Counter* _peak_mem_usage = nullptr;
- std::mutex _release_lock;
- std::vector<std::atomic<bool>> _dependencies_release_flag;
std::vector<MultiCastSourceDependency*> _dependencies;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 6ac09ccfd9d..b8687376059 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -76,7 +76,7 @@ public:
void set_cur_child_id(int id) {
_child_idx = id;
-
((SetSharedState*)_shared_state.get())->probe_finished_children_dependency[id]
= this;
+
((SetSharedState*)_shared_state)->probe_finished_children_dependency[id] = this;
block();
}
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index b851e18ce5b..946720cd179 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -68,7 +68,7 @@ public:
~SetSinkDependency() override = default;
void set_cur_child_id(int id) {
-
((SetSharedState*)_shared_state.get())->probe_finished_children_dependency[id]
= this;
+
((SetSharedState*)_shared_state)->probe_finished_children_dependency[id] = this;
}
};
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index 03cd67477db..1e64bf8a50d 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -51,13 +51,14 @@ template class SetSourceOperator<false>;
template <bool is_intersect>
Status SetSourceLocalState<is_intersect>::init(RuntimeState* state,
LocalStateInfo& info) {
- std::shared_ptr<typename SetSourceDependency::SharedState> ss = nullptr;
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
auto& deps = info.upstream_dependencies;
- ss.reset(new typename SetSourceDependency::SharedState(deps.size()));
+ _shared_state->probe_finished_children_dependency.resize(deps.size(),
nullptr);
for (auto& dep : deps) {
- ((SetSourceDependency*)dep.get())->set_shared_state(ss);
+ dep->set_shared_state(_dependency->shared_state());
}
- RETURN_IF_ERROR(Base::init(state, info));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index e8ef1ba7207..709e89368a8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -105,28 +105,19 @@ Status UnionSourceOperator::get_block(RuntimeState*
state, vectorized::Block* bl
}
Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
int child_count = p.get_child_count();
- auto ss = create_shared_state();
if (child_count != 0) {
auto& deps = info.upstream_dependencies;
for (auto& dep : deps) {
- ((UnionSinkDependency*)dep.get())->set_shared_state(ss);
+ dep->set_shared_state(_dependency->shared_state());
}
- } else {
- auto& deps = info.upstream_dependencies;
- DCHECK(child_count == 0);
- DCHECK(deps.size() == 1);
- DCHECK(deps.front() == nullptr);
- //child_count == 0 , we need to creat a UnionDependency
- deps.front() = std::make_shared<UnionSourceDependency>(
- _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
- ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
}
- RETURN_IF_ERROR(Base::init(state, info));
- ss->data_queue.set_source_dependency(info.dependency);
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ ((UnionSharedState*)_dependency->shared_state())
+ ->data_queue.set_source_dependency(info.dependency);
// Const exprs materialized by this node. These exprs don't refer to any
children.
// Only materialized by the first fragment instance to avoid duplication.
if (state->per_fragment_instance_idx() == 0) {
@@ -151,13 +142,6 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
return Status::OK();
}
-std::shared_ptr<UnionSharedState> UnionSourceLocalState::create_shared_state()
{
- auto& p = _parent->cast<Parent>();
- std::shared_ptr<UnionSharedState> data_queue =
- std::make_shared<UnionSharedState>(p._child_size);
- return data_queue;
-}
-
std::string UnionSourceLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 0c150a072b6..887c0cb9639 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -86,7 +86,6 @@ public:
UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) :
Base(state, parent) {};
Status init(RuntimeState* state, LocalStateInfo& info) override;
- std::shared_ptr<UnionSharedState> create_shared_state();
[[nodiscard]] std::string debug_string(int indentation_level = 0) const
override;
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index bb27a688820..9cb6e99b44f 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -189,13 +189,9 @@ void
LocalExchangeSharedState::sub_running_sink_operators() {
}
}
-LocalExchangeSharedState::LocalExchangeSharedState(int num_instances)
- : dependencies_release_flag(num_instances) {
+LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
source_dependencies.resize(num_instances, nullptr);
mem_trackers.resize(num_instances, nullptr);
- for (size_t i = 0; i < num_instances; i++) {
- dependencies_release_flag[i] = false;
- }
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 172f7383f3e..59f9fee3775 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,22 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L
* 1000L * 1000L;
static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
struct BasicSharedState {
- Dependency* source_dep = nullptr;
- Dependency* sink_dep = nullptr;
-
- std::atomic_bool source_released_flag {false};
- std::atomic_bool sink_released_flag {false};
- std::mutex source_release_lock;
- std::mutex sink_release_lock;
-
- void release_source_dep() {
- std::unique_lock<std::mutex> lc(source_release_lock);
- source_released_flag = true;
+ template <class TARGET>
+ TARGET* cast() {
+ DCHECK(dynamic_cast<TARGET*>(this))
+ << " Mismatch type! Current type is " << typeid(*this).name()
+ << " and expect type is" << typeid(TARGET).name();
+ return reinterpret_cast<TARGET*>(this);
}
- void release_sink_dep() {
- std::unique_lock<std::mutex> lc(sink_release_lock);
- sink_released_flag = true;
+ template <class TARGET>
+ const TARGET* cast() const {
+ DCHECK(dynamic_cast<const TARGET*>(this))
+ << " Mismatch type! Current type is " << typeid(*this).name()
+ << " and expect type is" << typeid(TARGET).name();
+ return reinterpret_cast<const TARGET*>(this);
}
+ DependencySPtr source_dep = nullptr;
+ DependencySPtr sink_dep = nullptr;
virtual ~BasicSharedState() = default;
};
@@ -99,11 +99,8 @@ public:
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
void add_child(std::shared_ptr<Dependency> child) {
_children.push_back(child); }
- std::shared_ptr<BasicSharedState> shared_state() { return _shared_state; }
- void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
- _shared_state = shared_state;
- }
- void clear_shared_state() { _shared_state.reset(); }
+ BasicSharedState* shared_state() { return _shared_state; }
+ void set_shared_state(BasicSharedState* shared_state) { _shared_state =
shared_state; }
virtual std::string debug_string(int indentation_level = 0);
// Start the watcher. We use it to count how long this dependency block
the current pipeline task.
@@ -121,47 +118,19 @@ public:
virtual void set_ready();
void set_ready_to_read() {
DCHECK(_is_write_dependency) << debug_string();
- if (_shared_state->source_released_flag) {
- return;
- }
- std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
- if (_shared_state->source_released_flag) {
- return;
- }
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->set_ready();
}
void set_block_to_read() {
DCHECK(_is_write_dependency) << debug_string();
- if (_shared_state->source_released_flag) {
- return;
- }
- std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
- if (_shared_state->source_released_flag) {
- return;
- }
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->block();
}
void set_ready_to_write() {
- if (_shared_state->sink_released_flag) {
- return;
- }
- std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
- if (_shared_state->sink_released_flag) {
- return;
- }
DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
_shared_state->sink_dep->set_ready();
}
void set_block_to_write() {
- if (_shared_state->sink_released_flag) {
- return;
- }
- std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
- if (_shared_state->sink_released_flag) {
- return;
- }
DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
_shared_state->sink_dep->block();
}
@@ -180,7 +149,7 @@ protected:
std::atomic<bool> _ready;
const QueryContext* _query_ctx = nullptr;
- std::shared_ptr<BasicSharedState> _shared_state = nullptr;
+ BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;
std::list<std::shared_ptr<Dependency>> _children;
@@ -524,7 +493,6 @@ public:
struct SetSharedState : public BasicSharedState {
public:
- SetSharedState(int num_deps) {
probe_finished_children_dependency.resize(num_deps, nullptr); }
/// default init
vectorized::Block build_block; // build to source
//record element size in hashtable
@@ -666,45 +634,24 @@ public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
std::unique_ptr<Exchanger> exchanger {};
- std::vector<Dependency*> source_dependencies;
- std::vector<std::atomic_bool> dependencies_release_flag;
- Dependency* sink_dependency;
+ std::vector<DependencySPtr> source_dependencies;
+ DependencySPtr sink_dependency;
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
void sub_running_sink_operators();
void _set_ready_for_read() {
- size_t i = 0;
for (auto& dep : source_dependencies) {
- if (dependencies_release_flag[i]) {
- i++;
- continue;
- }
- {
- std::unique_lock<std::mutex> lc(source_release_lock);
- if (dependencies_release_flag[i]) {
- i++;
- continue;
- }
- DCHECK(dep);
- dep->set_ready();
- i++;
- }
+ DCHECK(dep);
+ dep->set_ready();
}
}
- void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+ void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
source_dependencies[channel_id] = dep;
}
void set_ready_to_read(int channel_id) {
- if (dependencies_release_flag[channel_id]) {
- return;
- }
- std::unique_lock<std::mutex> lc(source_release_lock);
- if (dependencies_release_flag[channel_id]) {
- return;
- }
auto& dep = source_dependencies[channel_id];
DCHECK(dep) << channel_id;
dep->set_ready();
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 9e98e3b6e8f..edfb8114811 100644
---
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -22,11 +22,11 @@
namespace doris::pipeline {
void LocalExchangeSourceDependency::block() {
- if
(((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators
== 0) {
+ if
(((LocalExchangeSharedState*)_shared_state)->exchanger->_running_sink_operators
== 0) {
return;
}
- std::unique_lock<std::mutex>
lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
- if
(((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators
== 0) {
+ std::unique_lock<std::mutex>
lc(((LocalExchangeSharedState*)_shared_state)->le_lock);
+ if
(((LocalExchangeSharedState*)_shared_state)->exchanger->_running_sink_operators
== 0) {
return;
}
Dependency::block();
@@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_channel_id = info.task_idx;
- _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
+ _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
@@ -61,12 +61,6 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
return fmt::to_string(debug_string_buffer);
}
-Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
- _shared_state->dependencies_release_flag[_channel_id] = true;
- RETURN_IF_ERROR(Base::close(state));
- return Status::OK();
-}
-
Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
auto& local_state = get_local_state(state);
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 4c95a84b533..63d71bbe08b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -48,7 +48,6 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::string debug_string(int indentation_level) const override;
- Status close(RuntimeState* state) override;
private:
friend class LocalExchangeSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 21453dfbc3c..db687865657 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -280,18 +280,32 @@ template <>
inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
template <typename LocalStateType>
-void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>&
dependency,
- QueryContext* ctx) {
- std::shared_ptr<typename LocalStateType::DependencyType::SharedState> ss =
nullptr;
+void DataSinkOperatorX<LocalStateType>::get_dependency(
+ vector<DependencySPtr>& dependency,
+ std::map<int, std::shared_ptr<BasicSharedState>>& shared_states,
QueryContext* ctx) {
+ std::shared_ptr<BasicSharedState> ss = nullptr;
if constexpr (NeedToCreate<typename
LocalStateType::DependencyType::SharedState>) {
ss.reset(new typename LocalStateType::DependencyType::SharedState());
+ DCHECK(!shared_states.contains(dests_id().front()));
+ if constexpr (!std::is_same_v<typename
LocalStateType::DependencyType::SharedState,
+ FakeSharedState>) {
+ shared_states.insert({dests_id().front(), ss});
+ }
+ } else if constexpr (std::is_same_v<typename
LocalStateType::DependencyType::SharedState,
+ MultiCastSharedState>) {
+ ss =
((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer();
+ auto& dests = dests_id();
+ for (auto& dest_id : dests) {
+ DCHECK(!shared_states.contains(dest_id));
+ shared_states.insert({dest_id, ss});
+ }
}
if constexpr (!std::is_same_v<typename LocalStateType::DependencyType,
FakeDependency>) {
auto& dests = dests_id();
for (auto& dest_id : dests) {
dependency.push_back(std::make_shared<typename
LocalStateType::DependencyType>(
dest_id, _node_id, ctx));
- dependency.back()->set_shared_state(ss);
+ dependency.back()->set_shared_state(ss.get());
}
} else {
dependency.push_back(nullptr);
@@ -299,8 +313,23 @@ void
DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
}
template <typename LocalStateType>
-DependencySPtr OperatorX<LocalStateType>::get_dependency(QueryContext* ctx) {
- return std::make_shared<typename
LocalStateType::DependencyType>(_operator_id, _node_id, ctx);
+DependencySPtr OperatorX<LocalStateType>::get_dependency(
+ QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>&
shared_states) {
+ std::shared_ptr<BasicSharedState> ss = nullptr;
+ if constexpr (std::is_same_v<typename
LocalStateType::DependencyType::SharedState,
+ SetSharedState>) {
+ ss.reset(new typename LocalStateType::DependencyType::SharedState());
+ shared_states.insert({operator_id(), ss});
+ } else if constexpr (std::is_same_v<typename
LocalStateType::DependencyType::SharedState,
+ UnionSharedState>) {
+ ss.reset(new typename LocalStateType::DependencyType::SharedState(
+ ((UnionSourceOperatorX*)this)->get_child_count()));
+ shared_states.insert({operator_id(), ss});
+ }
+ auto dep =
+ std::make_shared<typename
LocalStateType::DependencyType>(_operator_id, _node_id, ctx);
+ dep->set_shared_state(ss.get());
+ return dep;
}
template <typename LocalStateType>
@@ -338,19 +367,20 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
auto& deps = info.upstream_dependencies;
if constexpr (std::is_same_v<LocalExchangeSourceDependency,
DependencyType>) {
-
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
- _shared_state =
- (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get());
+ _shared_state = _dependency->shared_state()
+ ->template cast<typename
DependencyType::SharedState>();
- _shared_state->source_dep = info.dependency.get();
- _shared_state->sink_dep = deps.front().get();
+ _shared_state->source_dep = info.dependency;
} else if constexpr (!is_fake_shared) {
- _dependency->set_shared_state(deps.front()->shared_state());
- _shared_state =
- (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ _dependency->set_shared_state(info.shared_state);
+ _shared_state = _dependency->shared_state()
+ ->template cast<typename
DependencyType::SharedState>();
- _shared_state->source_dep = info.dependency.get();
- _shared_state->sink_dep = deps.front().get();
+ _shared_state->source_dep = info.dependency;
+ if (!deps.empty()) {
+ _shared_state->sink_dep = deps.front();
+ }
}
}
@@ -382,12 +412,8 @@ Status
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
- if (_shared_state) {
- _shared_state->release_source_dep();
- }
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
- _dependency->clear_shared_state();
}
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
@@ -410,22 +436,21 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
constexpr auto is_fake_shared =
std::is_same_v<typename DependencyType::SharedState,
FakeSharedState>;
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
- auto& deps = info.dependencys;
+ auto& deps = info.dependencies;
_dependency = (DependencyType*)deps.front().get();
if constexpr (std::is_same_v<LocalExchangeSinkDependency,
DependencyType>) {
_dependency =
info.le_state_map[_parent->dests_id().front()].second.get();
}
if (_dependency) {
if constexpr (!is_fake_shared) {
- _shared_state =
- (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ _shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state();
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
}
} else {
- auto& deps = info.dependencys;
+ auto& deps = info.dependencies;
deps.front() = std::make_shared<FakeDependency>(0, 0,
state->get_query_ctx());
_dependency = (DependencyType*)deps.front().get();
}
@@ -448,9 +473,6 @@ Status
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
- if constexpr (!std::is_same_v<LocalExchangeSinkDependency,
DependencyType>) {
- _dependency->clear_shared_state();
- }
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index e40c7849c09..1c076bd5a69 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -34,6 +34,7 @@ struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
const std::vector<TScanRangeParams> scan_ranges;
std::vector<DependencySPtr>& upstream_dependencies;
+ BasicSharedState* shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<LocalExchangeSinkDependency>>>
le_state_map;
@@ -47,7 +48,7 @@ struct LocalSinkStateInfo {
const int task_idx;
RuntimeProfile* parent_profile = nullptr;
const int sender_id;
- std::vector<DependencySPtr>& dependencys;
+ std::vector<DependencySPtr>& dependencies;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<LocalExchangeSinkDependency>>>
le_state_map;
@@ -187,7 +188,8 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
- [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
+ [[nodiscard]] virtual DependencySPtr get_dependency(
+ QueryContext* ctx, std::map<int,
std::shared_ptr<BasicSharedState>>& shared_states) = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -340,7 +342,9 @@ public:
return state->get_local_state(operator_id())->template
cast<LocalState>();
}
- DependencySPtr get_dependency(QueryContext* ctx) override;
+ DependencySPtr get_dependency(
+ QueryContext* ctx,
+ std::map<int, std::shared_ptr<BasicSharedState>>& shared_states)
override;
};
template <typename DependencyArg = FakeDependency>
@@ -448,10 +452,7 @@ protected:
class DataSinkOperatorXBase : public OperatorBase {
public:
DataSinkOperatorXBase(const int operator_id, const int node_id)
- : OperatorBase(nullptr),
- _operator_id(operator_id),
- _node_id(node_id),
- _dests_id({operator_id}) {}
+ : OperatorBase(nullptr), _operator_id(operator_id),
_node_id(node_id), _dests_id({1}) {}
DataSinkOperatorXBase(const int operator_id, const int node_id, const int
dest_id)
: OperatorBase(nullptr),
@@ -498,7 +499,9 @@ public:
return reinterpret_cast<const TARGET&>(*this);
}
- virtual void get_dependency(std::vector<DependencySPtr>& dependency,
QueryContext* ctx) = 0;
+ virtual void get_dependency(std::vector<DependencySPtr>& dependency,
+ std::map<int,
std::shared_ptr<BasicSharedState>>& shared_states,
+ QueryContext* ctx) = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -595,7 +598,9 @@ public:
~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
- void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext*
ctx) override;
+ void get_dependency(std::vector<DependencySPtr>& dependency,
+ std::map<int, std::shared_ptr<BasicSharedState>>&
shared_states,
+ QueryContext* ctx) override;
using LocalState = LocalStateType;
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index aa580b49d48..de0a544f18c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -618,10 +618,16 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
for (auto& dep : deps) {
if (pipeline_id_to_task.contains(dep)) {
task->add_upstream_dependency(
-
pipeline_id_to_task[dep]->get_downstream_dependency());
+
pipeline_id_to_task[dep]->get_downstream_dependency(),
+
pipeline_id_to_task[dep]->get_shared_states());
}
}
}
+ }
+ }
+ for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+ if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
+ auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
}
}
@@ -773,8 +779,8 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
}
auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id,
local_exchange_id,
_runtime_state->get_query_ctx());
- sink_dep->set_shared_state(shared_state);
- shared_state->sink_dependency = sink_dep.get();
+ sink_dep->set_shared_state(shared_state.get());
+ shared_state->sink_dependency = sink_dep;
_op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
// 3. Set two pipelines' operator list. For example, split pipeline [Scan
- AggSink] to
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index fece30296c9..9a88c417be0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -60,9 +60,10 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t
task_id, RuntimeSta
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
- _sink->get_dependency(_downstream_dependency, state->get_query_ctx());
+ _sink->get_dependency(_downstream_dependency, _shared_states,
state->get_query_ctx());
for (auto& op : _operators) {
- _source_dependency.insert({op->operator_id(),
op->get_dependency(state->get_query_ctx())});
+ _source_dependency.insert(
+ {op->operator_id(), op->get_dependency(state->get_query_ctx(),
_shared_states)});
}
pipeline->incr_created_tasks();
}
@@ -94,8 +95,13 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams&
local_params, const
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
- LocalStateInfo info {parent_profile, scan_ranges, deps,
- _le_state_map, _task_idx,
_source_dependency[op->operator_id()]};
+ LocalStateInfo info {parent_profile,
+ scan_ranges,
+ deps,
+ get_shared_state(op->operator_id()),
+ _le_state_map,
+ _task_idx,
+ _source_dependency[op->operator_id()]};
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
query_ctx->register_query_statistics(
@@ -297,9 +303,9 @@ void PipelineXTask::finalize() {
std::unique_lock<std::mutex> lc(_release_lock);
_finished = true;
std::vector<DependencySPtr> {}.swap(_downstream_dependency);
- DependencyMap {}.swap(_upstream_dependency);
- std::map<int, DependencySPtr> {}.swap(_source_dependency);
-
+ _upstream_dependency.clear();
+ _source_dependency.clear();
+ _shared_states.clear();
_le_state_map.clear();
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 96069cbbea2..a558dbeb40e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -95,8 +95,10 @@ public:
bool is_pending_finish() override { return _finish_blocked_dependency() !=
nullptr; }
std::vector<DependencySPtr>& get_downstream_dependency() { return
_downstream_dependency; }
+ std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() {
return _shared_states; }
- void add_upstream_dependency(std::vector<DependencySPtr>&
multi_upstream_dependency) {
+ void add_upstream_dependency(std::vector<DependencySPtr>&
multi_upstream_dependency,
+ std::map<int,
std::shared_ptr<BasicSharedState>>& shared_states) {
for (auto dep : multi_upstream_dependency) {
int dst_id = dep->id();
if (!_upstream_dependency.contains(dst_id)) {
@@ -104,16 +106,31 @@ public:
} else {
_upstream_dependency[dst_id].push_back(dep);
}
+
+ if (shared_states.contains(dst_id) &&
!_shared_states.contains(dst_id)) {
+ // Shared state is created by upstream task's sink operator and shared by source operator of this task.
+ _shared_states.insert({dst_id, shared_states[dst_id]});
+ } else if (_shared_states.contains(dst_id) &&
!shared_states.contains(dst_id)) {
+ // Shared state is created by this task's source operator and shared by upstream task's sink operator.
+ shared_states.insert({dst_id, _shared_states[dst_id]});
+ }
}
}
std::vector<DependencySPtr>& get_upstream_dependency(int id) {
if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
- _upstream_dependency.insert({id, {DependencySPtr {}}});
+ _upstream_dependency.insert({id, {}});
}
return _upstream_dependency[id];
}
+ BasicSharedState* get_shared_state(int id) {
+ if (!_shared_states.contains(id)) {
+ return nullptr;
+ }
+ return _shared_states[id].get();
+ }
+
bool is_pipelineX() const override { return true; }
void wake_up();
@@ -190,9 +207,14 @@ private:
std::vector<Dependency*> _finish_dependencies;
RuntimeFilterDependency* _filter_dependency;
+ // Write dependencies of upstream pipeline tasks.
DependencyMap _upstream_dependency;
+ // Read dependencies of this pipeline task.
std::map<int, DependencySPtr> _source_dependency;
+ // Write dependencies of this pipeline task.
std::vector<DependencySPtr> _downstream_dependency;
+ // All shared states of this pipeline task.
+ std::map<int, std::shared_ptr<BasicSharedState>> _shared_states;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<LocalExchangeSinkDependency>>>
_le_state_map;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]