This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 78165a3f6b6 [refine](pipelineX) use finish dependency in task (#25881)
78165a3f6b6 is described below

commit 78165a3f6b6000445777f487ad596447407a6e5a
Author: Mryange <[email protected]>
AuthorDate: Thu Oct 26 10:32:13 2023 +0800

    [refine](pipelineX) use finish dependency in task (#25881)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  5 ---
 be/src/pipeline/exec/exchange_sink_operator.h      |  1 -
 be/src/pipeline/exec/jdbc_table_sink_operator.cpp  |  5 ---
 be/src/pipeline/exec/jdbc_table_sink_operator.h    |  2 --
 .../exec/multi_cast_data_stream_source.cpp         |  2 ++
 .../pipeline/exec/multi_cast_data_stream_source.h  |  6 ----
 be/src/pipeline/exec/olap_table_sink_operator.h    |  6 ----
 be/src/pipeline/exec/result_file_sink_operator.cpp |  6 ----
 be/src/pipeline/exec/result_file_sink_operator.h   |  2 --
 be/src/pipeline/exec/scan_operator.cpp             | 17 ++--------
 be/src/pipeline/exec/scan_operator.h               |  4 ---
 be/src/pipeline/pipeline_x/dependency.h            | 36 +++++++++++++++++++---
 be/src/pipeline/pipeline_x/operator.cpp            |  9 ++++--
 be/src/pipeline/pipeline_x/operator.h              | 12 ++++----
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 11 +++++++
 be/src/pipeline/pipeline_x/pipeline_x_task.h       | 23 +++++++++-----
 16 files changed, 74 insertions(+), 73 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 851c9477966..0988052a981 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -510,9 +510,4 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
     return PipelineXSinkLocalState<>::close(state, exec_status);
 }
 
-FinishDependency* ExchangeSinkOperatorX::finish_blocked_by(RuntimeState* state) const {
-    auto& local_state = state->get_sink_local_state(operator_id())->cast<ExchangeSinkLocalState>();
-    return local_state._finish_dependency->finish_blocked_by();
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 5ef3d9ee715..69fa4da4be4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -232,7 +232,6 @@ public:
                            int num_receivers = 1);
 
     Status try_close(RuntimeState* state, Status exec_status) override;
-    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
 
 private:
     friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index 139c6c434a8..ae8fcba3d1c 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -63,9 +63,4 @@ Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* bloc
     return Status::OK();
 }
 
-FinishDependency* JdbcTableSinkOperatorX::finish_blocked_by(RuntimeState* state) const {
-    auto& local_state = state->get_sink_local_state(operator_id())->cast<JdbcTableSinkLocalState>();
-    return local_state._finish_dependency->finish_blocked_by();
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index 02ccd00fa3f..41348ccaccf 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -55,8 +55,6 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
 
-    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
-
 private:
     friend class JdbcTableSinkLocalState;
     template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 1d3565c65b5..dd1c1e46d21 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -145,6 +145,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     }
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
+    _filter_dependency->set_filter_blocked_by_fn(
+            [this]() { return this->runtime_filters_are_ready_or_timeout(); });
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index ee724b1ff1d..aafacf9d87d 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -171,12 +171,6 @@ public:
 
     int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; }
 
-    bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override {
-        return state->get_local_state(operator_id())
-                ->template cast<MultiCastDataStreamSourceLocalState>()
-                .runtime_filters_are_ready_or_timeout();
-    }
-
 private:
     friend class MultiCastDataStreamSourceLocalState;
     const int _consumer_id;
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h
index 6c84f3ef523..877a42937da 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -100,12 +100,6 @@ public:
         return local_state.sink(state, in_block, source_state);
     }
 
-    FinishDependency* finish_blocked_by(RuntimeState* state) const override {
-        auto& local_state =
-                state->get_sink_local_state(operator_id())->cast<OlapTableSinkLocalState>();
-        return local_state._finish_dependency->finish_blocked_by();
-    };
-
 private:
     friend class OlapTableSinkLocalState;
     template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 79380642863..ea765055cda 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -269,10 +269,4 @@ Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_
     return local_state.sink(state, in_block, source_state);
 }
 
-FinishDependency* ResultFileSinkOperatorX::finish_blocked_by(RuntimeState* state) const {
-    auto& local_state =
-            state->get_sink_local_state(operator_id())->cast<ResultFileSinkLocalState>();
-    return local_state._finish_dependency->finish_blocked_by();
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index f80093428cf..3a98401de60 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -94,8 +94,6 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
 
-    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
-
 private:
     friend class ResultFileSinkLocalState;
     template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index adee7e7d0b3..38f7864db97 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -128,7 +128,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     _source_dependency->add_child(_open_dependency);
     _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
     _source_dependency->add_child(_eos_dependency);
-
+    _filter_dependency->set_filter_blocked_by_fn(
+            [this]() { return this->runtime_filters_are_ready_or_timeout(); });
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -1283,12 +1284,6 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
     }
 }
 
-template <typename LocalStateType>
-FinishDependency* ScanOperatorX<LocalStateType>::finish_blocked_by(RuntimeState* state) const {
-    auto& local_state = state->get_local_state(operator_id())->template cast<LocalStateType>();
-    return local_state._finish_dependency->finish_blocked_by();
-}
-
 template <typename LocalStateType>
 Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<LocalStateType>::init(tnode, state));
@@ -1370,14 +1365,6 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     return PipelineXLocalState<>::close(state);
 }
 
-template <typename LocalStateType>
-bool ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
-        RuntimeState* state) const {
-    return state->get_local_state(operator_id())
-            ->template cast<LocalStateType>()
-            .runtime_filters_are_ready_or_timeout();
-}
-
 template <typename LocalStateType>
 Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
                                                 SourceState& source_state) {
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 4355efed8a6..022b57ee9bf 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -414,12 +414,8 @@ protected:
 template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
-    bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override;
-
     Status try_close(RuntimeState* state) override;
 
-    FinishDependency* finish_blocked_by(RuntimeState* state) const override;
-
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); }
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 488c33c5a43..028b78957b4 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -152,7 +152,8 @@ protected:
 
 class FinishDependency final : public Dependency {
 public:
-    FinishDependency(int id, std::string name) : Dependency(id, name), _ready_to_finish(true) {}
+    FinishDependency(int id, int node_id, std::string name)
+            : Dependency(id, name), _ready_to_finish(true), _node_id(node_id) {}
     ~FinishDependency() override = default;
 
     void start_finish_watcher() {
@@ -162,15 +163,15 @@ public:
         _finish_dependency_watcher.start();
     }
 
-    [[nodiscard]] virtual int64_t finish_watcher_elapse_time() {
+    [[nodiscard]] int64_t finish_watcher_elapse_time() {
         return _finish_dependency_watcher.elapsed_time();
     }
 
-    [[nodiscard]] virtual FinishDependency* finish_blocked_by() {
+    [[nodiscard]] FinishDependency* finish_blocked_by() {
         if (config::enable_fuzzy_mode && !_ready_to_finish &&
             _finish_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
+                         << _node_id;
         }
         return _ready_to_finish ? nullptr : this;
     }
@@ -190,6 +191,33 @@ public:
 protected:
     std::atomic<bool> _ready_to_finish;
     MonotonicStopWatch _finish_dependency_watcher;
+    const int _node_id;
+};
+
+class FilterDependency final : public Dependency {
+public:
+    FilterDependency(int id, int node_id, std::string name)
+            : Dependency(id, name),
+              _runtime_filters_are_ready_or_timeout(nullptr),
+              _node_id(node_id) {}
+
+    FilterDependency* filter_blocked_by() {
+        if (!_runtime_filters_are_ready_or_timeout) {
+            return nullptr;
+        }
+        if (!_runtime_filters_are_ready_or_timeout()) {
+            return this;
+        }
+        return nullptr;
+    }
+    void* shared_state() override { return nullptr; }
+    void set_filter_blocked_by_fn(std::function<bool()> call_fn) {
+        _runtime_filters_are_ready_or_timeout = call_fn;
+    }
+
+protected:
+    std::function<bool()> _runtime_filters_are_ready_or_timeout;
+    const int _node_id;
 };
 
 class AndDependency final : public WriteDependency {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 7fa8a14993e..9377f5f2701 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -303,7 +303,7 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
                                                          RuntimeState* state)
         : _parent(parent),
           _state(state),
-          _finish_dependency(new FinishDependency(parent->operator_id(),
+          _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
                                                   parent->get_name() + "_FINISH_DEPENDENCY")) {}
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
@@ -312,8 +312,11 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
           _peak_memory_usage_counter(nullptr),
           _parent(parent),
           _state(state),
-          _finish_dependency(new FinishDependency(parent->operator_id(),
-                                                  parent->get_name() + "_FINISH_DEPENDENCY")) {}
+          _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
+                                                  parent->get_name() + "_FINISH_DEPENDENCY")) {
+    _filter_dependency = std::make_unique<FilterDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY");
+}
 
 template <typename DependencyType>
 Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalStateInfo& info) {
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index a6ef8a81acb..bfad246d3e6 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -99,6 +99,9 @@ public:
 
     virtual Dependency* dependency() { return nullptr; }
 
+    FinishDependency* finishdependency() { return _finish_dependency.get(); }
+    FilterDependency* filterdependency() { return _filter_dependency.get(); }
+
 protected:
     friend class OperatorXBase;
 
@@ -130,6 +133,7 @@ protected:
     bool _closed = false;
     vectorized::Block _origin_block;
     std::shared_ptr<FinishDependency> _finish_dependency;
+    std::unique_ptr<FilterDependency> _filter_dependency;
 };
 
 class OperatorXBase : public OperatorBase {
@@ -202,12 +206,8 @@ public:
         return true;
     }
 
-    virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const { return true; }
-
     Status close(RuntimeState* state) override;
 
-    virtual FinishDependency* finish_blocked_by(RuntimeState* state) const { return nullptr; }
-
     [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
         return _row_descriptor;
     }
@@ -368,6 +368,8 @@ public:
 
     virtual WriteDependency* dependency() { return nullptr; }
 
+    FinishDependency* finishdependency() { return _finish_dependency.get(); }
+
 protected:
     DataSinkOperatorXBase* _parent;
     RuntimeState* _state;
@@ -469,8 +471,6 @@ public:
         return false;
     }
 
-    virtual FinishDependency* finish_blocked_by(RuntimeState* state) const { return nullptr; }
-
     [[nodiscard]] std::string debug_string() const override { return ""; }
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level) const;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 30be4f6240d..9ad91341ce5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -108,6 +108,8 @@ Status PipelineXTask::extract_dependencies() {
         auto* dep = local_state->dependency();
         DCHECK(dep != nullptr);
         _read_dependencies.push_back(dep);
+        auto* fin_dep = local_state->finishdependency();
+        _finish_dependencies.push_back(fin_dep);
     }
     {
         auto result = _state->get_sink_local_state_result(_sink->operator_id());
@@ -118,6 +120,15 @@ Status PipelineXTask::extract_dependencies() {
         auto* dep = local_state->dependency();
         DCHECK(dep != nullptr);
         _write_dependencies = dep;
+        auto* fin_dep = local_state->finishdependency();
+        _finish_dependencies.push_back(fin_dep);
+    }
+    {
+        auto result = _state->get_local_state_result(_source->operator_id());
+        if (!result) {
+            return result.error();
+        }
+        _filter_dependency = result.value()->filterdependency();
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 0120cfa2e54..5155c2fe766 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -78,6 +78,7 @@ public:
             auto* dep = op_dep->read_blocked_by();
             if (dep != nullptr) {
                 dep->start_read_watcher();
+                push_blocked_task_to_dependency(dep);
                 return false;
             }
         }
@@ -85,13 +86,19 @@ public:
     }
 
     bool runtime_filters_are_ready_or_timeout() override {
-        return _source->runtime_filters_are_ready_or_timeout(_state);
+        auto* dep = _filter_dependency->filter_blocked_by();
+        if (dep != nullptr) {
+            push_blocked_task_to_dependency(dep);
+            return false;
+        }
+        return true;
     }
 
     bool sink_can_write() override {
         auto* dep = _write_dependencies->write_blocked_by();
         if (dep != nullptr) {
             dep->start_write_watcher();
+            push_blocked_task_to_dependency(dep);
             return false;
         }
         return true;
@@ -102,18 +109,14 @@ public:
     std::string debug_string() override;
 
     bool is_pending_finish() override {
-        for (auto& op : _operators) {
-            auto dep = op->finish_blocked_by(_state);
+        for (auto* fin_dep : _finish_dependencies) {
+            auto* dep = fin_dep->finish_blocked_by();
             if (dep != nullptr) {
                 dep->start_finish_watcher();
+                push_blocked_task_to_dependency(dep);
                 return true;
             }
         }
-        auto dep = _sink->finish_blocked_by(_state);
-        if (dep != nullptr) {
-            dep->start_finish_watcher();
-            return true;
-        }
         return false;
     }
 
@@ -146,6 +149,8 @@ public:
 
     Status extract_dependencies();
 
+    void push_blocked_task_to_dependency(Dependency* dep) {}
+
 private:
     void set_close_pipeline_time() override {}
     void _init_profile() override;
@@ -160,6 +165,8 @@ private:
 
     std::vector<Dependency*> _read_dependencies;
     WriteDependency* _write_dependencies;
+    std::vector<FinishDependency*> _finish_dependencies;
+    FilterDependency* _filter_dependency;
 
     DependencyMap _upstream_dependency;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to