This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ca7dbc36360 [refactor](pipelineX) refine union dependency (#27348)
ca7dbc36360 is described below

commit ca7dbc363603571d0bb35ce2c5fc5e306af89167
Author: Mryange <[email protected]>
AuthorDate: Thu Nov 23 16:28:32 2023 +0800

    [refactor](pipelineX) refine union dependency (#27348)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  8 ++++--
 be/src/pipeline/exec/aggregation_sink_operator.h   |  3 ++-
 .../pipeline/exec/aggregation_source_operator.cpp  |  3 +--
 be/src/pipeline/exec/data_queue.cpp                | 31 ++++++++++++----------
 be/src/pipeline/exec/data_queue.h                  | 12 ++++++---
 ...istinct_streaming_aggregation_sink_operator.cpp |  3 ++-
 .../exec/streaming_aggregation_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/union_sink_operator.cpp       |  1 +
 be/src/pipeline/exec/union_source_operator.cpp     |  5 +++-
 be/src/pipeline/exec/union_source_operator.h       | 13 ---------
 10 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index dfde3a02998..c2bb041abb2 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -82,6 +82,9 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
         
Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer, 
_merge_timer,
                                                                     
_expr_timer);
     }
+    if (p._is_streaming) {
+        
Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0);
+    }
     Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
     for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(
@@ -717,7 +720,7 @@ Status AggSinkLocalState<DependencyType, Derived>::try_spill_disk(bool eos) {
 template <typename LocalStateType>
 AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int 
operator_id,
                                                    const TPlanNode& tnode,
-                                                   const DescriptorTbl& descs)
+                                                   const DescriptorTbl& descs, 
bool is_streaming)
         : DataSinkOperatorX<LocalStateType>(operator_id, tnode.node_id),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -727,7 +730,8 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
           _is_merge(false),
           _pool(pool),
           _limit(tnode.limit),
-          _have_conjuncts(tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()) {
+          _have_conjuncts(tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()),
+          _is_streaming(is_streaming) {
     _is_first_phase = tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase;
 }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2d16df2be25..ec4bbe3bc70 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -351,7 +351,7 @@ template <typename LocalStateType = BlockingAggSinkLocalState>
 class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
 public:
     AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs);
+                     const DescriptorTbl& descs, bool is_streaming = false);
     ~AggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
@@ -404,6 +404,7 @@ protected:
     size_t _spill_partition_count_bits;
     int64_t _limit; // -1: no limit
     bool _have_conjuncts;
+    const bool _is_streaming;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index dc09e8a268c..6f0c071d391 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -51,8 +51,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._is_streaming) {
         _shared_state->data_queue.reset(new DataQueue(1));
-        _shared_state->data_queue->set_dependency(_dependency,
-                                                  
info.upstream_dependencies.front().get());
+        _shared_state->data_queue->set_source_dependency(_dependency);
     }
     if (p._without_key) {
         if (p._needs_finalize) {
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 656db9cbe73..680589ce2b1 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -40,9 +40,7 @@ DataQueue::DataQueue(int child_count)
           _is_canceled(child_count),
           _cur_bytes_in_queue(child_count),
           _cur_blocks_nums_in_queue(child_count),
-          _flag_queue_idx(0),
-          _source_dependency(nullptr),
-          _sink_dependency(nullptr) {
+          _flag_queue_idx(0) {
     for (int i = 0; i < child_count; ++i) {
         _queue_blocks_lock[i].reset(new std::mutex());
         _free_blocks_lock[i].reset(new std::mutex());
@@ -51,6 +49,8 @@ DataQueue::DataQueue(int child_count)
         _cur_bytes_in_queue[i] = 0;
         _cur_blocks_nums_in_queue[i] = 0;
     }
+    _un_finished_counter = child_count;
+    _sink_dependencies.resize(child_count, nullptr);
 }
 
 std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
@@ -118,11 +118,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
             }
             _cur_bytes_in_queue[_flag_queue_idx] -= 
(*output_block)->allocated_bytes();
             _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
-            if (_sink_dependency) {
-                if (!_is_finished[_flag_queue_idx]) {
+            auto old_value = _cur_blocks_total_nums.fetch_sub(1);
+            if (old_value == 1 && _source_dependency) {
+                if (!is_all_finish()) {
                     _source_dependency->block();
                 }
-                _sink_dependency->set_ready();
+                _sink_dependencies[_flag_queue_idx]->set_ready();
             }
         } else {
             if (_is_finished[_flag_queue_idx]) {
@@ -142,9 +143,10 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
         _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
         _queue_blocks[child_idx].emplace_back(std::move(block));
         _cur_blocks_nums_in_queue[child_idx] += 1;
-        if (_sink_dependency) {
+        _cur_blocks_total_nums++;
+        if (_source_dependency) {
             _source_dependency->set_ready();
-            _sink_dependency->block();
+            _sink_dependencies[child_idx]->block();
         }
         //this only use to record the queue[0] for profile
         _max_bytes_in_queue = std::max(_max_bytes_in_queue, 
_cur_bytes_in_queue[0].load());
@@ -154,10 +156,16 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
 
 void DataQueue::set_finish(int child_idx) {
     std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]);
+    if (_is_finished[child_idx]) {
+        return;
+    }
     _is_finished[child_idx] = true;
     if (_source_dependency) {
         _source_dependency->set_ready();
     }
+    if (_un_finished_counter.fetch_sub(1) == 1) {
+        _is_all_finished = true;
+    }
 }
 
 void DataQueue::set_canceled(int child_idx) {
@@ -175,12 +183,7 @@ bool DataQueue::is_finish(int child_idx) {
 }
 
 bool DataQueue::is_all_finish() {
-    for (int i = 0; i < _child_count; ++i) {
-        if (_is_finished[i] == false) {
-            return false;
-        }
-    }
-    return true;
+    return _is_all_finished;
 }
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index f756ca7e621..d28fe5d8f01 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -60,9 +60,11 @@ public:
     int64_t max_size_of_queue() const { return _max_size_of_queue; }
 
     bool data_exhausted() const { return _data_exhausted; }
-    void set_dependency(Dependency* source_dependency, Dependency* 
sink_dependency) {
+    void set_source_dependency(Dependency* source_dependency) {
         _source_dependency = source_dependency;
-        _sink_dependency = sink_dependency;
+    }
+    void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
+        _sink_dependencies[child_idx] = sink_dependency;
     }
 
 private:
@@ -80,10 +82,13 @@ private:
     //how many deque will be init, always will be one
     int _child_count = 0;
     std::vector<std::atomic_bool> _is_finished;
+    std::atomic_uint32_t _un_finished_counter;
+    std::atomic_bool _is_all_finished = false;
     std::vector<std::atomic_bool> _is_canceled;
     // int64_t just for counter of profile
     std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
     std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
+    std::atomic_uint32_t _cur_blocks_total_nums = 0;
 
     //this will be indicate which queue has data, it's useful when have many 
queues
     std::atomic_int _flag_queue_idx = 0;
@@ -95,8 +100,9 @@ private:
     int64_t _max_size_of_queue = 0;
     static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
 
+    // data queue is multi sink one source
     Dependency* _source_dependency = nullptr;
-    Dependency* _sink_dependency = nullptr;
+    std::vector<Dependency*> _sink_dependencies;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index 74cf655c366..da4968025af 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -183,7 +183,8 @@ DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool*
                                                                      int 
operator_id,
                                                                      const 
TPlanNode& tnode,
                                                                      const 
DescriptorTbl& descs)
-        : AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, 
operator_id, tnode, descs) {}
+        : AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, 
operator_id, tnode, descs,
+                                                               true) {}
 
 Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode,
 state));
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 17eb1b2db1c..2cc8b9efcaf 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -341,7 +341,7 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key(
 StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, int 
operator_id,
                                                      const TPlanNode& tnode,
                                                      const DescriptorTbl& 
descs)
-        : AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id, 
tnode, descs) {}
+        : AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id, 
tnode, descs, true) {}
 
 Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(AggSinkOperatorX<StreamingAggSinkLocalState>::init(tnode, 
state));
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index 2a235123bda..1ce3ef5c217 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -99,6 +99,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
     _child_expr.resize(p._child_expr.size());
+    _shared_state->data_queue.set_sink_dependency(_dependency, 
p._cur_child_id);
     for (size_t i = 0; i < p._child_expr.size(); i++) {
         RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
     }
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index d824f9db7a0..619b40f777b 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
     }
     RETURN_IF_ERROR(Base::init(state, info));
-    ss->data_queue.set_dependency(_dependency, 
info.upstream_dependencies.front().get());
+    ss->data_queue.set_source_dependency(_dependency);
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     // Const exprs materialized by this node. These exprs don't refer to any 
children.
@@ -141,6 +141,9 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
             RETURN_IF_ERROR(clone_expr_list(_const_expr_list, 
other_expr_list));
         }
     }
+    if (child_count == 0) {
+        _dependency->set_ready();
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 8b7060884e9..c39ea3dbc43 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -75,19 +75,6 @@ public:
     UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
             : Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
     ~UnionSourceDependency() override = default;
-
-    [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
-        if (((UnionSharedState*)_shared_state.get())->child_count() == 0) {
-            return nullptr;
-        }
-        if 
(((UnionSharedState*)_shared_state.get())->data_queue.is_all_finish() ||
-            
((UnionSharedState*)_shared_state.get())->data_queue.remaining_has_data()) {
-            return nullptr;
-        }
-        return this;
-    }
-    bool push_to_blocking_queue() const override { return true; }
-    void block() override {}
 };
 
 class UnionSourceOperatorX;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to