This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fdc063101f9 [pipeline](refactor) remove pipeline task state (#34527)
fdc063101f9 is described below
commit fdc063101f92de82586b4b1e8cbda6a4c4f0b5cb
Author: Gabriel <[email protected]>
AuthorDate: Fri May 10 09:30:39 2024 +0800
[pipeline](refactor) remove pipeline task state (#34527)
---
be/src/pipeline/dependency.cpp | 9 ------
be/src/pipeline/dependency.h | 18 -----------
be/src/pipeline/pipeline_task.cpp | 43 +++++++++----------------
be/src/pipeline/pipeline_task.h | 64 +++-----------------------------------
be/src/pipeline/pipeline_tracing.h | 5 ++-
be/src/pipeline/task_scheduler.cpp | 53 +++++++------------------------
6 files changed, 33 insertions(+), 159 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index de8d3e76c6f..2508040ea3f 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -80,15 +80,6 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) {
return ready ? nullptr : this;
}
-Dependency* FinishDependency::is_blocked_by(PipelineTask* task) {
- std::unique_lock<std::mutex> lc(_task_lock);
- auto ready = _ready.load();
- if (!ready && task) {
- _add_block_task(task);
- }
- return ready ? nullptr : this;
-}
-
std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 5a67881c23d..891cc52a712 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -189,24 +189,6 @@ struct FakeSharedState final : public BasicSharedState {
ENABLE_FACTORY_CREATOR(FakeSharedState)
};
-struct FakeDependency final : public Dependency {
-public:
- using SharedState = FakeSharedState;
- FakeDependency(int id, int node_id, QueryContext* query_ctx)
- : Dependency(id, node_id, "FakeDependency", query_ctx) {}
-
- [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task) override { return nullptr; }
-};
-
-struct FinishDependency : public Dependency {
-public:
- using SharedState = FakeSharedState;
- FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
- : Dependency(id, node_id, name, true, query_ctx) {}
-
- [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task) override;
-};
-
struct CountedFinishDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index b0925ad875c..0ea82e305fd 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -57,7 +57,6 @@ PipelineTask::PipelineTask(
_prepared(false),
_opened(false),
_state(state),
- _cur_state(PipelineTaskState::NOT_READY),
_fragment_context(fragment_context),
_parent_profile(parent_profile),
_operators(pipeline->operator_xs()),
@@ -79,7 +78,6 @@ PipelineTask::PipelineTask(
Status PipelineTask::prepare(const TPipelineInstanceParams& local_params,
const TDataSink& tsink,
QueryContext* query_ctx) {
DCHECK(_sink);
- DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
_init_profile();
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
@@ -116,8 +114,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
std::copy(deps.begin(), deps.end(),
std::inserter(_filter_dependencies,
_filter_dependencies.end()));
}
- // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
- set_state(PipelineTaskState::RUNNABLE);
_prepared = true;
return Status::OK();
}
@@ -205,6 +201,11 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
+ *eos = _eos;
+ if (_eos) {
+ // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
+ return Status::OK();
+ }
int64_t time_spent = 0;
ThreadCpuStopWatch cpu_time_stop_watch;
@@ -220,9 +221,7 @@ Status PipelineTask::execute(bool* eos) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
}};
- *eos = false;
if (has_dependency() || _runtime_filter_blocked_dependency() != nullptr) {
- set_state(PipelineTaskState::BLOCKED);
return Status::OK();
}
// The status must be runnable
@@ -232,16 +231,13 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_open());
}
if (!source_can_read() || !sink_can_write()) {
- set_state(PipelineTaskState::BLOCKED);
return Status::OK();
}
}
- Status status = Status::OK();
while (!_fragment_context->is_canceled()) {
if ((_root->need_data_from_children(_state) && !source_can_read()) ||
!sink_can_write()) {
- set_state(PipelineTaskState::BLOCKED);
- break;
+ return Status::OK();
}
/// When a task is cancelled,
@@ -265,6 +261,7 @@ Status PipelineTask::execute(bool* eos) {
continue;
}
+ *eos = _eos;
// Pull block from operator chain
if (!_dry_run) {
SCOPED_TIMER(_get_block_timer);
@@ -277,22 +274,26 @@ Status PipelineTask::execute(bool* eos) {
}
} else {
*eos = true;
+ _eos = true;
}
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
+ Status status = Status::OK();
status = _sink->sink(_state, block, *eos);
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
}
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
if (*eos) { // just return, the scheduler will do finish work
- break;
+ _eos = true;
+ return Status::OK();
}
}
}
- return status;
+ static_cast<void>(get_task_queue()->push_back(this));
+ return Status::OK();
}
bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t
revocable_mem_bytes) {
@@ -349,20 +350,6 @@ void PipelineTask::finalize() {
_le_state_map.clear();
}
-// The FSM see PipelineTaskState's comment
-void PipelineTask::set_state(PipelineTaskState state) {
- DCHECK(_cur_state != PipelineTaskState::FINISHED);
-
- if (_cur_state == state) {
- return;
- }
- if (_cur_state == PipelineTaskState::RUNNABLE && state != PipelineTaskState::RUNNABLE) {
- COUNTER_UPDATE(_block_counts, 1);
- }
-
- _cur_state = state;
-}
-
Status PipelineTask::close(Status exec_status) {
int64_t close_ns = 0;
Defer defer {[&]() {
@@ -404,9 +391,9 @@ std::string PipelineTask::debug_string() {
auto elapsed = (MonotonicNanos() - _fragment_context->create_time()) /
1000000000.0;
auto* cur_blocked_dep = _blocked_dep;
fmt::format_to(debug_string_buffer,
- "PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
+ "PipelineTask[this = {}, dry run = {}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
- (void*)this, get_state_name(_cur_state), _dry_run, elapsed,
+ (void*)this, _dry_run, elapsed,
cur_blocked_dep && !_finished ?
cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ddf6b190eb7..4ca3abbc4c5 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -42,60 +42,6 @@ class PipelineFragmentContext;
namespace doris::pipeline {
-/**
- * PipelineTaskState indicates all possible states of a pipeline task.
- * A FSM is described as below:
- *
- *                 |-----------------------------------------------------|
- *                 |---|                  transfer 2    transfer 3       |   transfer 4
- *                     |-------> BLOCKED ------------|                   |---------------------------------------> CANCELED
- *              |------|                             |                   | transfer 5           transfer 6|
- * NOT_READY ---| transfer 0                         |-----> RUNNABLE ---|---------> PENDING_FINISH ------|
- *              |                                    |          ^        |                      transfer 7|
- *              |------------------------------------|          |--------|---------------------------------------> FINISHED
- *                transfer 1                                transfer 9              transfer 8
- * BLOCKED include BLOCKED_FOR_DEPENDENCY, BLOCKED_FOR_SOURCE and BLOCKED_FOR_SINK.
- *
- * transfer 0 (NOT_READY -> BLOCKED): this pipeline task has some incomplete dependencies
- * transfer 1 (NOT_READY -> RUNNABLE): this pipeline task has no incomplete dependencies
- * transfer 2 (BLOCKED -> RUNNABLE): runnable condition for this pipeline task is met (e.g. get a new block from rpc)
- * transfer 3 (RUNNABLE -> BLOCKED): runnable condition for this pipeline task is not met (e.g. sink operator send a block by RPC and wait for a response)
- * transfer 4 (RUNNABLE -> CANCELED): current fragment is cancelled
- * transfer 5 (RUNNABLE -> PENDING_FINISH): this pipeline task completed but wait for releasing resources hold by itself
- * transfer 6 (PENDING_FINISH -> CANCELED): current fragment is cancelled
- * transfer 7 (PENDING_FINISH -> FINISHED): this pipeline task completed and resources hold by itself have been released already
- * transfer 8 (RUNNABLE -> FINISHED): this pipeline task completed and no resource need to be released
- * transfer 9 (RUNNABLE -> RUNNABLE): this pipeline task yields CPU and re-enters the runnable queue if it is runnable and has occupied CPU for a max time slice
- */
-enum class PipelineTaskState : uint8_t {
- NOT_READY = 0, // do not prepare
- BLOCKED = 1, // blocked by dependency
- RUNNABLE = 2, // can execute
- PENDING_FINISH =
- 3, // compute task is over, but still hold resource. like some scan and sink task
- FINISHED = 4, // finish with a regular state
- CANCELED = 5, // being cancelled
-
-};
-
-inline const char* get_state_name(PipelineTaskState idx) {
- switch (idx) {
- case PipelineTaskState::NOT_READY:
- return "NOT_READY";
- case PipelineTaskState::BLOCKED:
- return "BLOCKED";
- case PipelineTaskState::RUNNABLE:
- return "RUNNABLE";
- case PipelineTaskState::PENDING_FINISH:
- return "PENDING_FINISH";
- case PipelineTaskState::FINISHED:
- return "FINISHED";
- case PipelineTaskState::CANCELED:
- return "CANCELED";
- }
- __builtin_unreachable();
-}
-
class TaskQueue;
class PriorityTaskQueue;
class Dependency;
@@ -195,7 +141,7 @@ public:
int task_id() const { return _index; };
void clear_blocking_state() {
- if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && _blocked_dep) {
+ if (!_finished && _blocked_dep) {
_blocked_dep->set_ready();
_blocked_dep = nullptr;
}
@@ -237,8 +183,6 @@ public:
}
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
- PipelineTaskState get_state() const { return _cur_state; }
- void set_state(PipelineTaskState state);
bool is_running() { return _running.load(); }
void set_running(bool running) { _running = running; }
@@ -264,8 +208,8 @@ public:
LOG(INFO) << "query id|instanceid " <<
print_id(_state->query_id()) << "|"
<< print_id(_state->fragment_instance_id())
<< " current pipeline exceed run time "
- << config::enable_debug_log_timeout_secs << " seconds. Task state "
- << get_state_name(get_state()) << "/n task detail:" << debug_string();
+ << config::enable_debug_log_timeout_secs << " seconds. "
+ << "/n task detail:" << debug_string();
}
}
@@ -332,7 +276,6 @@ private:
RuntimeState* _state = nullptr;
int _previous_schedule_id = -1;
uint32_t _schedule_time = 0;
- PipelineTaskState _cur_state;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;
TaskQueue* _task_queue = nullptr;
@@ -396,6 +339,7 @@ private:
std::mutex _release_lock;
std::atomic<bool> _running {false};
+ std::atomic<bool> _eos {false};
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index aad0a7f9ee8..5d446e6f3d3 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -39,12 +39,11 @@ struct ScheduleRecord {
uint64_t thread_id;
uint64_t start_time;
uint64_t end_time;
- std::string_view state_name;
bool operator<(const ScheduleRecord& rhs) const { return start_time <
rhs.start_time; }
std::string to_string(uint64_t append_value) const {
- return fmt::format("{}|{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id,
- core_id, thread_id, start_time, end_time, state_name, append_value);
+ return fmt::format("{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id, core_id,
+ thread_id, start_time, end_time, append_value);
}
};
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index fbb67afdf46..5dc8982e426 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -72,7 +72,7 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
}
// after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
+void _close_task(PipelineTask* task, Status exec_status) {
// Has to attach memory tracker here, because the close task will also release some memory.
// Should count the memory to the query or the query's memory will not decrease when part of
// task finished.
@@ -86,12 +86,10 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
// We have already refactor all source and sink api, the close API does not need waiting
// for pending finish now. So that could call close directly.
Status status = task->close(exec_status);
- if (!status.ok() && state != PipelineTaskState::CANCELED) {
+ if (!status.ok()) {
task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
std::string(status.msg()));
- state = PipelineTaskState::CANCELED;
}
- task->set_state(state);
task->finalize();
task->set_running(false);
task->fragment_context()->close_a_pipeline();
@@ -113,25 +111,18 @@ void TaskScheduler::_do_work(size_t index) {
auto* fragment_ctx = task->fragment_context();
bool canceled = fragment_ctx->is_canceled();
- auto state = task->get_state();
- // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
// has to return false. The task is finished and need to close now.
- if (state == PipelineTaskState::PENDING_FINISH || canceled) {
+ if (canceled) {
// may change from pending FINISH,should called cancel
// also may change form BLOCK, other task called cancel
// If pipeline is canceled, it will report after pipeline closed, and will propagate
// errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
- Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
- exec_status);
+ _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
continue;
}
- DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED)
- << "task already finish: " << task->debug_string();
-
- task->set_state(PipelineTaskState::RUNNABLE);
// task exec
bool eos = false;
@@ -158,10 +149,8 @@ void TaskScheduler::_do_work(size_t index) {
status = task->execute(&eos);
uint64_t end_time = MonotonicMicros();
- std::string_view state_name = get_state_name(task->get_state());
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
- {query_id, task_name, core_id, thread_id, start_time, end_time,
- state_name});
+ {query_id, task_name, core_id, thread_id, start_time, end_time});
} else {
status = task->execute(&eos);
}
@@ -171,14 +160,7 @@ void TaskScheduler::_do_work(size_t index) {
task->set_previous_core_id(index);
- if (status.is<ErrorCode::END_OF_FILE>()) {
- // Sink operator finished, just close task now.
- _close_task(task, PipelineTaskState::FINISHED, Status::OK());
- continue;
- } else if (!status.ok()) {
- LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
- print_id(task->query_context()->query_id()),
- status.to_string());
+ if (!status.ok()) {
// Print detail informations below when you debugging here.
//
// LOG(WARNING)<< "task:\n"<<task->debug_string();
@@ -186,7 +168,10 @@ void TaskScheduler::_do_work(size_t index) {
// exec failed,cancel all fragment instance
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
std::string(status.to_string_no_stack()));
- _close_task(task, PipelineTaskState::CANCELED, status);
+ LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
+ print_id(task->query_context()->query_id()),
+ status.to_string());
+ _close_task(task, status);
continue;
}
fragment_ctx->trigger_report_if_necessary();
@@ -201,29 +186,15 @@ void TaskScheduler::_do_work(size_t index) {
// added to running queue when dependency is ready.
if (task->is_pending_finish()) {
// Only meet eos, should set task to PENDING_FINISH state
- task->set_state(PipelineTaskState::PENDING_FINISH);
task->set_running(false);
} else {
Status exec_status =
fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, PipelineTaskState::FINISHED, exec_status);
+ _close_task(task, exec_status);
}
continue;
}
- auto pipeline_state = task->get_state();
- switch (pipeline_state) {
- case PipelineTaskState::BLOCKED:
- task->set_running(false);
- break;
- case PipelineTaskState::RUNNABLE:
- task->set_running(false);
- static_cast<void>(_task_queue->push_back(task, index));
- break;
- default:
- DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state)
- << " task: " << task->debug_string();
- break;
- }
+ task->set_running(false);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]