This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 840f3b6439f [pipelineX](dependency) Wake by task by read dependency (#27260)
840f3b6439f is described below
commit 840f3b6439fbb65b873a25cbda63f80f00c6fc8d
Author: Gabriel <[email protected]>
AuthorDate: Mon Nov 20 22:00:59 2023 +0800
[pipelineX](dependency) Wake by task by read dependency (#27260)
---
be/src/http/action/pipeline_task_action.h | 1 -
be/src/pipeline/exec/exchange_sink_operator.h | 16 -----------
be/src/pipeline/exec/exchange_source_operator.h | 8 +++---
be/src/pipeline/exec/hashjoin_build_sink.h | 2 --
be/src/pipeline/exec/result_sink_operator.h | 2 --
be/src/pipeline/exec/scan_operator.h | 4 +--
be/src/pipeline/exec/set_sink_operator.cpp | 1 +
be/src/pipeline/pipeline_task.h | 3 ++-
be/src/pipeline/pipeline_x/dependency.cpp | 23 +++++++++++-----
be/src/pipeline/pipeline_x/dependency.h | 36 +++++--------------------
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_task.h | 17 +++++++-----
be/src/pipeline/task_scheduler.cpp | 7 ++++-
13 files changed, 48 insertions(+), 74 deletions(-)
diff --git a/be/src/http/action/pipeline_task_action.h
b/be/src/http/action/pipeline_task_action.h
index 488a1148a53..00c1c062cad 100644
--- a/be/src/http/action/pipeline_task_action.h
+++ b/be/src/http/action/pipeline_task_action.h
@@ -23,7 +23,6 @@ namespace doris {
class HttpRequest;
-// Get BE health state from http API.
class PipelineTaskAction : public HttpHandler {
public:
PipelineTaskAction() = default;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 48189c8eb4a..566845639cb 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -70,8 +70,6 @@ public:
ExchangeSinkQueueDependency(int id, int node_id)
: WriteDependency(id, node_id, "ResultQueueDependency") {}
~ExchangeSinkQueueDependency() override = default;
-
- void* shared_state() override { return nullptr; }
};
class BroadcastDependency final : public WriteDependency {
@@ -95,19 +93,6 @@ public:
}
}
- void* shared_state() override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not
reach here!");
- return nullptr;
- }
-
- void set_ready_for_write() override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not
reach here!");
- }
-
- void block_writing() override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not
reach here!");
- }
-
int available_blocks() const { return _available_block; }
private:
@@ -138,7 +123,6 @@ public:
LocalExchangeChannelDependency(int id, int node_id)
: WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
~LocalExchangeChannelDependency() override = default;
- void* shared_state() override { return nullptr; }
// TODO(gabriel): blocked by memory
};
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 5d754747bee..c00319c1e9d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -56,15 +56,13 @@ public:
ExchangeDataDependency(int id, int node_id,
vectorized::VDataStreamRecvr::SenderQueue*
sender_queue)
: Dependency(id, node_id, "DataDependency"), _always_done(false) {}
- void* shared_state() override { return nullptr; }
void set_always_done() {
- _always_done = true;
- if (_ready_for_read) {
+ if (_always_done) {
return;
}
- _read_dependency_watcher.stop();
- _ready_for_read = true;
+ _always_done = true;
+ Dependency::set_ready_for_read();
}
void block_reading() override {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index a1815ca5118..9771ae43e8e 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -52,8 +52,6 @@ public:
SharedHashTableDependency(int id, int node_id)
: WriteDependency(id, node_id, "SharedHashTableDependency") {}
~SharedHashTableDependency() override = default;
-
- void* shared_state() override { return nullptr; }
};
class HashJoinBuildSinkLocalState final
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 53e0292f68f..9bda54e79b6 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -49,8 +49,6 @@ public:
ResultSinkDependency(int id, int node_id)
: WriteDependency(id, node_id, "ResultSinkDependency") {}
~ResultSinkDependency() override = default;
-
- void* shared_state() override { return nullptr; }
};
class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index f058225580d..f4dd2c45d51 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -62,8 +62,6 @@ public:
ScanDependency(int id, int node_id)
: Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr)
{}
- void* shared_state() override { return nullptr; }
-
// TODO(gabriel):
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
@@ -73,6 +71,8 @@ public:
return Dependency::read_blocked_by(task);
}
+ bool push_to_blocking_queue() override { return true; }
+
void block_reading() override {
if (_eos) {
return;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 52bd8aa3cf7..4c92c37e48f 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -165,6 +165,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState*
state, LocalSinkState
_build_timer = ADD_TIMER(_profile, "BuildTime");
Parent& parent = _parent->cast<Parent>();
+ _dependency->set_cur_child_id(parent._cur_child_id);
_child_exprs.resize(parent._child_exprs.size());
for (size_t i = 0; i < _child_exprs.size(); i++) {
RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i]));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 947f3418802..a302dc7c34d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -133,7 +133,7 @@ public:
_wait_worker_watcher.start();
}
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
- PipelineTaskState get_state() { return _cur_state; }
+ PipelineTaskState get_state() const { return _cur_state; }
void set_state(PipelineTaskState state);
virtual bool is_pending_finish() {
@@ -154,6 +154,7 @@ public:
}
virtual bool source_can_read() { return _source->can_read() ||
_pipeline->_always_can_read; }
+ virtual bool push_blocked_task_to_queue() const { return true; }
virtual bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index 2e43007dee7..f21a48dc5fc 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -29,10 +29,8 @@
namespace doris::pipeline {
void Dependency::add_block_task(PipelineXTask* task) {
- // TODO(gabriel): support read dependency
- if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] ==
task) {
- return;
- }
+ DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] !=
task)
+ << "Duplicate task: " << task->debug_string();
_blocked_task.push_back(task);
}
@@ -74,6 +72,17 @@ void Dependency::set_ready_for_read() {
_ready_for_read = true;
local_block_task.swap(_blocked_task);
}
+ for (auto* task : local_block_task) {
+ task->try_wake_up(this);
+ }
+}
+
+void SetDependency::set_ready_for_read() {
+ if (_child_idx == 0) {
+ WriteDependency::set_ready_for_read();
+ } else {
+
_set_state->probe_finished_children_dependency[0]->set_ready_for_read();
+ }
}
void WriteDependency::set_ready_for_write() {
@@ -84,7 +93,7 @@ void WriteDependency::set_ready_for_write() {
std::vector<PipelineXTask*> local_block_task {};
{
- std::unique_lock<std::mutex> lc(_task_lock);
+ std::unique_lock<std::mutex> lc(_write_task_lock);
if (_ready_for_write) {
return;
}
@@ -133,7 +142,7 @@ Dependency* Dependency::read_blocked_by(PipelineXTask*
task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready_for_read = _ready_for_read.load();
- if (!ready_for_read && task) {
+ if (!ready_for_read && !push_to_blocking_queue() && task) {
add_block_task(task);
}
return ready_for_read ? nullptr : this;
@@ -162,7 +171,7 @@ FinishDependency*
FinishDependency::finish_blocked_by(PipelineXTask* task) {
}
WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
- std::unique_lock<std::mutex> lc(_task_lock);
+ std::unique_lock<std::mutex> lc(_write_task_lock);
const auto ready_for_write = _ready_for_write.load();
if (!ready_for_write && task) {
add_write_block_task(task);
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index f6a37766525..937aa01bd86 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,9 +57,10 @@ public:
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
- virtual void* shared_state() = 0;
+ virtual void* shared_state() { return nullptr; }
virtual std::string debug_string(int indentation_level = 0);
virtual bool is_write_dependency() { return false; }
+ virtual bool push_to_blocking_queue() { return false; }
// Start the watcher. We use it to count how long this dependency block
the current pipeline task.
void start_read_watcher() {
@@ -145,6 +146,7 @@ public:
protected:
friend class Dependency;
std::atomic<bool> _ready_for_write {true};
+ std::mutex _write_task_lock;
MonotonicStopWatch _write_dependency_watcher;
private:
@@ -172,7 +174,6 @@ public:
void set_ready_to_finish();
- void* shared_state() override { return nullptr; }
std::string debug_string(int indentation_level = 0) override;
void add_block_task(PipelineXTask* task) override;
@@ -222,7 +223,6 @@ public:
RuntimeFilterDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, name) {}
RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
- void* shared_state() override { return nullptr; }
void add_filters(IRuntimeFilter* runtime_filter);
void sub_filters();
void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
@@ -255,8 +255,6 @@ public:
return fmt::to_string(debug_string_buffer);
}
- void* shared_state() override { return nullptr; }
-
std::string debug_string(int indentation_level = 0) override;
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
@@ -284,7 +282,6 @@ struct FakeDependency final : public WriteDependency {
public:
FakeDependency(int id, int node_id) : WriteDependency(id, node_id,
"FakeDependency") {}
using SharedState = FakeSharedState;
- void* shared_state() override { return nullptr; }
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
return nullptr; }
[[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task)
override {
return nullptr;
@@ -465,6 +462,7 @@ public:
}
return this;
}
+ bool push_to_blocking_queue() override { return true; }
void block_reading() override {}
void block_writing() override {}
@@ -667,7 +665,6 @@ public:
AsyncWriterDependency(int id, int node_id)
: WriteDependency(id, node_id, "AsyncWriterDependency") {}
~AsyncWriterDependency() override = default;
- void* shared_state() override { return nullptr; }
};
class SetDependency;
@@ -769,30 +766,10 @@ public:
void set_shared_state(std::shared_ptr<SetSharedState> set_state) {
_set_state = set_state; }
- // Which dependency current pipeline task is blocked by. `nullptr` if this
dependency is ready.
- [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
- if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
- _should_log(_read_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some
reasons: " << name() << " "
- << id() << " " << _node_id << " block tasks: " <<
_blocked_task.size();
- }
- std::unique_lock<std::mutex> lc(_task_lock);
- if (!_set_state->ready_for_read && task) {
- add_block_task(task);
- }
- return _set_state->ready_for_read ? nullptr : this;
- }
-
- // Notify downstream pipeline tasks this dependency is ready.
- void set_ready_for_read() override {
- if (_set_state->ready_for_read) {
- return;
- }
- _read_dependency_watcher.stop();
- _set_state->ready_for_read = true;
- }
+ void set_ready_for_read() override;
void set_cur_child_id(int id) {
+ _child_idx = id;
_set_state->probe_finished_children_dependency[id] = this;
if (id != 0) {
block_writing();
@@ -801,6 +778,7 @@ public:
private:
std::shared_ptr<SetSharedState> _set_state;
+ int _child_idx {0};
};
using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 7295f38a124..ade718c1498 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -186,7 +186,7 @@ Status PipelineXTask::_open() {
_blocked_dep = _filter_dependency->filter_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
- set_use_blocking_queue(false);
+ set_use_blocking_queue();
RETURN_IF_ERROR(st);
} else if (i == 1) {
CHECK(false) << debug_string();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 04c5ddc1974..f920bf219b0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -127,7 +127,7 @@ public:
OperatorXs operatorXs() { return _operators; }
- bool push_blocked_task_to_queue() {
+ bool push_blocked_task_to_queue() const override {
/**
* Push task into blocking queue if:
* 1. `_use_blocking_queue` is true.
@@ -135,15 +135,19 @@ public:
*/
return _use_blocking_queue || get_state() ==
PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
}
- void set_use_blocking_queue(bool use_blocking_queue) {
- _use_blocking_queue = use_blocking_queue;
+ void set_use_blocking_queue() {
+ if (_blocked_dep->push_to_blocking_queue()) {
+ _use_blocking_queue = true;
+ return;
+ }
+ _use_blocking_queue = false;
}
private:
Dependency* _write_blocked_dependency() {
_blocked_dep = _write_dependencies->write_blocked_by(this);
if (_blocked_dep != nullptr) {
- set_use_blocking_queue(false);
+ set_use_blocking_queue();
static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
return _blocked_dep;
}
@@ -154,7 +158,7 @@ private:
for (auto* fin_dep : _finish_dependencies) {
_blocked_dep = fin_dep->finish_blocked_by(this);
if (_blocked_dep != nullptr) {
- set_use_blocking_queue(false);
+ set_use_blocking_queue();
static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher();
return _blocked_dep;
}
@@ -166,8 +170,7 @@ private:
for (auto* op_dep : _read_dependencies) {
_blocked_dep = op_dep->read_blocked_by(this);
if (_blocked_dep != nullptr) {
- // TODO(gabriel):
- set_use_blocking_queue(true);
+ set_use_blocking_queue();
_blocked_dep->start_read_watcher();
return _blocked_dep;
}
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 9ce2711c27a..8885a0601a1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -77,7 +77,7 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask*
task) {
return Status::InternalError("BlockedTaskScheduler shutdown");
}
std::unique_lock<std::mutex> lock(_task_mutex);
- if (task->is_pipelineX() &&
!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
+ if (!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
// put this task into current dependency's blocking queue and wait for
event notification
// instead of using a separate BlockedTaskScheduler.
return Status::OK();
@@ -142,6 +142,11 @@ void BlockedTaskScheduler::_schedule() {
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
if (task->source_can_read()) {
_make_task_run(local_blocked_tasks, iter);
+ } else if (!task->push_blocked_task_to_queue()) {
+ // TODO(gabriel): This condition means this task is in
blocking queue now and we should
+ // remove it because this new dependency should not be
put into blocking queue. We
+ // will delete this strange behavior after ScanDependency
and UnionDependency done.
+ local_blocked_tasks.erase(iter++);
} else {
iter++;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]