This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7434f80300 [pipelineX](refactor) Refactor pending finish dependency 
(#25181)
7434f80300 is described below

commit 7434f8030069ca42642e87ae73fe22c30ec741a6
Author: Gabriel <[email protected]>
AuthorDate: Tue Oct 10 11:56:02 2023 +0800

    [pipelineX](refactor) Refactor pending finish dependency (#25181)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 50 +++++++++++++++++-----
 be/src/pipeline/exec/exchange_sink_buffer.h        | 11 ++++-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  5 ++-
 be/src/pipeline/exec/exchange_sink_operator.h      |  2 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  4 --
 be/src/pipeline/exec/exchange_source_operator.h    |  2 -
 be/src/pipeline/exec/jdbc_table_sink_operator.cpp  |  4 +-
 be/src/pipeline/exec/jdbc_table_sink_operator.h    |  2 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |  6 +--
 be/src/pipeline/exec/result_file_sink_operator.h   |  4 +-
 be/src/pipeline/exec/result_sink_operator.h        | 12 +++---
 be/src/pipeline/exec/scan_operator.cpp             |  7 +--
 be/src/pipeline/exec/scan_operator.h               | 10 ++---
 be/src/pipeline/pipeline_x/dependency.h            | 42 ++++++++++++++++++
 be/src/pipeline/pipeline_x/operator.cpp            | 23 +++++++---
 be/src/pipeline/pipeline_x/operator.h              | 21 +++------
 be/src/pipeline/pipeline_x/pipeline_x_task.h       | 19 ++++----
 be/src/vec/exec/scan/pip_scanner_context.h         |  4 +-
 be/src/vec/exec/scan/scanner_context.cpp           | 31 ++++++++++++++
 be/src/vec/exec/scan/scanner_context.h             | 17 +++-----
 be/src/vec/sink/writer/async_result_writer.cpp     | 10 +++++
 be/src/vec/sink/writer/async_result_writer.h       |  5 ++-
 22 files changed, 205 insertions(+), 86 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f055151698..22b270c3b8 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -101,7 +101,7 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
     for (auto& pair : _instance_to_package_queue_mutex) {
         std::unique_lock<std::mutex> lock(*(pair.second));
         auto& id = pair.first;
-        if (!_instance_to_sending_by_pipeline.at(id)) {
+        if (!_rpc_channel_is_idle.at(id)) {
             // when pending finish, we need check whether current query is 
cancelled
             if (need_cancel && _instance_to_rpc_ctx.find(id) != 
_instance_to_rpc_ctx.end()) {
                 auto& rpc_ctx = _instance_to_rpc_ctx[id];
@@ -135,7 +135,7 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId 
fragment_instance_id) {
     PUniqueId finst_id;
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
-    _instance_to_sending_by_pipeline[low_id] = true;
+    _rpc_channel_is_idle[low_id] = true;
     _instance_to_rpc_ctx[low_id] = {};
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_time[low_id] = 0;
@@ -152,9 +152,13 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
     {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
         // Do not have in process rpc, directly send
-        if (_instance_to_sending_by_pipeline[ins_id.lo]) {
+        if (_rpc_channel_is_idle[ins_id.lo]) {
             send_now = true;
-            _instance_to_sending_by_pipeline[ins_id.lo] = false;
+            _rpc_channel_is_idle[ins_id.lo] = false;
+            _busy_channels++;
+            if (_finish_dependency) {
+                _finish_dependency->block_finishing();
+            }
         }
         _instance_to_package_queue[ins_id.lo].emplace(std::move(request));
         _total_queue_size++;
@@ -187,9 +191,13 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
     {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
         // Do not have in process rpc, directly send
-        if (_instance_to_sending_by_pipeline[ins_id.lo]) {
+        if (_rpc_channel_is_idle[ins_id.lo]) {
             send_now = true;
-            _instance_to_sending_by_pipeline[ins_id.lo] = false;
+            _rpc_channel_is_idle[ins_id.lo] = false;
+            _busy_channels++;
+            if (_finish_dependency) {
+                _finish_dependency->block_finishing();
+            }
         }
         _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
     }
@@ -204,7 +212,7 @@ template <typename Parent>
 Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
 
-    DCHECK(_instance_to_sending_by_pipeline[id] == false);
+    DCHECK(_rpc_channel_is_idle[id] == false);
 
     std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>& q =
             _instance_to_package_queue[id];
@@ -212,7 +220,11 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
             broadcast_q = _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
-        _instance_to_sending_by_pipeline[id] = true;
+        _rpc_channel_is_idle[id] = true;
+        _busy_channels--;
+        if (_finish_dependency && _busy_channels == 0) {
+            _finish_dependency->set_ready_to_finish();
+        }
         return Status::OK();
     }
 
@@ -326,7 +338,11 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         }
         broadcast_q.pop();
     } else {
-        _instance_to_sending_by_pipeline[id] = true;
+        _rpc_channel_is_idle[id] = true;
+        _busy_channels--;
+        if (_finish_dependency && _busy_channels == 0) {
+            _finish_dependency->set_ready_to_finish();
+        }
     }
 
     return Status::OK();
@@ -346,7 +362,13 @@ void 
ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId f
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    _instance_to_sending_by_pipeline[id] = true;
+    if (!_rpc_channel_is_idle[id]) {
+        _busy_channels--;
+        if (_finish_dependency && _busy_channels == 0) {
+            _finish_dependency->set_ready_to_finish();
+        }
+    }
+    _rpc_channel_is_idle[id] = true;
 }
 
 template <typename Parent>
@@ -360,7 +382,13 @@ template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
-    _instance_to_sending_by_pipeline[id] = true;
+    if (!_rpc_channel_is_idle[id]) {
+        _busy_channels--;
+        if (_finish_dependency && _busy_channels == 0) {
+            _finish_dependency->set_ready_to_finish();
+        }
+    }
+    _rpc_channel_is_idle[id] = true;
 }
 
 template <typename Parent>
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index d5e530af5c..c47d6c6a14 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -45,6 +45,7 @@ using InstanceLoId = int64_t;
 namespace pipeline {
 class BroadcastDependency;
 class ExchangeSinkQueueDependency;
+class FinishDependency;
 } // namespace pipeline
 
 namespace vectorized {
@@ -182,8 +183,10 @@ public:
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
-    void set_queue_dependency(std::shared_ptr<ExchangeSinkQueueDependency> 
queue_dependency) {
+    void set_dependency(std::shared_ptr<ExchangeSinkQueueDependency> 
queue_dependency,
+                        std::shared_ptr<FinishDependency> finish_dependency) {
         _queue_dependency = queue_dependency;
+        _finish_dependency = finish_dependency;
     }
 
 private:
@@ -202,7 +205,10 @@ private:
     // TODO: make all flat_hash_map to a STRUCT
     phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> 
_instance_to_request;
-    phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
+    // One channel is corresponding to a downstream instance.
+    phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
+    // Number of busy channels;
+    std::atomic<int> _busy_channels = 0;
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
     phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> 
_instance_to_rpc_ctx;
@@ -230,6 +236,7 @@ private:
     static constexpr int QUEUE_CAPACITY_FACTOR = 64;
     int _queue_capacity = 0;
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
+    std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 46f5e99614..c778e397b8 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -179,6 +179,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
 
     _exchange_sink_dependency = AndDependency::create_shared(_parent->id());
     _queue_dependency = 
ExchangeSinkQueueDependency::create_shared(_parent->id());
+    _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
     _exchange_sink_dependency->add_child(_queue_dependency);
     if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 
1) &&
         !only_local_exchange) {
@@ -557,9 +558,9 @@ WriteDependency* 
ExchangeSinkOperatorX::wait_for_dependency(RuntimeState* state)
     return local_state._exchange_sink_dependency->write_blocked_by();
 }
 
-bool ExchangeSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+FinishDependency* ExchangeSinkOperatorX::finish_blocked_by(RuntimeState* 
state) const {
     auto& local_state = 
state->get_sink_local_state(id())->cast<ExchangeSinkLocalState>();
-    return local_state._sink_buffer->is_pending_finish();
+    return local_state._finish_dependency->finish_blocked_by();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index db83a71097..f76f24479e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -235,7 +235,7 @@ public:
 
     Status try_close(RuntimeState* state, Status exec_status) override;
     WriteDependency* wait_for_dependency(RuntimeState* state) override;
-    bool is_pending_finish(RuntimeState* state) const override;
+    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
 
 private:
     friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 0569670b74..66a6ecc536 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -171,10 +171,6 @@ Dependency* 
ExchangeSourceOperatorX::wait_for_dependency(RuntimeState* state) {
     return local_state.source_dependency->read_blocked_by();
 }
 
-bool ExchangeSourceOperatorX::is_pending_finish(RuntimeState* /*state*/) const 
{
-    return false;
-}
-
 Status ExchangeLocalState::close(RuntimeState* state) {
     SCOPED_TIMER(profile()->total_time_counter());
     SCOPED_TIMER(_close_timer);
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index a7e146b54d..b0f455cefc 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -113,8 +113,6 @@ public:
     ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs,
                             int num_senders);
     Dependency* wait_for_dependency(RuntimeState* state) override;
-    bool is_pending_finish(RuntimeState* state) const override;
-
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp 
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index a551762027..32c5958b95 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -68,9 +68,9 @@ WriteDependency* 
JdbcTableSinkOperatorX::wait_for_dependency(RuntimeState* state
     return local_state.write_blocked_by();
 }
 
-bool JdbcTableSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+FinishDependency* JdbcTableSinkOperatorX::finish_blocked_by(RuntimeState* 
state) const {
     auto& local_state = 
state->get_sink_local_state(id())->cast<JdbcTableSinkLocalState>();
-    return local_state.is_pending_finish();
+    return local_state._finish_dependency->finish_blocked_by();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h 
b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index 6db9c38065..a37e1c4098 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -55,7 +55,7 @@ public:
                 SourceState source_state) override;
 
     WriteDependency* wait_for_dependency(RuntimeState* state) override;
-    bool is_pending_finish(RuntimeState* state) const override;
+    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
 
 private:
     friend class JdbcTableSinkLocalState;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 4369c6b5d8..40190ded8a 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -146,7 +146,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
         }
         _only_local_exchange = local_size == _channels.size();
     }
-    _writer->set_dependency(_async_writer_dependency.get());
+    _writer->set_dependency(_async_writer_dependency.get(), 
_finish_dependency.get());
     _writer->set_header_info(p._header_type, p._header);
     return Status::OK();
 }
@@ -268,9 +268,9 @@ Status ResultFileSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_
     return local_state.sink(state, in_block, source_state);
 }
 
-bool ResultFileSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+FinishDependency* ResultFileSinkOperatorX::finish_blocked_by(RuntimeState* 
state) const {
     auto& local_state = 
state->get_sink_local_state(id())->cast<ResultFileSinkLocalState>();
-    return local_state.is_pending_finish();
+    return local_state._finish_dependency->finish_blocked_by();
 }
 
 WriteDependency* ResultFileSinkOperatorX::wait_for_dependency(RuntimeState* 
state) {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h 
b/be/src/pipeline/exec/result_file_sink_operator.h
index fb77e7e8c9..5ce028e63c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -55,7 +55,7 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
 
-    int sender_id() const { return _sender_id; }
+    [[nodiscard]] int sender_id() const { return _sender_id; }
 
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
 
@@ -94,7 +94,7 @@ public:
 
     WriteDependency* wait_for_dependency(RuntimeState* state) override;
 
-    bool is_pending_finish(RuntimeState* state) const override;
+    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
 
 private:
     friend class ResultFileSinkLocalState;
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 799389ffe3..c6c3a1c69d 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -43,29 +43,29 @@ public:
     bool can_write() override;
 };
 
-class ResultBufferDependency : public WriteDependency {
+class ResultBufferDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultBufferDependency);
     ResultBufferDependency(int id) : WriteDependency(id, 
"ResultBufferDependency") {}
-    ~ResultBufferDependency() = default;
+    ~ResultBufferDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
 
-class ResultQueueDependency : public WriteDependency {
+class ResultQueueDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultQueueDependency);
     ResultQueueDependency(int id) : WriteDependency(id, 
"ResultQueueDependency") {}
-    ~ResultQueueDependency() = default;
+    ~ResultQueueDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
 
-class CancelDependency : public WriteDependency {
+class CancelDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(CancelDependency);
     CancelDependency(int id) : WriteDependency(id, "CancelDependency") { 
_ready_for_write = false; }
-    ~CancelDependency() = default;
+    ~CancelDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 3ac6d3d900..df45db62ed 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1189,7 +1189,8 @@ Status ScanLocalState<Derived>::_start_scanners(
     _data_ready_dependency = DataReadyDependency::create_shared(p.id(), 
_scanner_ctx.get());
     _source_dependency->add_child(_data_ready_dependency);
 
-    _scanner_ctx->set_dependency(_data_ready_dependency, 
_scanner_done_dependency);
+    _scanner_ctx->set_dependency(_data_ready_dependency, 
_scanner_done_dependency,
+                                 _finish_dependency);
     return Status::OK();
 }
 
@@ -1291,9 +1292,9 @@ Dependency* 
ScanOperatorX<LocalStateType>::wait_for_dependency(RuntimeState* sta
 }
 
 template <typename LocalStateType>
-bool ScanOperatorX<LocalStateType>::is_pending_finish(RuntimeState* state) 
const {
+FinishDependency* 
ScanOperatorX<LocalStateType>::finish_blocked_by(RuntimeState* state) const {
     auto& local_state = state->get_local_state(id())->template 
cast<LocalStateType>();
-    return local_state._scanner_ctx && 
!local_state._scanner_ctx->no_schedule();
+    return local_state._finish_dependency->finish_blocked_by();
 }
 
 template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 602c1ce4ea..5cdbac8957 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -65,14 +65,14 @@ public:
     [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
 };
 
-struct EosDependency : public Dependency {
+class EosDependency : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(EosDependency);
     EosDependency(int id) : Dependency(id, "EosDependency") {}
     void* shared_state() override { return nullptr; }
 };
 
-struct ScannerDoneDependency : public Dependency {
+class ScannerDoneDependency : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
     ScannerDoneDependency(int id, vectorized::ScannerContext* scanner_ctx)
@@ -90,7 +90,7 @@ private:
     vectorized::ScannerContext* _scanner_ctx;
 };
 
-struct DataReadyDependency : public Dependency {
+class DataReadyDependency : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(DataReadyDependency);
     DataReadyDependency(int id, vectorized::ScannerContext* scanner_ctx)
@@ -417,14 +417,14 @@ public:
     Status try_close(RuntimeState* state) override;
 
     Dependency* wait_for_dependency(RuntimeState* state) override;
-    bool is_pending_finish(RuntimeState* state) const override;
+    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override { return 
OperatorXBase::prepare(state); }
     Status open(RuntimeState* state) override;
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
-    bool is_source() const override { return true; }
+    [[nodiscard]] bool is_source() const override { return true; }
 
     const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
         return _runtime_filter_descs;
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 575b305b01..943e58cd31 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -151,6 +151,48 @@ protected:
     MonotonicStopWatch _write_dependency_watcher;
 };
 
+class FinishDependency : public Dependency {
+public:
+    FinishDependency(int id, std::string name) : Dependency(id, name), 
_ready_to_finish(true) {}
+    ~FinishDependency() override = default;
+
+    void start_finish_watcher() {
+        for (auto& child : _children) {
+            ((FinishDependency*)child.get())->start_finish_watcher();
+        }
+        _finish_dependency_watcher.start();
+    }
+
+    [[nodiscard]] virtual int64_t finish_watcher_elapse_time() {
+        return _finish_dependency_watcher.elapsed_time();
+    }
+
+    [[nodiscard]] virtual FinishDependency* finish_blocked_by() {
+        if (config::enable_fuzzy_mode && !_ready_to_finish &&
+            _finish_dependency_watcher.elapsed_time() > 
SLOW_DEPENDENCY_THRESHOLD) {
+            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
+                         << id();
+        }
+        return _ready_to_finish ? nullptr : this;
+    }
+
+    void set_ready_to_finish() {
+        if (_ready_to_finish) {
+            return;
+        }
+        _finish_dependency_watcher.stop();
+        _ready_to_finish = true;
+    }
+
+    void block_finishing() { _ready_to_finish = false; }
+
+    void* shared_state() override { return nullptr; }
+
+protected:
+    std::atomic<bool> _ready_to_finish;
+    MonotonicStopWatch _finish_dependency_watcher;
+};
+
 class AndDependency : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(AndDependency);
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index be573c4a89..d20c3c3263 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -354,6 +354,22 @@ Status 
OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
     return Status::OK();
 }
 
+PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
 parent,
+                                                         RuntimeState* state)
+        : _parent(parent),
+          _state(state),
+          _finish_dependency(new FinishDependency(parent->id(), 
parent->get_name())) {}
+
+PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, 
OperatorXBase* parent)
+        : _num_rows_returned(0),
+          _rows_returned_counter(nullptr),
+          _rows_returned_rate(nullptr),
+          _memory_used_counter(nullptr),
+          _peak_memory_usage_counter(nullptr),
+          _parent(parent),
+          _state(state),
+          _finish_dependency(new FinishDependency(parent->id(), 
parent->get_name())) {}
+
 template <typename DependencyType>
 Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, 
LocalStateInfo& info) {
     _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
@@ -499,7 +515,7 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* 
state, LocalSinkState
 
     _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
     _async_writer_dependency = 
AsyncWriterDependency::create_shared(_parent->id());
-    _writer->set_dependency(_async_writer_dependency.get());
+    _writer->set_dependency(_async_writer_dependency.get(), 
_finish_dependency.get());
 
     _wait_for_dependency_timer =
             ADD_TIMER(_profile, "WaitForDependency[" + 
_async_writer_dependency->name() + "]Time");
@@ -548,11 +564,6 @@ Status AsyncWriterSink<Writer, 
Parent>::try_close(RuntimeState* state, Status ex
     return Status::OK();
 }
 
-template <typename Writer, typename Parent>
-bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
-    return _writer->is_pending_finish();
-}
-
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class 
DataSinkOperatorX<LOCAL_STATE>;
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index f01119daef..d880e55feb 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -58,14 +58,7 @@ struct LocalSinkStateInfo {
 
 class PipelineXLocalStateBase {
 public:
-    PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
-            : _num_rows_returned(0),
-              _rows_returned_counter(nullptr),
-              _rows_returned_rate(nullptr),
-              _memory_used_counter(nullptr),
-              _peak_memory_usage_counter(nullptr),
-              _parent(parent),
-              _state(state) {}
+    PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent);
     virtual ~PipelineXLocalStateBase() = default;
 
     template <class TARGET>
@@ -147,6 +140,7 @@ protected:
     vectorized::VExprContextSPtrs _projections;
     bool _closed = false;
     vectorized::Block _origin_block;
+    std::shared_ptr<FinishDependency> _finish_dependency;
 };
 
 class OperatorXBase : public OperatorBase {
@@ -224,7 +218,7 @@ public:
 
     virtual Dependency* wait_for_dependency(RuntimeState* state) { return 
nullptr; }
 
-    virtual bool is_pending_finish(RuntimeState* state) const { return false; }
+    virtual FinishDependency* finish_blocked_by(RuntimeState* state) const { 
return nullptr; }
 
     [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
         return _row_descriptor;
@@ -338,8 +332,7 @@ class DataSinkOperatorXBase;
 
 class PipelineXSinkLocalStateBase {
 public:
-    PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState* 
state_)
-            : _parent(parent_), _state(state_) {}
+    PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState* 
state_);
     virtual ~PipelineXSinkLocalStateBase() = default;
 
     // Do initialization. This step should be executed only once and in 
bthread, so we can do some
@@ -401,6 +394,8 @@ protected:
     RuntimeProfile::Counter* _open_timer = nullptr;
     RuntimeProfile::Counter* _close_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_dependency_timer;
+
+    std::shared_ptr<FinishDependency> _finish_dependency;
 };
 
 class DataSinkOperatorXBase : public OperatorBase {
@@ -469,7 +464,7 @@ public:
 
     virtual WriteDependency* wait_for_dependency(RuntimeState* state) { return 
nullptr; }
 
-    virtual bool is_pending_finish(RuntimeState* state) const { return false; }
+    virtual FinishDependency* finish_blocked_by(RuntimeState* state) const { 
return nullptr; }
 
     [[nodiscard]] std::string debug_string() const override { return ""; }
 
@@ -629,8 +624,6 @@ public:
 
     Status try_close(RuntimeState* state, Status exec_status) override;
 
-    bool is_pending_finish();
-
 protected:
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
     std::unique_ptr<Writer> _writer;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 04b647b28a..c406d7f30f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -101,18 +101,17 @@ public:
     std::string debug_string() override;
 
     bool is_pending_finish() override {
-        bool source_ret = _source->is_pending_finish(_state);
-        if (source_ret) {
-            return true;
-        } else {
-            set_src_pending_finish_time();
+        for (auto& op : _operators) {
+            auto dep = op->finish_blocked_by(_state);
+            if (dep != nullptr) {
+                dep->start_finish_watcher();
+                return true;
+            }
         }
-
-        bool sink_ret = _sink->is_pending_finish(_state);
-        if (sink_ret) {
+        auto dep = _sink->finish_blocked_by(_state);
+        if (dep != nullptr) {
+            dep->start_finish_watcher();
             return true;
-        } else {
-            set_dst_pending_finish_time();
         }
         return false;
     }
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index b06d84f0e1..159cf2ba65 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -51,9 +51,11 @@ public:
               _need_colocate_distribute(!_col_distribute_ids.empty()) {}
 
     void set_dependency(std::shared_ptr<DataReadyDependency> dependency,
-                        std::shared_ptr<ScannerDoneDependency> 
scanner_done_dependency) override {
+                        std::shared_ptr<ScannerDoneDependency> 
scanner_done_dependency,
+                        std::shared_ptr<FinishDependency> finish_dependency) 
override {
         _data_dependency = dependency;
         _scanner_done_dependency = scanner_done_dependency;
+        _finish_dependency = finish_dependency;
     }
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index d5ca622dec..9363759941 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -129,6 +129,10 @@ Status ScannerContext::init() {
     // 4. This ctx will be submitted to the scanner scheduler right after init.
     // So set _num_scheduling_ctx to 1 here.
     _num_scheduling_ctx = 1;
+    if (_finish_dependency) {
+        std::lock_guard l(_transfer_lock);
+        _finish_dependency->block_finishing();
+    }
 
     _num_unfinished_scanners = _scanners.size();
 
@@ -208,6 +212,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
+            if (_finish_dependency) {
+                _finish_dependency->block_finishing();
+            }
         } else {
             set_status_on_error(state, false);
         }
@@ -283,6 +290,21 @@ void ScannerContext::set_should_stop() {
     _blocks_queue_added_cv.notify_one();
 }
 
+void ScannerContext::update_num_running(int32_t scanner_inc, int32_t 
sched_inc) {
+    std::lock_guard l(_transfer_lock);
+    _num_running_scanners += scanner_inc;
+    _num_scheduling_ctx += sched_inc;
+    if (_finish_dependency) {
+        if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+            _finish_dependency->set_ready_to_finish();
+        } else {
+            _finish_dependency->block_finishing();
+        }
+    }
+    _blocks_queue_added_cv.notify_one();
+    _ctx_finish_cv.notify_one();
+}
+
 bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
     std::unique_lock l(_transfer_lock, std::defer_lock);
     if (need_lock) {
@@ -405,6 +427,9 @@ void ScannerContext::reschedule_scanner_ctx() {
     //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
     if (state.ok()) {
         _num_scheduling_ctx++;
+        if (_finish_dependency) {
+            _finish_dependency->block_finishing();
+        }
     } else {
         set_status_on_error(state, false);
     }
@@ -421,11 +446,17 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
     // We have to decrease _num_running_scanners before schedule, otherwise
     // schedule does not woring due to _num_running_scanners.
     _num_running_scanners--;
+    if (_finish_dependency && _num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+        _finish_dependency->set_ready_to_finish();
+    }
 
     if (should_be_scheduled()) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
+            if (_finish_dependency) {
+                _finish_dependency->block_finishing();
+            }
         } else {
             set_status_on_error(state, false);
         }
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 1b0ebef8b5..07f9f05551 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -43,8 +43,9 @@ class TupleDescriptor;
 
 namespace pipeline {
 class ScanLocalStateBase;
-struct ScannerDoneDependency;
-struct DataReadyDependency;
+class ScannerDoneDependency;
+class FinishDependency;
+class DataReadyDependency;
 } // namespace pipeline
 
 namespace taskgroup {
@@ -106,7 +107,8 @@ public:
 
     virtual void set_dependency(
             std::shared_ptr<pipeline::DataReadyDependency> dependency,
-            std::shared_ptr<pipeline::ScannerDoneDependency> scanner_done_dependency) {}
+            std::shared_ptr<pipeline::ScannerDoneDependency> scanner_done_dependency,
+            std::shared_ptr<pipeline::FinishDependency> finish_dependency) {}
 
     // Called by ScanNode.
     // Used to notify the scheduler that this ScannerContext can stop working.
@@ -116,13 +118,7 @@ public:
     virtual bool done() { return _is_finished || _should_stop; }
 
     // Update the running num of scanners and contexts
-    void update_num_running(int32_t scanner_inc, int32_t sched_inc) {
-        std::lock_guard l(_transfer_lock);
-        _num_running_scanners += scanner_inc;
-        _num_scheduling_ctx += sched_inc;
-        _blocks_queue_added_cv.notify_one();
-        _ctx_finish_cv.notify_one();
-    }
+    void update_num_running(int32_t scanner_inc, int32_t sched_inc);
 
     int get_num_running_scanners() const { return _num_running_scanners; }
 
@@ -278,6 +274,7 @@ protected:
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
     std::shared_ptr<pipeline::ScannerDoneDependency> _scanner_done_dependency = nullptr;
+    std::shared_ptr<pipeline::FinishDependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 2a7d8d988d..c142eb1ebc 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -35,6 +35,13 @@ namespace vectorized {
 AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs)
         : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(nullptr) {};
 
+void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
+                                       pipeline::FinishDependency* finish_dep) {
+    _dependency = dep;
+    _finish_dependency = finish_dep;
+    _finish_dependency->block_finishing();
+}
+
 Status AsyncResultWriter::sink(Block* block, bool eos) {
     auto rows = block->rows();
     auto status = Status::OK();
@@ -132,6 +139,9 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         _need_normal_close = false;
     }
     _writer_thread_closed = true;
+    if (_finish_dependency) {
+        _finish_dependency->set_ready_to_finish();
+    }
 }
 
 Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_block,
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 5d0cd26052..780f8b506e 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -35,6 +35,7 @@ class TExpr;
 namespace pipeline {
 class AsyncWriterDependency;
 class WriteDependency;
+class FinishDependency;
 
 } // namespace pipeline
 
@@ -56,7 +57,8 @@ class AsyncResultWriter : public ResultWriter {
 public:
     AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs);
 
-    void set_dependency(pipeline::AsyncWriterDependency* dep) { _dependency = dep; }
+    void set_dependency(pipeline::AsyncWriterDependency* dep,
+                        pipeline::FinishDependency* finish_dep);
 
     void force_close(Status s);
 
@@ -117,6 +119,7 @@ private:
 
     // Used by pipelineX
     pipeline::AsyncWriterDependency* _dependency;
+    pipeline::FinishDependency* _finish_dependency;
 
     moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to