This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ca7dbc36360 [refactor](pipelineX) refine union dependency (#27348)
ca7dbc36360 is described below
commit ca7dbc363603571d0bb35ce2c5fc5e306af89167
Author: Mryange <[email protected]>
AuthorDate: Thu Nov 23 16:28:32 2023 +0800
[refactor](pipelineX) refine union dependency (#27348)
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 8 ++++--
be/src/pipeline/exec/aggregation_sink_operator.h | 3 ++-
.../pipeline/exec/aggregation_source_operator.cpp | 3 +--
be/src/pipeline/exec/data_queue.cpp | 31 ++++++++++++----------
be/src/pipeline/exec/data_queue.h | 12 ++++++---
...istinct_streaming_aggregation_sink_operator.cpp | 3 ++-
.../exec/streaming_aggregation_sink_operator.cpp | 2 +-
be/src/pipeline/exec/union_sink_operator.cpp | 1 +
be/src/pipeline/exec/union_source_operator.cpp | 5 +++-
be/src/pipeline/exec/union_source_operator.h | 13 ---------
10 files changed, 43 insertions(+), 38 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index dfde3a02998..c2bb041abb2 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -82,6 +82,9 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer,
_merge_timer,
_expr_timer);
}
+ if (p._is_streaming) {
+
Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0);
+ }
Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(
@@ -717,7 +720,7 @@ Status AggSinkLocalState<DependencyType, Derived>::try_spill_disk(bool eos) {
template <typename LocalStateType>
AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int
operator_id,
const TPlanNode& tnode,
- const DescriptorTbl& descs)
+ const DescriptorTbl& descs,
bool is_streaming)
: DataSinkOperatorX<LocalStateType>(operator_id, tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
@@ -727,7 +730,8 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
_is_merge(false),
_pool(pool),
_limit(tnode.limit),
- _have_conjuncts(tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()) {
+ _have_conjuncts(tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()),
+ _is_streaming(is_streaming) {
_is_first_phase = tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase;
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2d16df2be25..ec4bbe3bc70 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -351,7 +351,7 @@ template <typename LocalStateType = BlockingAggSinkLocalState>
class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
public:
AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
- const DescriptorTbl& descs);
+ const DescriptorTbl& descs, bool is_streaming = false);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
@@ -404,6 +404,7 @@ protected:
size_t _spill_partition_count_bits;
int64_t _limit; // -1: no limit
bool _have_conjuncts;
+ const bool _is_streaming;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index dc09e8a268c..6f0c071d391 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -51,8 +51,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._is_streaming) {
_shared_state->data_queue.reset(new DataQueue(1));
- _shared_state->data_queue->set_dependency(_dependency,
-
info.upstream_dependencies.front().get());
+ _shared_state->data_queue->set_source_dependency(_dependency);
}
if (p._without_key) {
if (p._needs_finalize) {
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 656db9cbe73..680589ce2b1 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -40,9 +40,7 @@ DataQueue::DataQueue(int child_count)
_is_canceled(child_count),
_cur_bytes_in_queue(child_count),
_cur_blocks_nums_in_queue(child_count),
- _flag_queue_idx(0),
- _source_dependency(nullptr),
- _sink_dependency(nullptr) {
+ _flag_queue_idx(0) {
for (int i = 0; i < child_count; ++i) {
_queue_blocks_lock[i].reset(new std::mutex());
_free_blocks_lock[i].reset(new std::mutex());
@@ -51,6 +49,8 @@ DataQueue::DataQueue(int child_count)
_cur_bytes_in_queue[i] = 0;
_cur_blocks_nums_in_queue[i] = 0;
}
+ _un_finished_counter = child_count;
+ _sink_dependencies.resize(child_count, nullptr);
}
std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
@@ -118,11 +118,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
}
_cur_bytes_in_queue[_flag_queue_idx] -=
(*output_block)->allocated_bytes();
_cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
- if (_sink_dependency) {
- if (!_is_finished[_flag_queue_idx]) {
+ auto old_value = _cur_blocks_total_nums.fetch_sub(1);
+ if (old_value == 1 && _source_dependency) {
+ if (!is_all_finish()) {
_source_dependency->block();
}
- _sink_dependency->set_ready();
+ _sink_dependencies[_flag_queue_idx]->set_ready();
}
} else {
if (_is_finished[_flag_queue_idx]) {
@@ -142,9 +143,10 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
_cur_bytes_in_queue[child_idx] += block->allocated_bytes();
_queue_blocks[child_idx].emplace_back(std::move(block));
_cur_blocks_nums_in_queue[child_idx] += 1;
- if (_sink_dependency) {
+ _cur_blocks_total_nums++;
+ if (_source_dependency) {
_source_dependency->set_ready();
- _sink_dependency->block();
+ _sink_dependencies[child_idx]->block();
}
//this only use to record the queue[0] for profile
_max_bytes_in_queue = std::max(_max_bytes_in_queue,
_cur_bytes_in_queue[0].load());
@@ -154,10 +156,16 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
void DataQueue::set_finish(int child_idx) {
std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]);
+ if (_is_finished[child_idx]) {
+ return;
+ }
_is_finished[child_idx] = true;
if (_source_dependency) {
_source_dependency->set_ready();
}
+ if (_un_finished_counter.fetch_sub(1) == 1) {
+ _is_all_finished = true;
+ }
}
void DataQueue::set_canceled(int child_idx) {
@@ -175,12 +183,7 @@ bool DataQueue::is_finish(int child_idx) {
}
bool DataQueue::is_all_finish() {
- for (int i = 0; i < _child_count; ++i) {
- if (_is_finished[i] == false) {
- return false;
- }
- }
- return true;
+ return _is_all_finished;
}
} // namespace pipeline
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index f756ca7e621..d28fe5d8f01 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -60,9 +60,11 @@ public:
int64_t max_size_of_queue() const { return _max_size_of_queue; }
bool data_exhausted() const { return _data_exhausted; }
- void set_dependency(Dependency* source_dependency, Dependency*
sink_dependency) {
+ void set_source_dependency(Dependency* source_dependency) {
_source_dependency = source_dependency;
- _sink_dependency = sink_dependency;
+ }
+ void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
+ _sink_dependencies[child_idx] = sink_dependency;
}
private:
@@ -80,10 +82,13 @@ private:
//how many deque will be init, always will be one
int _child_count = 0;
std::vector<std::atomic_bool> _is_finished;
+ std::atomic_uint32_t _un_finished_counter;
+ std::atomic_bool _is_all_finished = false;
std::vector<std::atomic_bool> _is_canceled;
// int64_t just for counter of profile
std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
+ std::atomic_uint32_t _cur_blocks_total_nums = 0;
//this will be indicate which queue has data, it's useful when have many
queues
std::atomic_int _flag_queue_idx = 0;
@@ -95,8 +100,9 @@ private:
int64_t _max_size_of_queue = 0;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
+ // data queue is multi sink one source
Dependency* _source_dependency = nullptr;
- Dependency* _sink_dependency = nullptr;
+ std::vector<Dependency*> _sink_dependencies;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index 74cf655c366..da4968025af 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -183,7 +183,8 @@ DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool*
int
operator_id,
const
TPlanNode& tnode,
const
DescriptorTbl& descs)
- : AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool,
operator_id, tnode, descs) {}
+ : AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool,
operator_id, tnode, descs,
+ true) {}
Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode,
state));
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 17eb1b2db1c..2cc8b9efcaf 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -341,7 +341,7 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key(
StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, int
operator_id,
const TPlanNode& tnode,
const DescriptorTbl&
descs)
- : AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id,
tnode, descs) {}
+ : AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id,
tnode, descs, true) {}
Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(AggSinkOperatorX<StreamingAggSinkLocalState>::init(tnode,
state));
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index 2a235123bda..1ce3ef5c217 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -99,6 +99,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
_child_expr.resize(p._child_expr.size());
+ _shared_state->data_queue.set_sink_dependency(_dependency,
p._cur_child_id);
for (size_t i = 0; i < p._child_expr.size(); i++) {
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
}
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index d824f9db7a0..619b40f777b 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
}
RETURN_IF_ERROR(Base::init(state, info));
- ss->data_queue.set_dependency(_dependency,
info.upstream_dependencies.front().get());
+ ss->data_queue.set_source_dependency(_dependency);
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
// Const exprs materialized by this node. These exprs don't refer to any
children.
@@ -141,6 +141,9 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(clone_expr_list(_const_expr_list,
other_expr_list));
}
}
+ if (child_count == 0) {
+ _dependency->set_ready();
+ }
return Status::OK();
}
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index 8b7060884e9..c39ea3dbc43 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -75,19 +75,6 @@ public:
UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
~UnionSourceDependency() override = default;
-
- [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
- if (((UnionSharedState*)_shared_state.get())->child_count() == 0) {
- return nullptr;
- }
- if
(((UnionSharedState*)_shared_state.get())->data_queue.is_all_finish() ||
-
((UnionSharedState*)_shared_state.get())->data_queue.remaining_has_data()) {
- return nullptr;
- }
- return this;
- }
- bool push_to_blocking_queue() const override { return true; }
- void block() override {}
};
class UnionSourceOperatorX;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]