This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 840f3b6439f [pipelineX](dependency) Wake up task by read dependency (#27260)
840f3b6439f is described below

commit 840f3b6439fbb65b873a25cbda63f80f00c6fc8d
Author: Gabriel <[email protected]>
AuthorDate: Mon Nov 20 22:00:59 2023 +0800

    [pipelineX](dependency) Wake up task by read dependency (#27260)
---
 be/src/http/action/pipeline_task_action.h       |  1 -
 be/src/pipeline/exec/exchange_sink_operator.h   | 16 -----------
 be/src/pipeline/exec/exchange_source_operator.h |  8 +++---
 be/src/pipeline/exec/hashjoin_build_sink.h      |  2 --
 be/src/pipeline/exec/result_sink_operator.h     |  2 --
 be/src/pipeline/exec/scan_operator.h            |  4 +--
 be/src/pipeline/exec/set_sink_operator.cpp      |  1 +
 be/src/pipeline/pipeline_task.h                 |  3 ++-
 be/src/pipeline/pipeline_x/dependency.cpp       | 23 +++++++++++-----
 be/src/pipeline/pipeline_x/dependency.h         | 36 +++++--------------------
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp  |  2 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.h    | 17 +++++++-----
 be/src/pipeline/task_scheduler.cpp              |  7 ++++-
 13 files changed, 48 insertions(+), 74 deletions(-)

diff --git a/be/src/http/action/pipeline_task_action.h 
b/be/src/http/action/pipeline_task_action.h
index 488a1148a53..00c1c062cad 100644
--- a/be/src/http/action/pipeline_task_action.h
+++ b/be/src/http/action/pipeline_task_action.h
@@ -23,7 +23,6 @@ namespace doris {
 
 class HttpRequest;
 
-// Get BE health state from http API.
 class PipelineTaskAction : public HttpHandler {
 public:
     PipelineTaskAction() = default;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 48189c8eb4a..566845639cb 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -70,8 +70,6 @@ public:
     ExchangeSinkQueueDependency(int id, int node_id)
             : WriteDependency(id, node_id, "ResultQueueDependency") {}
     ~ExchangeSinkQueueDependency() override = default;
-
-    void* shared_state() override { return nullptr; }
 };
 
 class BroadcastDependency final : public WriteDependency {
@@ -95,19 +93,6 @@ public:
         }
     }
 
-    void* shared_state() override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
-        return nullptr;
-    }
-
-    void set_ready_for_write() override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
-    }
-
-    void block_writing() override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
-    }
-
     int available_blocks() const { return _available_block; }
 
 private:
@@ -138,7 +123,6 @@ public:
     LocalExchangeChannelDependency(int id, int node_id)
             : WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
     ~LocalExchangeChannelDependency() override = default;
-    void* shared_state() override { return nullptr; }
     // TODO(gabriel): blocked by memory
 };
 
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 5d754747bee..c00319c1e9d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -56,15 +56,13 @@ public:
     ExchangeDataDependency(int id, int node_id,
                            vectorized::VDataStreamRecvr::SenderQueue* 
sender_queue)
             : Dependency(id, node_id, "DataDependency"), _always_done(false) {}
-    void* shared_state() override { return nullptr; }
 
     void set_always_done() {
-        _always_done = true;
-        if (_ready_for_read) {
+        if (_always_done) {
             return;
         }
-        _read_dependency_watcher.stop();
-        _ready_for_read = true;
+        _always_done = true;
+        Dependency::set_ready_for_read();
     }
 
     void block_reading() override {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index a1815ca5118..9771ae43e8e 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -52,8 +52,6 @@ public:
     SharedHashTableDependency(int id, int node_id)
             : WriteDependency(id, node_id, "SharedHashTableDependency") {}
     ~SharedHashTableDependency() override = default;
-
-    void* shared_state() override { return nullptr; }
 };
 
 class HashJoinBuildSinkLocalState final
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 53e0292f68f..9bda54e79b6 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -49,8 +49,6 @@ public:
     ResultSinkDependency(int id, int node_id)
             : WriteDependency(id, node_id, "ResultSinkDependency") {}
     ~ResultSinkDependency() override = default;
-
-    void* shared_state() override { return nullptr; }
 };
 
 class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index f058225580d..f4dd2c45d51 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -62,8 +62,6 @@ public:
     ScanDependency(int id, int node_id)
             : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) 
{}
 
-    void* shared_state() override { return nullptr; }
-
     // TODO(gabriel):
     [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
         if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
@@ -73,6 +71,8 @@ public:
         return Dependency::read_blocked_by(task);
     }
 
+    bool push_to_blocking_queue() override { return true; }
+
     void block_reading() override {
         if (_eos) {
             return;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 52bd8aa3cf7..4c92c37e48f 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -165,6 +165,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
     _build_timer = ADD_TIMER(_profile, "BuildTime");
 
     Parent& parent = _parent->cast<Parent>();
+    _dependency->set_cur_child_id(parent._cur_child_id);
     _child_exprs.resize(parent._child_exprs.size());
     for (size_t i = 0; i < _child_exprs.size(); i++) {
         RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i]));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 947f3418802..a302dc7c34d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -133,7 +133,7 @@ public:
         _wait_worker_watcher.start();
     }
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
-    PipelineTaskState get_state() { return _cur_state; }
+    PipelineTaskState get_state() const { return _cur_state; }
     void set_state(PipelineTaskState state);
 
     virtual bool is_pending_finish() {
@@ -154,6 +154,7 @@ public:
     }
 
     virtual bool source_can_read() { return _source->can_read() || 
_pipeline->_always_can_read; }
+    virtual bool push_blocked_task_to_queue() const { return true; }
 
     virtual bool runtime_filters_are_ready_or_timeout() {
         return _source->runtime_filters_are_ready_or_timeout();
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 2e43007dee7..f21a48dc5fc 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -29,10 +29,8 @@
 namespace doris::pipeline {
 
 void Dependency::add_block_task(PipelineXTask* task) {
-    // TODO(gabriel): support read dependency
-    if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == 
task) {
-        return;
-    }
+    DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != 
task)
+            << "Duplicate task: " << task->debug_string();
     _blocked_task.push_back(task);
 }
 
@@ -74,6 +72,17 @@ void Dependency::set_ready_for_read() {
         _ready_for_read = true;
         local_block_task.swap(_blocked_task);
     }
+    for (auto* task : local_block_task) {
+        task->try_wake_up(this);
+    }
+}
+
+void SetDependency::set_ready_for_read() {
+    if (_child_idx == 0) {
+        WriteDependency::set_ready_for_read();
+    } else {
+        
_set_state->probe_finished_children_dependency[0]->set_ready_for_read();
+    }
 }
 
 void WriteDependency::set_ready_for_write() {
@@ -84,7 +93,7 @@ void WriteDependency::set_ready_for_write() {
 
     std::vector<PipelineXTask*> local_block_task {};
     {
-        std::unique_lock<std::mutex> lc(_task_lock);
+        std::unique_lock<std::mutex> lc(_write_task_lock);
         if (_ready_for_write) {
             return;
         }
@@ -133,7 +142,7 @@ Dependency* Dependency::read_blocked_by(PipelineXTask* 
task) {
 
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready_for_read = _ready_for_read.load();
-    if (!ready_for_read && task) {
+    if (!ready_for_read && !push_to_blocking_queue() && task) {
         add_block_task(task);
     }
     return ready_for_read ? nullptr : this;
@@ -162,7 +171,7 @@ FinishDependency* 
FinishDependency::finish_blocked_by(PipelineXTask* task) {
 }
 
 WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
-    std::unique_lock<std::mutex> lc(_task_lock);
+    std::unique_lock<std::mutex> lc(_write_task_lock);
     const auto ready_for_write = _ready_for_write.load();
     if (!ready_for_write && task) {
         add_write_block_task(task);
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index f6a37766525..937aa01bd86 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,9 +57,10 @@ public:
 
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
-    virtual void* shared_state() = 0;
+    virtual void* shared_state() { return nullptr; }
     virtual std::string debug_string(int indentation_level = 0);
     virtual bool is_write_dependency() { return false; }
+    virtual bool push_to_blocking_queue() { return false; }
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
     void start_read_watcher() {
@@ -145,6 +146,7 @@ public:
 protected:
     friend class Dependency;
     std::atomic<bool> _ready_for_write {true};
+    std::mutex _write_task_lock;
     MonotonicStopWatch _write_dependency_watcher;
 
 private:
@@ -172,7 +174,6 @@ public:
 
     void set_ready_to_finish();
 
-    void* shared_state() override { return nullptr; }
     std::string debug_string(int indentation_level = 0) override;
 
     void add_block_task(PipelineXTask* task) override;
@@ -222,7 +223,6 @@ public:
     RuntimeFilterDependency(int id, int node_id, std::string name)
             : Dependency(id, node_id, name) {}
     RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
-    void* shared_state() override { return nullptr; }
     void add_filters(IRuntimeFilter* runtime_filter);
     void sub_filters();
     void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
@@ -255,8 +255,6 @@ public:
         return fmt::to_string(debug_string_buffer);
     }
 
-    void* shared_state() override { return nullptr; }
-
     std::string debug_string(int indentation_level = 0) override;
 
     [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
@@ -284,7 +282,6 @@ struct FakeDependency final : public WriteDependency {
 public:
     FakeDependency(int id, int node_id) : WriteDependency(id, node_id, 
"FakeDependency") {}
     using SharedState = FakeSharedState;
-    void* shared_state() override { return nullptr; }
     [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { 
return nullptr; }
     [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) 
override {
         return nullptr;
@@ -465,6 +462,7 @@ public:
         }
         return this;
     }
+    bool push_to_blocking_queue() override { return true; }
     void block_reading() override {}
     void block_writing() override {}
 
@@ -667,7 +665,6 @@ public:
     AsyncWriterDependency(int id, int node_id)
             : WriteDependency(id, node_id, "AsyncWriterDependency") {}
     ~AsyncWriterDependency() override = default;
-    void* shared_state() override { return nullptr; }
 };
 
 class SetDependency;
@@ -769,30 +766,10 @@ public:
 
     void set_shared_state(std::shared_ptr<SetSharedState> set_state) { 
_set_state = set_state; }
 
-    // Which dependency current pipeline task is blocked by. `nullptr` if this 
dependency is ready.
-    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
-        if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
-                         << id() << " " << _node_id << " block tasks: " << 
_blocked_task.size();
-        }
-        std::unique_lock<std::mutex> lc(_task_lock);
-        if (!_set_state->ready_for_read && task) {
-            add_block_task(task);
-        }
-        return _set_state->ready_for_read ? nullptr : this;
-    }
-
-    // Notify downstream pipeline tasks this dependency is ready.
-    void set_ready_for_read() override {
-        if (_set_state->ready_for_read) {
-            return;
-        }
-        _read_dependency_watcher.stop();
-        _set_state->ready_for_read = true;
-    }
+    void set_ready_for_read() override;
 
     void set_cur_child_id(int id) {
+        _child_idx = id;
         _set_state->probe_finished_children_dependency[id] = this;
         if (id != 0) {
             block_writing();
@@ -801,6 +778,7 @@ public:
 
 private:
     std::shared_ptr<SetSharedState> _set_state;
+    int _child_idx {0};
 };
 
 using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 7295f38a124..ade718c1498 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -186,7 +186,7 @@ Status PipelineXTask::_open() {
                 _blocked_dep = _filter_dependency->filter_blocked_by(this);
                 if (_blocked_dep) {
                     set_state(PipelineTaskState::BLOCKED_FOR_RF);
-                    set_use_blocking_queue(false);
+                    set_use_blocking_queue();
                     RETURN_IF_ERROR(st);
                 } else if (i == 1) {
                     CHECK(false) << debug_string();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 04c5ddc1974..f920bf219b0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -127,7 +127,7 @@ public:
 
     OperatorXs operatorXs() { return _operators; }
 
-    bool push_blocked_task_to_queue() {
+    bool push_blocked_task_to_queue() const override {
         /**
          * Push task into blocking queue if:
          * 1. `_use_blocking_queue` is true.
@@ -135,15 +135,19 @@ public:
          */
         return _use_blocking_queue || get_state() == 
PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
     }
-    void set_use_blocking_queue(bool use_blocking_queue) {
-        _use_blocking_queue = use_blocking_queue;
+    void set_use_blocking_queue() {
+        if (_blocked_dep->push_to_blocking_queue()) {
+            _use_blocking_queue = true;
+            return;
+        }
+        _use_blocking_queue = false;
     }
 
 private:
     Dependency* _write_blocked_dependency() {
         _blocked_dep = _write_dependencies->write_blocked_by(this);
         if (_blocked_dep != nullptr) {
-            set_use_blocking_queue(false);
+            set_use_blocking_queue();
             static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
             return _blocked_dep;
         }
@@ -154,7 +158,7 @@ private:
         for (auto* fin_dep : _finish_dependencies) {
             _blocked_dep = fin_dep->finish_blocked_by(this);
             if (_blocked_dep != nullptr) {
-                set_use_blocking_queue(false);
+                set_use_blocking_queue();
                 
static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher();
                 return _blocked_dep;
             }
@@ -166,8 +170,7 @@ private:
         for (auto* op_dep : _read_dependencies) {
             _blocked_dep = op_dep->read_blocked_by(this);
             if (_blocked_dep != nullptr) {
-                // TODO(gabriel):
-                set_use_blocking_queue(true);
+                set_use_blocking_queue();
                 _blocked_dep->start_read_watcher();
                 return _blocked_dep;
             }
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 9ce2711c27a..8885a0601a1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -77,7 +77,7 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* 
task) {
         return Status::InternalError("BlockedTaskScheduler shutdown");
     }
     std::unique_lock<std::mutex> lock(_task_mutex);
-    if (task->is_pipelineX() && 
!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
+    if (!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
         // put this task into current dependency's blocking queue and wait for 
event notification
         // instead of using a separate BlockedTaskScheduler.
         return Status::OK();
@@ -142,6 +142,11 @@ void BlockedTaskScheduler::_schedule() {
             } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
                 if (task->source_can_read()) {
                     _make_task_run(local_blocked_tasks, iter);
+                } else if (!task->push_blocked_task_to_queue()) {
+                    // TODO(gabriel): This condition means this task is in 
blocking queue now and we should
+                    //  remove it because this new dependency should not be 
put into blocking queue. We
+                    //  will delete this strange behavior after ScanDependency 
and UnionDependency done.
+                    local_blocked_tasks.erase(iter++);
                 } else {
                     iter++;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to