This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d41272421 [Opt](pipeline) Refactor the short circuit of join pipeline 
(#23639)
6d41272421 is described below

commit 6d412724210f228064d08483b18b7f3827618de7
Author: HappenLee <[email protected]>
AuthorDate: Wed Aug 30 18:44:14 2023 +0800

    [Opt](pipeline) Refactor the short circuit of join pipeline (#23639)
    
    * [Opt](pipeline) Refactor the short circuit of join pipeline
    
    * change core by cr
---
 be/src/pipeline/pipeline.cpp                  | 12 ++--
 be/src/pipeline/pipeline.h                    | 81 +++++++++++++++++++++------
 be/src/pipeline/pipeline_fragment_context.cpp |  8 +--
 be/src/pipeline/pipeline_task.cpp             |  5 +-
 be/src/pipeline/pipeline_task.h               | 46 ++-------------
 be/src/pipeline/pipeline_x/pipeline_x_task.h  | 17 ++----
 6 files changed, 85 insertions(+), 84 deletions(-)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 69eaba3fbb..3c988ed441 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -32,14 +32,14 @@ void Pipeline::_init_profile() {
     _pipeline_profile.reset(new RuntimeProfile(ss.str()));
 }
 
-Status Pipeline::build_operators(Operators& operators) {
+Status Pipeline::build_operators() {
     OperatorPtr pre;
     for (auto& operator_t : _operator_builders) {
         auto o = operator_t->build_operator();
         if (pre) {
             o->set_child(pre);
         }
-        operators.emplace_back(o);
+        _operators.emplace_back(o);
         pre = std::move(o);
     }
     return Status::OK();
@@ -54,17 +54,17 @@ Status Pipeline::add_operator(OperatorBuilderPtr& op) {
 }
 
 Status Pipeline::add_operator(OperatorXPtr& op) {
-    if (_operators.empty() && !op->is_source()) {
+    if (operatorXs.empty() && !op->is_source()) {
         return Status::InternalError("Should set source before other 
operator");
     }
-    _operators.emplace_back(op);
+    operatorXs.emplace_back(op);
     return Status::OK();
 }
 
 Status Pipeline::prepare(RuntimeState* state) {
     // TODO
-    RETURN_IF_ERROR(_operators.back()->prepare(state));
-    RETURN_IF_ERROR(_operators.back()->open(state));
+    RETURN_IF_ERROR(operatorXs.back()->prepare(state));
+    RETURN_IF_ERROR(operatorXs.back()->open(state));
     RETURN_IF_ERROR(_sink_x->prepare(state));
     RETURN_IF_ERROR(_sink_x->open(state));
     return Status::OK();
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 114f51071d..4553c1e212 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -45,31 +45,52 @@ using PipelineId = uint32_t;
 
 class Pipeline : public std::enable_shared_from_this<Pipeline> {
     friend class PipelineTask;
+    friend class PipelineXTask;
 
 public:
     Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, 
std::weak_ptr<PipelineFragmentContext> context)
-            : _complete_dependency(0), _pipeline_id(pipeline_id), 
_context(context) {
+            : _pipeline_id(pipeline_id), _context(context) {
         _init_profile();
     }
 
     void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
-        pipeline->_parents.push_back(weak_from_this());
-        _dependencies.push_back(pipeline);
+        pipeline->_parents.push_back({_operator_builders.size(), 
weak_from_this()});
+        _dependencies.push_back({_operator_builders.size(), pipeline});
     }
 
     // If all dependencies are finished, this pipeline task should be 
scheduled.
     // e.g. Hash join probe task will be scheduled once Hash join build task 
is finished.
-    bool finish_one_dependency(int dependency_core_id) {
-        DCHECK(_complete_dependency < _dependencies.size());
-        bool finish = _complete_dependency.fetch_add(1) == 
_dependencies.size() - 1;
-        if (finish) {
+    void finish_one_dependency(int dep_opr, int dependency_core_id) {
+        std::lock_guard l(_depend_mutex);
+        if (!_operators.empty() && _operators[dep_opr - 
1]->can_terminate_early()) {
+            _always_can_read = true;
+            _always_can_write = (dep_opr == _operators.size());
+
+            for (int i = 0; i < _dependencies.size(); ++i) {
+                if (dep_opr == _dependencies[i].first) {
+                    _dependencies.erase(_dependencies.begin(), 
_dependencies.begin() + i + 1);
+                    break;
+                }
+            }
+        } else {
+            for (int i = 0; i < _dependencies.size(); ++i) {
+                if (dep_opr == _dependencies[i].first) {
+                    _dependencies.erase(_dependencies.begin() + i);
+                    break;
+                }
+            }
+        }
+
+        if (_dependencies.empty()) {
             _previous_schedule_id = dependency_core_id;
         }
-        return finish;
     }
 
-    bool has_dependency() { return _complete_dependency.load() < 
_dependencies.size(); }
+    bool has_dependency() {
+        std::lock_guard l(_depend_mutex);
+        return !_dependencies.empty();
+    }
 
     Status add_operator(OperatorBuilderPtr& op);
 
@@ -83,28 +104,28 @@ public:
 
     OperatorBuilderBase* sink() { return _sink.get(); }
     DataSinkOperatorX* sink_x() { return _sink_x.get(); }
-    OperatorXs& operator_xs() { return _operators; }
+    OperatorXs& operator_xs() { return operatorXs; }
     DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
 
-    Status build_operators(Operators&);
+    Status build_operators();
 
     RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
 
     const RowDescriptor& output_row_desc() const {
-        return _operators[_operators.size() - 1]->row_desc();
+        return operatorXs[operatorXs.size() - 1]->row_desc();
     }
 
     PipelineId id() const { return _pipeline_id; }
 
 private:
     void _init_profile();
-    std::atomic<uint32_t> _complete_dependency;
 
     OperatorBuilders _operator_builders; // left is _source, right is _root
     OperatorBuilderPtr _sink;            // put block to sink
 
-    std::vector<std::weak_ptr<Pipeline>> _parents;
-    std::vector<std::shared_ptr<Pipeline>> _dependencies;
+    std::mutex _depend_mutex;
+    std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
+    std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
 
     PipelineId _pipeline_id;
     std::weak_ptr<PipelineFragmentContext> _context;
@@ -114,10 +135,38 @@ private:
 
     // Operators for pipelineX. All pipeline tasks share operators from this.
     // [SourceOperator -> ... -> SinkOperator]
-    OperatorXs _operators;
+    OperatorXs operatorXs;
     DataSinkOperatorXPtr _sink_x;
 
     std::shared_ptr<ObjectPool> _obj_pool;
+
+    Operators _operators;
+    /**
+     * Consider the query plan below:
+     *
+     *      ExchangeSource     JoinBuild1
+     *            \              /
+     *         JoinProbe1 (Right Outer)    JoinBuild2
+     *                   \                   /
+     *                 JoinProbe2 (Right Outer)
+     *                          |
+     *                        Sink
+     *
+     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should 
not be blocked by ExchangeSource
+     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will 
also output 0 rows.
+     *
+     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked 
by Sink because JoinProbe2 will
+     * produce more data.
+     *
+     * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be 
blocked by ExchangeSource
+     * and Sink because JoinProbe2 will always produce 0 rows and terminate 
early.
+     *
+     * In a nutshell, we should follow the rules:
+     * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
+     * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
+     */
+    bool _always_can_read = false;
+    bool _always_can_write = false;
 };
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index c8f73cbaa3..10a6518ff1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -320,11 +320,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
         // TODO pipeline 1 need to add new interface for exec node and operator
         sink->init(request.fragment.output_sink);
 
-        Operators operators;
-        RETURN_IF_ERROR(pipeline->build_operators(operators));
-        auto task =
-                std::make_unique<PipelineTask>(pipeline, _total_tasks++, 
_runtime_state.get(),
-                                               operators, sink, this, 
pipeline->pipeline_profile());
+        RETURN_IF_ERROR(pipeline->build_operators());
+        auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, 
_runtime_state.get(),
+                                                   sink, this, 
pipeline->pipeline_profile());
         sink->set_child(task->get_root());
         _tasks.emplace_back(std::move(task));
         _runtime_profile->add_child(pipeline->pipeline_profile(), true, 
nullptr);
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 3c9f82f987..035db53a27 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -41,8 +41,7 @@ class RuntimeState;
 namespace doris::pipeline {
 
 PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, 
RuntimeState* state,
-                           Operators& operators, OperatorPtr& sink,
-                           PipelineFragmentContext* fragment_context,
+                           OperatorPtr& sink, PipelineFragmentContext* 
fragment_context,
                            RuntimeProfile* parent_profile)
         : _index(index),
           _pipeline(pipeline),
@@ -53,7 +52,7 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
index, RuntimeState*
           _data_state(SourceState::DEPEND_ON_SOURCE),
           _fragment_context(fragment_context),
           _parent_profile(parent_profile),
-          _operators(operators),
+          _operators(pipeline->_operators),
           _source(_operators.front()),
           _root(_operators.back()),
           _sink(sink) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 8b407bbaeb..b7b730ad82 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -109,9 +109,8 @@ class PriorityTaskQueue;
 // The class do the pipeline task. Minest schdule union by task scheduler
 class PipelineTask {
 public:
-    PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, 
Operators& operators,
-                 OperatorPtr& sink, PipelineFragmentContext* fragment_context,
-                 RuntimeProfile* parent_profile);
+    PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, 
OperatorPtr& sink,
+                 PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile);
 
     PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
                  PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile);
@@ -155,13 +154,13 @@ public:
         return false;
     }
 
-    virtual bool source_can_read() { return _source->can_read() || 
_ignore_blocking_source(); }
+    virtual bool source_can_read() { return _source->can_read() || 
_pipeline->_always_can_read; }
 
     virtual bool runtime_filters_are_ready_or_timeout() {
         return _source->runtime_filters_are_ready_or_timeout();
     }
 
-    virtual bool sink_can_write() { return _sink->can_write() || 
_ignore_blocking_sink(); }
+    virtual bool sink_can_write() { return _sink->can_write() || 
_pipeline->_always_can_write; }
 
     virtual Status finalize();
 
@@ -251,7 +250,7 @@ public:
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
-            p.lock()->finish_one_dependency(_previous_schedule_id);
+            p.second.lock()->finish_one_dependency(p.first, 
_previous_schedule_id);
         }
     }
 
@@ -348,41 +347,6 @@ protected:
     RuntimeProfile::Counter* _pip_task_total_timer;
 
 private:
-    /**
-     * Consider the query plan below:
-     *
-     *      ExchangeSource     JoinBuild1
-     *            \              /
-     *         JoinProbe1 (Right Outer)    JoinBuild2
-     *                   \                   /
-     *                 JoinProbe2 (Right Outer)
-     *                          |
-     *                        Sink
-     *
-     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should 
not be blocked by ExchangeSource
-     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will 
also output 0 rows.
-     *
-     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked 
by Sink because JoinProbe2 will
-     * produce more data.
-     *
-     * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be 
blocked by ExchangeSource
-     * and Sink because JoinProbe2 will always produce 0 rows and terminate 
early.
-     *
-     * In a nutshell, we should follow the rules:
-     * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
-     * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
-     */
-    [[nodiscard]] bool _ignore_blocking_sink() { return 
_root->can_terminate_early(); }
-
-    [[nodiscard]] bool _ignore_blocking_source() {
-        for (size_t i = 1; i < _operators.size(); i++) {
-            if (_operators[i]->can_terminate_early()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     Operators _operators; // left is _source, right is _root
     OperatorPtr _source;
     OperatorPtr _root;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 864709b4ed..9dcd18b469 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -67,14 +67,16 @@ public:
     Status close() override;
 
     bool source_can_read() override {
-        return _source->can_read(_state) || _ignore_blocking_source();
+        return _source->can_read(_state) || _pipeline->_always_can_read;
     }
 
     bool runtime_filters_are_ready_or_timeout() override {
         return _source->runtime_filters_are_ready_or_timeout();
     }
 
-    bool sink_can_write() override { return _sink->can_write(_state) || 
_ignore_blocking_sink(); }
+    bool sink_can_write() override {
+        return _sink->can_write(_state) || _pipeline->_always_can_write;
+    }
 
     Status finalize() override;
 
@@ -105,17 +107,6 @@ public:
     }
 
 private:
-    [[nodiscard]] bool _ignore_blocking_sink() { return 
_root->can_terminate_early(_state); }
-
-    [[nodiscard]] bool _ignore_blocking_source() {
-        for (size_t i = 1; i < _operators.size(); i++) {
-            if (_operators[i]->can_terminate_early(_state)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     using DependencyMap = std::map<int, DependencySPtr>;
     Status _open() override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to