This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 23e7423748 [pipeline](refactor) refactor pipeline task schedule logics (#22028)
23e7423748 is described below
commit 23e742374840338ba0df1b3802f66c1ee498a9d3
Author: Gabriel <[email protected]>
AuthorDate: Tue Jul 25 17:18:26 2023 +0800
[pipeline](refactor) refactor pipeline task schedule logics (#22028)
---
be/src/exec/data_sink.h | 4 +++-
be/src/pipeline/exec/operator.h | 3 +--
be/src/pipeline/pipeline_task.cpp | 9 +++++++--
be/src/pipeline/pipeline_task.h | 2 ++
be/src/pipeline/task_scheduler.cpp | 30 +++++++++++++++++-----------
be/src/vec/exec/scan/scanner_context.cpp | 1 +
be/src/vec/exec/scan/vscan_node.cpp | 4 +++-
be/src/vec/sink/vdata_stream_sender.cpp | 34 ++++++++++++++++++++++----------
be/src/vec/sink/vdata_stream_sender.h | 2 +-
be/src/vec/sink/vtablet_sink.cpp | 8 +++-----
be/src/vec/sink/vtablet_sink.h | 2 +-
11 files changed, 64 insertions(+), 35 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index cf7b774fcd..fd59cd1d27 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -67,7 +67,9 @@ public:
return Status::NotSupported("Not support send block");
}
- virtual void try_close(RuntimeState* state, Status exec_status) {}
+ [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
+ return Status::OK();
+ }
virtual bool is_close_done() { return true; }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 12a117b4c4..acf55cb7bc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -285,8 +285,7 @@ public:
}
Status try_close(RuntimeState* state) override {
- _sink->try_close(state, state->query_status());
- return Status::OK();
+ return _sink->try_close(state, state->query_status());
}
[[nodiscard]] bool is_pending_finish() const override { return
!_sink->is_close_done(); }
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 411d9578ac..0e2041c488 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -284,8 +284,13 @@ Status PipelineTask::finalize() {
}
Status PipelineTask::try_close() {
- _sink->try_close(_state);
- return _source->try_close(_state);
+ if (_try_close_flag) {
+ return Status::OK();
+ }
+ _try_close_flag = true;
+ Status status1 = _sink->try_close(_state);
+ Status status2 = _source->try_close(_state);
+ return status1.ok() ? status2 : status1;
}
Status PipelineTask::close() {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 28ebf285e4..5cba2ef96e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -283,6 +283,8 @@ private:
int _queue_level = 0;
int _core_id = 0;
+ bool _try_close_flag = false;
+
RuntimeProfile* _parent_profile;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer;
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 0d725950d4..4f9e90f80e 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -327,28 +327,34 @@ void TaskScheduler::_do_work(size_t index) {
}
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState
state) {
- // state only should be CANCELED or FINISHED
- task->try_close();
if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
_blocked_task_scheduler->add_blocked_task(task);
+ return;
+ }
+ auto status = task->try_close();
+ if (!status.ok() && state != PipelineTaskState::CANCELED) {
+ // Call `close` if `try_close` failed to make sure allocated resources are released
+ task->close();
+ task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+ status.to_string());
+ state = PipelineTaskState::CANCELED;
+ } else if (task->is_pending_finish()) {
+ task->set_state(PipelineTaskState::PENDING_FINISH);
+ _blocked_task_scheduler->add_blocked_task(task);
+ return;
} else {
- auto status = task->close();
+ status = task->close();
if (!status.ok() && state != PipelineTaskState::CANCELED) {
task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
status.to_string());
state = PipelineTaskState::CANCELED;
- } else {
- if (task->is_pending_finish()) {
- task->set_state(PipelineTaskState::PENDING_FINISH);
- _blocked_task_scheduler->add_blocked_task(task);
- return;
- }
}
- task->set_state(state);
- task->set_close_pipeline_time();
- task->fragment_context()->close_a_pipeline();
+ DCHECK(!task->is_pending_finish()) << task->debug_string();
}
+ task->set_state(state);
+ task->set_close_pipeline_time();
+ task->fragment_context()->close_a_pipeline();
}
void TaskScheduler::shutdown() {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 2f535e4947..6570b5be65 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -299,6 +299,7 @@ void ScannerContext::clear_and_join(VScanNode* node,
RuntimeState* state) {
if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
break;
} else {
+ DCHECK(!state->enable_pipeline_exec());
while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
_ctx_finish_cv.wait(l);
}
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 4c0dd28b4e..66bfe6386d 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -323,11 +323,13 @@ Status VScanNode::close(RuntimeState* state) {
void VScanNode::release_resource(RuntimeState* state) {
if (_scanner_ctx.get()) {
- if (!state->enable_pipeline_exec() || _should_create_scanner) {
+ if (!state->enable_pipeline_exec()) {
// stop and wait the scanner scheduler to be done
// _scanner_ctx may not be created for some short circuit case.
_scanner_ctx->set_should_stop();
_scanner_ctx->clear_and_join(this, state);
+ } else if (_should_create_scanner) {
+ _scanner_ctx->clear_and_join(this, state);
}
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 97154c3e3f..29fb3446dc 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -672,11 +672,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block, bool eos) {
return Status::OK();
}
-Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
- if (_closed) {
- return Status::OK();
- }
-
+Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
Status final_st = Status::OK();
for (int i = 0; i < _channels.size(); ++i) {
Status st = _channels[i]->close(state);
@@ -684,13 +680,31 @@ Status VDataStreamSender::close(RuntimeState* state,
Status exec_status) {
final_st = st;
}
}
- // wait all channels to finish
- for (int i = 0; i < _channels.size(); ++i) {
- Status st = _channels[i]->close_wait(state);
- if (!st.ok() && final_st.ok()) {
- final_st = st;
+ return final_st;
+}
+
+Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
+
+ Status final_st = Status::OK();
+ if (!state->enable_pipeline_exec()) {
+ for (int i = 0; i < _channels.size(); ++i) {
+ Status st = _channels[i]->close(state);
+ if (!st.ok() && final_st.ok()) {
+ final_st = st;
+ }
+ }
+ // wait all channels to finish
+ for (int i = 0; i < _channels.size(); ++i) {
+ Status st = _channels[i]->close_wait(state);
+ if (!st.ok() && final_st.ok()) {
+ final_st = st;
+ }
}
}
+
DataSink::close(state, exec_status);
return final_st;
}
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 679557a92a..4dbe2625d9 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -92,7 +92,7 @@ public:
Status open(RuntimeState* state) override;
Status send(RuntimeState* state, Block* block, bool eos = false) override;
-
+ Status try_close(RuntimeState* state, Status exec_status) override;
Status close(RuntimeState* state, Status exec_status) override;
RuntimeProfile* profile() override { return _profile; }
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 79579cf2a0..e33f0f55a7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1320,11 +1320,7 @@ void VOlapTableSink::_cancel_all_channel(Status status) {
print_id(_load_id), _txn_id, status);
}
-void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
- if (_try_close) {
- return;
- }
-
+Status VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(_close_timer);
Status status = exec_status;
if (status.ok()) {
@@ -1357,6 +1353,8 @@ void VOlapTableSink::try_close(RuntimeState* state,
Status exec_status) {
_close_status = status;
_try_close = true;
}
+
+ return Status::OK();
}
bool VOlapTableSink::is_close_done() {
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 38c9e7c325..1e5d6ea2d7 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -478,7 +478,7 @@ public:
Status open(RuntimeState* state) override;
- void try_close(RuntimeState* state, Status exec_status) override;
+ Status try_close(RuntimeState* state, Status exec_status) override;
// if true, all node channels rpc done, can start close().
bool is_close_done() override;
Status close(RuntimeState* state, Status close_status) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]