This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ba5c6fba985 [scheduler](core) Use signed int as number of cores (#38514) (#38913)
ba5c6fba985 is described below
commit ba5c6fba9856cdd1c3be54ab434e300f2b21d6b1
Author: Gabriel <[email protected]>
AuthorDate: Tue Aug 6 14:44:59 2024 +0800
[scheduler](core) Use signed int as number of cores (#38514) (#38913)
pick #38514
*** is nereids: 0 ***
*** tablet id: 0 ***
*** Aborted at 1722279016 (unix time) try "date -d @1722279016" if you are using GNU date ***
*** Current BE git commitID: e9f12fac47e ***
*** SIGSEGV unknown detail explain (@0x0) received by PID 1116227 (TID 1116498 OR 0x7f009ac00640) from PID 0; stack trace: ***
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/common/signal_handler.h:421
1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
3# 0x00007F01E49B0520 in /lib/x86_64-linux-gnu/libc.so.6
4# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:80
5# doris::pipeline::MultiCoreTaskQueue::take(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:154
6# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:268
7# doris::ThreadPool::dispatch_thread() in /mnt/disk1/STRESS_ENV/be/lib/doris_be
8# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.cpp:499
9# start_thread at ./nptl/pthread_create.c:442
10# 0x00007F01E4A94850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
## Proposed changes
Issue Number: close #xxx
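For context, the fix below replaces `size_t` with a signed `int` for the core count and core ids. A minimal, hypothetical sketch (not Doris code, and not a root-cause analysis of the crash above) of the classic hazard with unsigned core indices that such a change sidesteps: a negative sentinel or an underflowing subtraction stored in a `size_t` wraps to a huge value, and a debug-only bounds check such as `DCHECK` does nothing in a release build.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
    std::vector<int> queues(4);            // one slot per "core"
    const std::size_t core_size = queues.size();

    // A negative sentinel (e.g. "no core assigned") stored in an unsigned
    // type silently wraps to a huge value.
    std::size_t core_id = static_cast<std::size_t>(-1);
    std::cout << core_id << '\n';          // 18446744073709551615 on 64-bit

    // A debug-only assertion is compiled out in release builds, so indexing
    // with the wrapped value would be undefined behavior; only an explicit
    // runtime check like this one stops it.
    if (core_id < core_size) {
        std::cout << queues[core_id] << '\n';
    } else {
        std::cout << "wrapped index rejected\n";
    }

    // With a signed type the sentinel keeps its meaning and can be tested for.
    int signed_id = -1;
    if (signed_id < 0) {
        std::cout << "no core assigned\n";
    }
}
```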
---
be/src/pipeline/task_queue.cpp | 48 ++++++++++++++++++++++++--------------
be/src/pipeline/task_queue.h | 30 ++++++++++++------------
be/src/pipeline/task_scheduler.cpp | 2 +-
3 files changed, 46 insertions(+), 34 deletions(-)
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 617cd7a78d1..293769162f6 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -130,37 +130,46 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
return Status::OK();
}
-int PriorityTaskQueue::task_size() {
- std::unique_lock<std::mutex> lock(_work_size_mutex);
- return _total_task_size;
-}
-
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
-MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
- _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
+MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
+ _prio_task_queue_list =
+ std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
+ for (int i = 0; i < core_size; i++) {
+ (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
+ }
}
void MultiCoreTaskQueue::close() {
+ if (_closed) {
+ return;
+ }
_closed = true;
for (int i = 0; i < _core_size; ++i) {
- _prio_task_queue_list[i].close();
+ (*_prio_task_queue_list)[i]->close();
}
+ std::atomic_store(&_prio_task_queue_list,
+ std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
}
-PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
+ auto prio_task_queue_list =
+ std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
while (!_closed) {
- task = _prio_task_queue_list[core_id].try_take(false);
+ DCHECK(prio_task_queue_list->size() > core_id)
+ << " list size: " << prio_task_queue_list->size() << "
core_id: " << core_id
+ << " _core_size: " << _core_size << " _next_core: " <<
_next_core.load();
+ task = (*prio_task_queue_list)[core_id]->try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
- task = _steal_take(core_id);
+ task = _steal_take(core_id, *prio_task_queue_list);
if (task) {
break;
}
- task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+ task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
@@ -172,16 +181,17 @@ PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
return task;
}
-PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(
+ int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
DCHECK(core_id < _core_size);
- size_t next_id = core_id;
- for (size_t i = 1; i < _core_size; ++i) {
+ int next_id = core_id;
+ for (int i = 1; i < _core_size; ++i) {
++next_id;
if (next_id == _core_size) {
next_id = 0;
}
DCHECK(next_id < _core_size);
- auto task = _prio_task_queue_list[next_id].try_take(true);
+ auto task = prio_task_queue_list[next_id]->try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
@@ -198,10 +208,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
return push_back(task, core_id);
}
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
- return _prio_task_queue_list[core_id].push(task);
+ auto prio_task_queue_list =
+ std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+ return (*prio_task_queue_list)[core_id]->push(task);
}
} // namespace pipeline
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 02994511019..3ac9de46025 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -37,25 +37,25 @@ namespace pipeline {
class TaskQueue {
public:
- TaskQueue(size_t core_size) : _core_size(core_size) {}
+ TaskQueue(int core_size) : _core_size(core_size) {}
virtual ~TaskQueue();
virtual void close() = 0;
// Get the task by core id.
// TODO: To think the logic is useful?
- virtual PipelineTask* take(size_t core_id) = 0;
+ virtual PipelineTask* take(int core_id) = 0;
// push from scheduler
virtual Status push_back(PipelineTask* task) = 0;
// push from worker
- virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
+ virtual Status push_back(PipelineTask* task, int core_id) = 0;
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
int cores() const { return _core_size; }
protected:
- size_t _core_size;
+ int _core_size;
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};
@@ -105,8 +105,6 @@ public:
_sub_queues[level].inc_runtime(runtime);
}
- int task_size();
-
private:
PipelineTask* _try_take_unprotected(bool is_steal);
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
@@ -130,32 +128,34 @@ private:
// Need consider NUMA architecture
class MultiCoreTaskQueue : public TaskQueue {
public:
- explicit MultiCoreTaskQueue(size_t core_size);
+ explicit MultiCoreTaskQueue(int core_size);
~MultiCoreTaskQueue() override;
void close() override;
// Get the task by core id.
- // TODO: To think the logic is useful?
- PipelineTask* take(size_t core_id) override;
+ PipelineTask* take(int core_id) override;
// TODO combine these methods to `push_back(task, core_id = -1)`
Status push_back(PipelineTask* task) override;
- Status push_back(PipelineTask* task, size_t core_id) override;
+ Status push_back(PipelineTask* task, int core_id) override;
void update_statistics(PipelineTask* task, int64_t time_spent) override {
task->inc_runtime_ns(time_spent);
- _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
- time_spent);
+ auto prio_task_queue_list =
+ std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+ (*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
+ time_spent);
}
private:
- PipelineTask* _steal_take(size_t core_id);
+ PipelineTask* _steal_take(
+ int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
- std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
- std::atomic<size_t> _next_core = 0;
+ std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
+ std::atomic<int> _next_core = 0;
std::atomic<bool> _closed;
};
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index f2c86168180..de697469575 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -205,13 +205,13 @@ TaskScheduler::~TaskScheduler() {
Status TaskScheduler::start() {
int cores = _task_queue->cores();
- // Must be mutil number of cpu cores
RETURN_IF_ERROR(ThreadPoolBuilder(_name)
.set_min_threads(cores)
.set_max_threads(cores)
.set_max_queue_size(0)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_fix_thread_pool));
+ LOG_INFO("TaskScheduler set cores").tag("size", cores);
_markers.reserve(cores);
for (size_t i = 0; i < cores; ++i) {
_markers.push_back(std::make_unique<std::atomic<bool>>(true));
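A note on the pattern in task_queue.cpp above: `_prio_task_queue_list` is now a `std::shared_ptr` that workers snapshot with `std::atomic_load_explicit` and that `close()` retires with `std::atomic_store`, so a thread that already holds a snapshot keeps the queues alive while the scheduler shuts down. Below is a minimal, self-contained sketch of that idea; the `Registry` type and its members are illustrative, not Doris code (C++20 also offers `std::atomic<std::shared_ptr<T>>`, but the free functions used here match the patch).

```cpp
#include <iostream>
#include <memory>
#include <vector>

// Writers publish/retire a shared vector with the free atomic shared_ptr
// functions; each reader pins its own snapshot, so the data stays alive
// even if close() runs concurrently with the reader.
struct Registry {
    std::shared_ptr<std::vector<int>> slots =
            std::make_shared<std::vector<int>>(4, 0);

    std::shared_ptr<std::vector<int>> snapshot() const {
        return std::atomic_load_explicit(&slots, std::memory_order_relaxed);
    }

    void close() {
        // Drop the registry's reference; existing snapshots keep the vector alive.
        std::atomic_store(&slots, std::shared_ptr<std::vector<int>>(nullptr));
    }
};

int main() {
    Registry reg;
    auto snap = reg.snapshot();                       // reader pins the current vector
    reg.close();                                      // writer retires it
    std::cout << (snap ? snap->size() : 0) << '\n';   // still safe: prints 4
    std::cout << (reg.snapshot() ? "live" : "retired") << '\n';
}
```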
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]