This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bb1420b3fe7 [fix](scheduler) Fix coredump due to different queue size (#55736)
bb1420b3fe7 is described below
commit bb1420b3fe7a49dda4e8b69762625c9c370bf15d
Author: Gabriel <[email protected]>
AuthorDate: Mon Sep 8 14:27:36 2025 +0800
[fix](scheduler) Fix coredump due to different queue size (#55736)
---
be/src/pipeline/pipeline_task.h | 18 ++++++++----------
be/src/pipeline/task_queue.cpp | 10 +++++-----
be/src/pipeline/task_scheduler.cpp | 11 +++++------
be/src/pipeline/task_scheduler.h | 8 ++++++--
4 files changed, 24 insertions(+), 23 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 91db6789b44..ed27cc753b0 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -66,15 +66,13 @@ public:
std::weak_ptr<PipelineFragmentContext>& fragment_context() { return
_fragment_context; }
- int get_core_id() const { return _core_id; }
-
- PipelineTask& set_core_id(int id) {
- if (id != _core_id) {
- if (_core_id != -1) {
- COUNTER_UPDATE(_core_change_times, 1);
- }
- _core_id = id;
- }
+ int get_thread_id(int num_threads) const {
+ return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
+ }
+
+ PipelineTask& set_thread_id(int thread_id) {
+ _thread_id = thread_id;
+ COUNTER_UPDATE(_core_change_times, 1);
return *this;
}
@@ -193,7 +191,7 @@ private:
PipelinePtr _pipeline;
bool _opened;
RuntimeState* _state = nullptr;
- int _core_id = -1;
+ int _thread_id = -1;
uint32_t _schedule_time = 0;
std::unique_ptr<vectorized::Block> _block;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index be32b6800f7..81d09bc2455 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -191,11 +191,11 @@ PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int
core_id) {
}
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
- int core_id = task->get_core_id();
- if (core_id < 0) {
- core_id = _next_core.fetch_add(1) % _core_size;
+ int thread_id = task->get_thread_id(_core_size);
+ if (thread_id < 0) {
+ thread_id = _next_core.fetch_add(1) % _core_size;
}
- return push_back(task, core_id);
+ return push_back(task, thread_id);
}
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
@@ -207,7 +207,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task,
int core_id) {
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t
time_spent) {
// if the task not execute but exception early close, core_id == -1
// should not do update_statistics
- if (auto core_id = task->get_core_id(); core_id >= 0) {
+ if (auto core_id = task->get_thread_id(_core_size); core_id >= 0) {
task->inc_runtime_ns(time_spent);
_prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
}
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 6b0e7ec8b05..794a08155d1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -56,15 +56,14 @@ TaskScheduler::~TaskScheduler() {
}
Status TaskScheduler::start() {
- int cores = _task_queue.cores();
RETURN_IF_ERROR(ThreadPoolBuilder(_name)
- .set_min_threads(cores)
- .set_max_threads(cores)
+ .set_min_threads(_num_threads)
+ .set_max_threads(_num_threads)
.set_max_queue_size(0)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_fix_thread_pool));
- LOG_INFO("TaskScheduler set cores").tag("size", cores);
- for (int32_t i = 0; i < cores; ++i) {
+ LOG_INFO("TaskScheduler set cores").tag("size", _num_threads);
+ for (int32_t i = 0; i < _num_threads; ++i) {
RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i);
}));
}
return Status::OK();
@@ -117,7 +116,7 @@ void TaskScheduler::_do_work(int index) {
// Fragment already finished
continue;
}
- task->set_running(true).set_core_id(index);
+ task->set_running(true).set_thread_id(index);
bool done = false;
auto status = Status::OK();
int64_t exec_ns = 0;
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 85e39910500..62b1b644c90 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -61,14 +61,18 @@ private:
friend class HybridTaskScheduler;
TaskScheduler(int core_num, std::string name,
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
- : _name(std::move(name)), _task_queue(core_num),
_cgroup_cpu_ctl(cgroup_cpu_ctl) {}
- TaskScheduler() : _task_queue(0) {}
+ : _name(std::move(name)),
+ _task_queue(core_num),
+ _num_threads(core_num),
+ _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
+ TaskScheduler() : _task_queue(0), _num_threads(0) {}
std::string _name;
std::unique_ptr<ThreadPool> _fix_thread_pool;
MultiCoreTaskQueue _task_queue;
bool _need_to_stop = false;
bool _shutdown = false;
+ const int _num_threads;
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
void _do_work(int index);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]