This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bb1420b3fe7 [fix](scheduler) Fix coredump due to different queue size (#55736)
bb1420b3fe7 is described below

commit bb1420b3fe7a49dda4e8b69762625c9c370bf15d
Author: Gabriel <[email protected]>
AuthorDate: Mon Sep 8 14:27:36 2025 +0800

    [fix](scheduler) Fix coredump due to different queue size (#55736)
---
 be/src/pipeline/pipeline_task.h    | 18 ++++++++----------
 be/src/pipeline/task_queue.cpp     | 10 +++++-----
 be/src/pipeline/task_scheduler.cpp | 11 +++++------
 be/src/pipeline/task_scheduler.h   |  8 ++++++--
 4 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 91db6789b44..ed27cc753b0 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -66,15 +66,13 @@ public:
 
     std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
 
-    int get_core_id() const { return _core_id; }
-
-    PipelineTask& set_core_id(int id) {
-        if (id != _core_id) {
-            if (_core_id != -1) {
-                COUNTER_UPDATE(_core_change_times, 1);
-            }
-            _core_id = id;
-        }
+    int get_thread_id(int num_threads) const {
+        return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
+    }
+
+    PipelineTask& set_thread_id(int thread_id) {
+        _thread_id = thread_id;
+        COUNTER_UPDATE(_core_change_times, 1);
         return *this;
     }
 
@@ -193,7 +191,7 @@ private:
     PipelinePtr _pipeline;
     bool _opened;
     RuntimeState* _state = nullptr;
-    int _core_id = -1;
+    int _thread_id = -1;
     uint32_t _schedule_time = 0;
     std::unique_ptr<vectorized::Block> _block;
 
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index be32b6800f7..81d09bc2455 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -191,11 +191,11 @@ PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
 }
 
 Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
-    int core_id = task->get_core_id();
-    if (core_id < 0) {
-        core_id = _next_core.fetch_add(1) % _core_size;
+    int thread_id = task->get_thread_id(_core_size);
+    if (thread_id < 0) {
+        thread_id = _next_core.fetch_add(1) % _core_size;
     }
-    return push_back(task, core_id);
+    return push_back(task, thread_id);
 }
 
 Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
@@ -207,7 +207,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
     // if the task not execute but exception early close, core_id == -1
     // should not do update_statistics
-    if (auto core_id = task->get_core_id(); core_id >= 0) {
+    if (auto core_id = task->get_thread_id(_core_size); core_id >= 0) {
         task->inc_runtime_ns(time_spent);
         _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
     }
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 6b0e7ec8b05..794a08155d1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -56,15 +56,14 @@ TaskScheduler::~TaskScheduler() {
 }
 
 Status TaskScheduler::start() {
-    int cores = _task_queue.cores();
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
-                            .set_min_threads(cores)
-                            .set_max_threads(cores)
+                            .set_min_threads(_num_threads)
+                            .set_max_threads(_num_threads)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
-    LOG_INFO("TaskScheduler set cores").tag("size", cores);
-    for (int32_t i = 0; i < cores; ++i) {
+    LOG_INFO("TaskScheduler set cores").tag("size", _num_threads);
+    for (int32_t i = 0; i < _num_threads; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
     }
     return Status::OK();
@@ -117,7 +116,7 @@ void TaskScheduler::_do_work(int index) {
             // Fragment already finished
             continue;
         }
-        task->set_running(true).set_core_id(index);
+        task->set_running(true).set_thread_id(index);
         bool done = false;
         auto status = Status::OK();
         int64_t exec_ns = 0;
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 85e39910500..62b1b644c90 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -61,14 +61,18 @@ private:
     friend class HybridTaskScheduler;
 
     TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
-            : _name(std::move(name)), _task_queue(core_num), _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
-    TaskScheduler() : _task_queue(0) {}
+            : _name(std::move(name)),
+              _task_queue(core_num),
+              _num_threads(core_num),
+              _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
+    TaskScheduler() : _task_queue(0), _num_threads(0) {}
     std::string _name;
     std::unique_ptr<ThreadPool> _fix_thread_pool;
 
     MultiCoreTaskQueue _task_queue;
     bool _need_to_stop = false;
     bool _shutdown = false;
+    const int _num_threads;
     std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(int index);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to