This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new cb5c1673739 branch-3.1: [refactor](pipeline) Re-construct ownership of pipeline components #49753 (#54567)
cb5c1673739 is described below
commit cb5c167373946e409a9ede31fc5d7db01abe70ca
Author: Gabriel <[email protected]>
AuthorDate: Wed Aug 13 14:27:44 2025 +0800
branch-3.1: [refactor](pipeline) Re-construct ownership of pipeline components #49753 (#54567)
picked from #49753
---
be/src/pipeline/dependency.cpp | 18 +++--
be/src/pipeline/dependency.h | 8 +-
be/src/pipeline/pipeline.h | 1 +
be/src/pipeline/pipeline_fragment_context.cpp | 15 ++--
be/src/pipeline/pipeline_fragment_context.h | 2 +-
be/src/pipeline/pipeline_task.cpp | 101 ++++++++++++++++++--------
be/src/pipeline/pipeline_task.h | 32 ++++----
be/src/pipeline/task_queue.cpp | 20 ++---
be/src/pipeline/task_queue.h | 28 +++----
be/src/pipeline/task_scheduler.cpp | 38 +++++-----
be/src/pipeline/task_scheduler.h | 2 +-
11 files changed, 158 insertions(+), 107 deletions(-)
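
For readers skimming this diff: the heart of the refactor is an ownership change. Each PipelineTask is now owned by a std::shared_ptr kept in the fragment context's _tasks matrix, while everything that used to hold a raw PipelineTask* (dependencies, task queues, the scheduler) now holds either a shared_ptr taken from the queue or a weak_ptr that must be promoted before use. A minimal standalone sketch of that pattern, using hypothetical Task/Dependency names rather than the actual Doris classes:

    // sketch_ownership.cpp -- hypothetical names, not the actual Doris classes.
    #include <iostream>
    #include <memory>
    #include <vector>

    struct Task {
        void wake_up() { std::cout << "task woken\n"; }
    };

    struct Dependency {
        // Observers only: a task that has already been destroyed is skipped via
        // lock() instead of being dereferenced through a dangling pointer.
        std::vector<std::weak_ptr<Task>> blocked;

        void set_ready() {
            for (auto& weak : blocked) {
                if (auto task = weak.lock()) {
                    task->wake_up();
                }
            }
            blocked.clear();
        }
    };

    int main() {
        Dependency dep;
        {
            auto task = std::make_shared<Task>(); // sole owner, like _tasks in the fragment context
            dep.blocked.push_back(task);          // weak reference, like Dependency::_blocked_task
            dep.set_ready();                      // prints "task woken"
        }
        dep.set_ready();                          // owner gone: lock() returns nullptr, nothing to wake
        return 0;
    }
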
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index b15005e03a8..945ee3d4521 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -46,8 +46,9 @@ Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, s
return sink_deps.back().get();
}
-void Dependency::_add_block_task(PipelineTask* task) {
- DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
+void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
+ DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1].lock() == nullptr ||
+ _blocked_task[_blocked_task.size() - 1].lock().get() != task.get())
<< "Duplicate task: " << task->debug_string();
_blocked_task.push_back(task);
}
@@ -57,7 +58,7 @@ void Dependency::set_ready() {
return;
}
_watcher.stop();
- std::vector<PipelineTask*> local_block_task {};
+ std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
{
std::unique_lock<std::mutex> lc(_task_lock);
if (_ready) {
@@ -66,12 +67,15 @@ void Dependency::set_ready() {
_ready = true;
local_block_task.swap(_blocked_task);
}
- for (auto* task : local_block_task) {
- task->wake_up();
+ for (auto task : local_block_task) {
+ if (auto t = task.lock()) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ t->wake_up();
+ }
}
}
-Dependency* Dependency::is_blocked_by(PipelineTask* task) {
+Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
if (!ready && task) {
@@ -105,7 +109,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
return fmt::to_string(debug_string_buffer);
}
-Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
+Dependency* RuntimeFilterDependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
if (!ready && task) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ea6cacf51b1..9f07076497f 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -106,7 +106,7 @@ public:
[[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
- [[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
+ [[nodiscard]] virtual Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task = nullptr);
// Notify downstream pipeline tasks this dependency is ready.
void set_ready();
void set_ready_to_read() {
@@ -151,7 +151,7 @@ public:
}
protected:
- void _add_block_task(PipelineTask* task);
+ void _add_block_task(std::shared_ptr<PipelineTask> task);
const int _id;
const int _node_id;
@@ -162,7 +162,7 @@ protected:
MonotonicStopWatch _watcher;
std::mutex _task_lock;
- std::vector<PipelineTask*> _blocked_task;
+ std::vector<std::weak_ptr<PipelineTask>> _blocked_task;
// If `_always_ready` is true, `block()` will never block tasks.
std::atomic<bool> _always_ready = false;
@@ -282,7 +282,7 @@ public:
: Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
std::string debug_string(int indentation_level = 0) override;
- Dependency* is_blocked_by(PipelineTask* task) override;
+ Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task) override;
private:
const IRuntimeFilter* _runtime_filter = nullptr;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 061a62ea99b..abfe883a804 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -135,6 +135,7 @@ public:
}
int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+ std::string& name() { return _name; }
private:
void _init_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6736ff89827..4a069483dc3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -133,18 +133,17 @@ PipelineFragmentContext::~PipelineFragmentContext() {
// The memory released by the query end is recorded in the query mem tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker);
auto st = _query_ctx->exec_status();
- _query_ctx.reset();
for (size_t i = 0; i < _tasks.size(); i++) {
if (!_tasks[i].empty()) {
_call_back(_tasks[i].front()->runtime_state(), &st);
}
}
- _tasks.clear();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
}
}
+ _tasks.clear();
_dag.clear();
_pip_id_to_pipeline.clear();
_pipelines.clear();
@@ -154,6 +153,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
_runtime_filter_states.clear();
_runtime_filter_mgr_map.clear();
_op_id_to_le_state.clear();
+ _query_ctx.reset();
}
bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -449,10 +449,11 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
- auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
- task_runtime_state.get(), this,
- pipeline_id_to_profile[pip_idx].get(),
- get_local_exchange_state(pipeline), i);
+ auto task = std::make_shared<PipelineTask>(
+ pipeline, cur_task_id, task_runtime_state.get(),
+ std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
+ pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
+ i);
pipeline->incr_created_tasks(i, task.get());
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
@@ -1675,7 +1676,7 @@ Status PipelineFragmentContext::submit() {
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
- st = scheduler->schedule_task(t.get());
+ st = scheduler->schedule_task(t);
if (!st) {
cancel(Status::InternalError("submit context to executor
fail"));
std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index f119384b2fb..b08a3f90f95 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -228,7 +228,7 @@ private:
OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
- std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+ std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 99c6bf167b9..dfda613af52 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -48,12 +48,19 @@ namespace doris::pipeline {
PipelineTask::PipelineTask(
PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
- PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
+ std::shared_ptr<PipelineFragmentContext> fragment_context, RuntimeProfile* parent_profile,
std::map<int,
std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
le_state_map,
int task_idx)
- : _index(task_id),
+ :
+#ifdef BE_TEST
+ _query_id(fragment_context ? fragment_context->get_query_id() : TUniqueId()),
+#else
+ _query_id(fragment_context->get_query_id()),
+#endif
+ _pip_id(pipeline->id()),
+ _index(task_id),
_pipeline(pipeline),
_opened(false),
_state(state),
@@ -64,7 +71,8 @@ PipelineTask::PipelineTask(
_root(_operators.back().get()),
_sink(pipeline->sink_shared_pointer()),
_le_state_map(std::move(le_state_map)),
- _task_idx(task_idx) {
+ _task_idx(task_idx),
+ _pipeline_name(_pipeline->name()) {
_pipeline_task_watcher.start();
_execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
auto shared_state = _sink->create_shared_state();
@@ -117,8 +125,12 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
std::copy(deps.begin(), deps.end(),
std::inserter(_execution_dependencies, _execution_dependencies.end()));
}
- if (query_context()->is_cancelled()) {
- clear_blocking_state();
+ if (auto fragment = _fragment_context.lock()) {
+ if (fragment->get_query_ctx()->is_cancelled()) {
+ clear_blocking_state();
+ }
+ } else {
+ return Status::InternalError("Fragment already finished! Query: {}",
print_id(_query_id));
}
return Status::OK();
}
@@ -231,7 +243,7 @@ bool PipelineTask::_wait_to_start() {
// 2. Runtime filter dependencies are ready
// 3. All tablets are loaded into local storage
for (auto* op_dep : _execution_dependencies) {
- _blocked_dep = op_dep->is_blocked_by(this);
+ _blocked_dep = op_dep->is_blocked_by(shared_from_this());
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
@@ -252,7 +264,7 @@ bool PipelineTask::_is_blocked() {
for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
// `_read_dependencies` is organized according to operators. For each operator, running condition is met iff all dependencies are ready.
for (auto* dep : _read_dependencies[i]) {
- _blocked_dep = dep->is_blocked_by(this);
+ _blocked_dep = dep->is_blocked_by(shared_from_this());
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
@@ -271,7 +283,7 @@ bool PipelineTask::_is_blocked() {
}
for (auto* op_dep : _write_dependencies) {
- _blocked_dep = op_dep->is_blocked_by(this);
+ _blocked_dep = op_dep->is_blocked_by(shared_from_this());
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
@@ -281,6 +293,10 @@ bool PipelineTask::_is_blocked() {
}
Status PipelineTask::execute(bool* eos) {
+ auto fragment_context = _fragment_context.lock();
+ if (!fragment_context) {
+ return Status::InternalError("Fragment already finished! Query: {}",
print_id(_query_id));
+ }
if (_eos) {
*eos = true;
return Status::OK();
@@ -303,11 +319,11 @@ Status PipelineTask::execute(bool* eos) {
}
int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
_task_cpu_timer->update(delta_cpu_time);
- auto cpu_qs = query_context()->get_cpu_statistics();
+ auto cpu_qs = fragment_context->get_query_ctx()->get_cpu_statistics();
if (cpu_qs) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
- query_context()->update_cpu_time(delta_cpu_time);
+ fragment_context->get_query_ctx()->update_cpu_time(delta_cpu_time);
}};
if (!_wake_up_early) {
RETURN_IF_ERROR(_prepare());
@@ -318,7 +334,7 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_prepare());
// The status must be runnable
- if (!_opened && !_fragment_context->is_canceled()) {
+ if (!_opened && !fragment_context->is_canceled()) {
DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
auto required_pipeline_id =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -350,7 +366,7 @@ Status PipelineTask::execute(bool* eos) {
_task_profile->add_info_string("TaskState", "Runnable");
_task_profile->add_info_string("BlockedByDependency", "");
- while (!_fragment_context->is_canceled()) {
+ while (!fragment_context->is_canceled()) {
SCOPED_RAW_TIMER(&time_spent);
if (_is_blocked()) {
return Status::OK();
@@ -359,7 +375,7 @@ Status PipelineTask::execute(bool* eos) {
/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
/// Here, checking whether it is cancelled to prevent tasks in a blocking state from being re-executed.
- if (_fragment_context->is_canceled()) {
+ if (fragment_context->is_canceled()) {
break;
}
@@ -428,7 +444,7 @@ Status PipelineTask::execute(bool* eos) {
}
}
- RETURN_IF_ERROR(get_task_queue()->push_back(this));
+ RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
return Status::OK();
}
@@ -489,12 +505,34 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
}
}
+void PipelineTask::stop_if_finished() {
+ auto fragment = _fragment_context.lock();
+ if (!fragment) {
+ return;
+ }
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
+ if (auto sink = _sink) {
+ if (sink->is_finished(_state)) {
+ clear_blocking_state();
+ }
+ }
+}
+
void PipelineTask::finalize() {
+ auto fragment = _fragment_context.lock();
+ if (!fragment) {
+ return;
+ }
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker);
std::unique_lock<std::mutex> lc(_dependency_lock);
_finalized = true;
_sink_shared_state.reset();
_op_shared_states.clear();
_le_state_map.clear();
+ _block.reset();
+ _operators.clear();
+ _sink.reset();
+ _pipeline.reset();
}
Status PipelineTask::close(Status exec_status, bool close_sink) {
@@ -529,31 +567,37 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
}
std::string PipelineTask::debug_string() {
- std::unique_lock<std::mutex> lc(_dependency_lock);
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "QueryId: {}\n",
print_id(query_context()->query_id()));
+ fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(_query_id));
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(_state->fragment_instance_id()));
+ fmt::format_to(
+ debug_string_buffer,
+ "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finalized =
{}, dry run = "
+ "{}, _wake_up_early = {}, is running = {}]",
+ (void*)this, _index, _opened, _eos, _finalized, _dry_run,
_wake_up_early.load(),
+ is_running());
+ std::unique_lock<std::mutex> lc(_dependency_lock);
auto* cur_blocked_dep = _blocked_dep;
- auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
+ auto fragment = _fragment_context.lock();
+ if (is_finalized() || !fragment) {
+ fmt::format_to(debug_string_buffer, " pipeline name = {}",
_pipeline_name);
+ return fmt::to_string(debug_string_buffer);
+ }
+ auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
fmt::format_to(debug_string_buffer,
- "PipelineTask[this = {}, id = {}, open = {}, eos = {},
finish = {}, dry run = "
- "{}, elapse time = {}s, _wake_up_early = {}], block
dependency = {}, is "
- "running = {}\noperators: ",
- (void*)this, _index, _opened, _eos, _finalized, _dry_run,
elapsed,
- _wake_up_early.load(),
- cur_blocked_dep && !_finalized ?
cur_blocked_dep->debug_string() : "NULL",
- is_running());
+ " elapse time = {}s, block dependency = [{}]\noperators: ",
elapsed,
+ cur_blocked_dep && !is_finalized() ?
cur_blocked_dep->debug_string() : "NULL");
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_opened && !_finalized ? _operators[i]->debug_string(_state, i)
: _operators[i]->debug_string(i));
}
fmt::format_to(debug_string_buffer, "\n{}\n",
- _opened && !_finalized ? _sink->debug_string(_state, _operators.size())
- : _sink->debug_string(_operators.size()));
+ _opened && !is_finalized() ? _sink->debug_string(_state, _operators.size())
+ : _sink->debug_string(_operators.size()));
if (_finalized) {
return fmt::to_string(debug_string_buffer);
}
@@ -588,10 +632,7 @@ std::string PipelineTask::debug_string() {
void PipelineTask::wake_up() {
// call by dependency
- static_cast<void>(get_task_queue()->push_back(this));
+ static_cast<void>(get_task_queue()->push_back(shared_from_this()));
}
-QueryContext* PipelineTask::query_context() {
- return _fragment_context->get_query_ctx();
-}
} // namespace doris::pipeline
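
The pipeline_task.cpp hunks above repeat one guard: the task now holds only a weak_ptr to its fragment context and promotes it at the top of each entry point, bailing out if the fragment has already been torn down. A condensed sketch of that guard with hypothetical names (the real PipelineTask uses doris::Status and print_id, omitted here):

    // sketch_weak_context.cpp -- hypothetical names, not the real PipelineTask API.
    #include <memory>
    #include <string>

    struct FragmentContext {
        bool is_canceled() const { return false; }
    };

    struct Status {
        std::string msg;
        static Status OK() { return {}; }
        static Status InternalError(std::string m) { return {std::move(m)}; }
        bool ok() const { return msg.empty(); }
    };

    struct Task {
        std::weak_ptr<FragmentContext> fragment_context; // observer, like _fragment_context after this patch

        Status execute() {
            // Promote the weak reference for the duration of the call.
            auto ctx = fragment_context.lock();
            if (!ctx) {
                return Status::InternalError("Fragment already finished");
            }
            // ctx is a strong reference, so the context cannot be destroyed
            // while this call is still using it.
            if (ctx->is_canceled()) {
                return Status::OK();
            }
            return Status::OK();
        }
    };

    int main() {
        Task task;
        auto ctx = std::make_shared<FragmentContext>();
        task.fragment_context = ctx;
        Status ok = task.execute();  // context alive: runs normally
        ctx.reset();                 // owner releases the context
        Status err = task.execute(); // lock() fails: an error instead of a dangling pointer
        return ok.ok() && !err.ok() ? 0 : 1;
    }
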
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 64f5f6b1674..3a50436280d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -45,10 +45,11 @@ class TaskQueue;
class PriorityTaskQueue;
class Dependency;
-class PipelineTask {
+class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
public:
PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
- PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
+ std::shared_ptr<PipelineFragmentContext> fragment_context,
+ RuntimeProfile* parent_profile,
std::map<int,
std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
le_state_map,
@@ -63,9 +64,7 @@ public:
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status, bool close_sink = true);
- PipelineFragmentContext* fragment_context() { return _fragment_context; }
-
- QueryContext* query_context();
+ std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
int get_previous_core_id() const {
return _previous_schedule_id != -1 ? _previous_schedule_id
@@ -87,7 +86,7 @@ public:
bool is_pending_finish() {
for (auto* fin_dep : _finish_dependencies) {
- _blocked_dep = fin_dep->is_blocked_by(this);
+ _blocked_dep = fin_dep->is_blocked_by(shared_from_this());
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
@@ -138,10 +137,11 @@ public:
void set_wake_up_early() { _wake_up_early = true; }
void clear_blocking_state() {
+ auto fragment = _fragment_context.lock();
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
- if (!_finalized) {
+ if (!_finalized && fragment) {
std::for_each(_execution_dependencies.begin(), _execution_dependencies.end(),
[&](Dependency* dep) { dep->set_ready(); });
for (auto& deps : _read_dependencies) {
@@ -227,13 +227,9 @@ public:
std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
- void stop_if_finished() {
- if (_sink->is_finished(_state)) {
- clear_blocking_state();
- }
- }
+ void stop_if_finished();
- PipelineId pipeline_id() const { return _pipeline->id(); }
+ PipelineId pipeline_id() const { return _pip_id; }
bool wake_up_early() const { return _wake_up_early; }
@@ -248,7 +244,9 @@ private:
Status _open();
Status _prepare();
- uint32_t _index;
+ const TUniqueId _query_id;
+ const PipelineId _pip_id;
+ const uint32_t _index;
PipelinePtr _pipeline;
bool _has_exceed_timeout = false;
bool _opened;
@@ -256,7 +254,7 @@ private:
int _previous_schedule_id = -1;
uint32_t _schedule_time = 0;
std::unique_ptr<doris::vectorized::Block> _block;
- PipelineFragmentContext* _fragment_context = nullptr;
+ std::weak_ptr<PipelineFragmentContext> _fragment_context;
TaskQueue* _task_queue = nullptr;
// used for priority queue
@@ -317,6 +315,10 @@ private:
std::atomic<bool> _running = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
+ const std::string _pipeline_name;
};
+using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
+using PipelineTaskWPtr = std::weak_ptr<PipelineTask>;
+
} // namespace doris::pipeline
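
One caveat behind the std::enable_shared_from_this base added here: shared_from_this() is only valid once the object is already managed by a shared_ptr, which is why _build_pipeline_tasks above switches from make_unique to make_shared. A tiny illustration with a hypothetical Task type (the bad_weak_ptr behaviour is guaranteed since C++17):

    // sketch_shared_from_this.cpp -- hypothetical Task type.
    #include <cassert>
    #include <memory>

    struct Task : std::enable_shared_from_this<Task> {
        std::shared_ptr<Task> self() { return shared_from_this(); }
    };

    int main() {
        auto owned = std::make_shared<Task>(); // managed by a shared_ptr, like _tasks after this patch
        assert(owned->self().get() == owned.get());

        Task stack_task; // not owned by any shared_ptr
        bool threw = false;
        try {
            stack_task.self(); // throws std::bad_weak_ptr since C++17
        } catch (const std::bad_weak_ptr&) {
            threw = true;
        }
        assert(threw);
        return 0;
    }
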
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index b91b74dfdef..43984d03a9d 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -30,7 +30,7 @@ namespace doris::pipeline {
TaskQueue::~TaskQueue() = default;
-PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
@@ -56,7 +56,7 @@ void PriorityTaskQueue::close() {
DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
}
-PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
@@ -93,13 +93,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
return SUB_QUEUE_LEVEL - 1;
}
-PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+PipelineTaskSPtr PriorityTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return _try_take_unprotected(is_steal);
}
-PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+PipelineTaskSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = _try_take_unprotected(false);
if (task) {
@@ -114,7 +114,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
}
}
-Status PriorityTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
@@ -154,8 +154,8 @@ void MultiCoreTaskQueue::close() {
}
}
-PipelineTask* MultiCoreTaskQueue::take(int core_id) {
- PipelineTask* task = nullptr;
+PipelineTaskSPtr MultiCoreTaskQueue::take(int core_id) {
+ PipelineTaskSPtr task = nullptr;
while (!_closed) {
DCHECK(_prio_task_queue_list->size() > core_id)
<< " list size: " << _prio_task_queue_list->size() << "
core_id: " << core_id
@@ -181,7 +181,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
return task;
}
-PipelineTask* MultiCoreTaskQueue::_steal_take(
+PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
DCHECK(core_id < _core_size);
int next_id = core_id;
@@ -200,7 +200,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(
return nullptr;
}
-Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
+Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
int core_id = task->get_previous_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
@@ -208,7 +208,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
return push_back(task, core_id);
}
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return (*_prio_task_queue_list)[core_id]->push(task);
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index b389ebc2c51..d8b4ee27a7f 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -40,13 +40,13 @@ public:
virtual void close() = 0;
// Get the task by core id.
// TODO: To think the logic is useful?
- virtual PipelineTask* take(int core_id) = 0;
+ virtual PipelineTaskSPtr take(int core_id) = 0;
// push from scheduler
- virtual Status push_back(PipelineTask* task) = 0;
+ virtual Status push_back(PipelineTaskSPtr task) = 0;
// push from worker
- virtual Status push_back(PipelineTask* task, int core_id) = 0;
+ virtual Status push_back(PipelineTaskSPtr task, int core_id) = 0;
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
@@ -61,9 +61,9 @@ class SubTaskQueue {
friend class PriorityTaskQueue;
public:
- void push_back(PipelineTask* task) { _queue.emplace(task); }
+ void push_back(PipelineTaskSPtr task) { _queue.emplace(task); }
- PipelineTask* try_take(bool is_steal);
+ PipelineTaskSPtr try_take(bool is_steal);
void set_level_factor(double level_factor) { _level_factor = level_factor; }
@@ -79,7 +79,7 @@ public:
bool empty() { return _queue.empty(); }
private:
- std::queue<PipelineTask*> _queue;
+ std::queue<PipelineTaskSPtr> _queue;
// depends on LEVEL_QUEUE_TIME_FACTOR
double _level_factor = 1;
@@ -93,18 +93,18 @@ public:
void close();
- PipelineTask* try_take(bool is_steal);
+ PipelineTaskSPtr try_take(bool is_steal);
- PipelineTask* take(uint32_t timeout_ms = 0);
+ PipelineTaskSPtr take(uint32_t timeout_ms = 0);
- Status push(PipelineTask* task);
+ Status push(PipelineTaskSPtr task);
void inc_sub_queue_runtime(int level, uint64_t runtime) {
_sub_queues[level].inc_runtime(runtime);
}
private:
- PipelineTask* _try_take_unprotected(bool is_steal);
+ PipelineTaskSPtr _try_take_unprotected(bool is_steal);
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
static constexpr size_t SUB_QUEUE_LEVEL = 6;
SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -133,17 +133,17 @@ public:
void close() override;
// Get the task by core id.
- PipelineTask* take(int core_id) override;
+ PipelineTaskSPtr take(int core_id) override;
// TODO combine these methods to `push_back(task, core_id = -1)`
- Status push_back(PipelineTask* task) override;
+ Status push_back(PipelineTaskSPtr task) override;
- Status push_back(PipelineTask* task, int core_id) override;
+ Status push_back(PipelineTaskSPtr task, int core_id) override;
void update_statistics(PipelineTask* task, int64_t time_spent) override;
private:
- PipelineTask* _steal_take(
+ PipelineTaskSPtr _steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
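
With PipelineTaskSPtr stored in the queues, a queued task co-owns itself through the queue: dropping the fragment's shared_ptr no longer invalidates an entry that a worker thread is about to take. A minimal sketch of that lifetime behaviour, using a hypothetical Task and a plain std::queue in place of the real SubTaskQueue:

    // sketch_queue_ownership.cpp -- hypothetical Task type.
    #include <iostream>
    #include <memory>
    #include <queue>

    struct Task {
        ~Task() { std::cout << "task destroyed\n"; }
    };

    int main() {
        std::queue<std::shared_ptr<Task>> runnable; // like SubTaskQueue::_queue after this patch

        auto task = std::make_shared<Task>();
        runnable.push(task); // the queue takes its own strong reference
        task.reset();        // the original owner is gone; the task is still alive in the queue

        auto taken = runnable.front(); // a worker takes the task
        runnable.pop();
        std::cout << "about to run\n";
        // "task destroyed" is printed only after the last shared_ptr (taken) goes away.
        return 0;
    }
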
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 1fb3fbb3c36..399bf416aec 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -66,19 +66,16 @@ Status TaskScheduler::start() {
return Status::OK();
}
-Status TaskScheduler::schedule_task(PipelineTask* task) {
+Status TaskScheduler::schedule_task(PipelineTaskSPtr task) {
return _task_queue->push_back(task);
}
// after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+void _close_task(PipelineTask* task, Status exec_status, PipelineFragmentContext* ctx) {
// Has to attach memory tracker here, because the close task will also release some memory.
// Should count the memory to the query or the query's memory will not decrease when part of
// task finished.
SCOPED_ATTACH_TASK(task->runtime_state());
- // close_a_pipeline may delete fragment context and will core in some defer
- // code, because the defer code will access fragment context it self.
- auto lock_for_context = task->fragment_context()->shared_from_this();
// is_pending_finish does not check status, so has to check status in close API.
// For example, in async writer, the writer may failed during dealing with eos_block
// but it does not return error status. Has to check the error status in close API.
@@ -86,16 +83,16 @@ void _close_task(PipelineTask* task, Status exec_status) {
// for pending finish now. So that could call close directly.
Status status = task->close(exec_status);
if (!status.ok()) {
- task->fragment_context()->cancel(status);
+ ctx->cancel(status);
}
task->finalize();
task->set_running(false);
- task->fragment_context()->close_a_pipeline(task->pipeline_id());
+ ctx->close_a_pipeline(task->pipeline_id());
}
void TaskScheduler::_do_work(size_t index) {
while (_markers[index]) {
- auto* task = _task_queue->take(index);
+ auto task = _task_queue->take(index);
if (!task) {
continue;
}
@@ -106,11 +103,15 @@ void TaskScheduler::_do_work(size_t index) {
if (task->is_finalized()) {
continue;
}
+ auto fragment_context = task->fragment_context().lock();
+ if (!fragment_context) {
+ // Fragment already finished
+ continue;
+ }
task->log_detail_if_need();
task->set_running(true);
task->set_task_queue(_task_queue.get());
- auto* fragment_ctx = task->fragment_context();
- bool canceled = fragment_ctx->is_canceled();
+ bool canceled = fragment_context->is_canceled();
// If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
// has to return false. The task is finished and need to close now.
@@ -121,7 +122,8 @@ void TaskScheduler::_do_work(size_t index) {
// If pipeline is canceled, it will report after pipeline closed, and will propagate
// errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
- _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+ _close_task(task.get(), fragment_context->get_query_ctx()->exec_status(),
+ fragment_context.get());
continue;
}
@@ -137,7 +139,7 @@ void TaskScheduler::_do_work(size_t index) {
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
- TUniqueId query_id = task->query_context()->query_id();
+ TUniqueId query_id = fragment_context->get_query_id();
std::string task_name = task->task_name();
std::thread::id tid = std::this_thread::get_id();
@@ -160,14 +162,14 @@ void TaskScheduler::_do_work(size_t index) {
// LOG(WARNING)<< "task:\n"<<task->debug_string();
// exec failed,cancel all fragment instance
- fragment_ctx->cancel(status);
+ fragment_context->cancel(status);
LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {}
reason: {}",
-
print_id(task->query_context()->query_id()),
+
print_id(fragment_context->get_query_ctx()->query_id()),
status.to_string());
- _close_task(task, status);
+ _close_task(task.get(), status, fragment_context.get());
continue;
}
- fragment_ctx->trigger_report_if_necessary();
+ fragment_context->trigger_report_if_necessary();
if (eos) {
// is pending finish will add the task to dependency's blocking queue, and then the task will be
@@ -176,8 +178,8 @@ void TaskScheduler::_do_work(size_t index) {
// Only meet eos, should set task to PENDING_FINISH state
task->set_running(false);
} else {
- Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, exec_status);
+ Status exec_status = fragment_context->get_query_ctx()->exec_status();
+ _close_task(task.get(), exec_status, fragment_context.get());
}
continue;
}
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 9a20807ea26..0307d0603ec 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -55,7 +55,7 @@ public:
~TaskScheduler();
- Status schedule_task(PipelineTask* task);
+ Status schedule_task(PipelineTaskSPtr task);
Status start();