This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dbf963c1875 [fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169)
dbf963c1875 is described below
commit dbf963c187564e527f3d53aecd192bfcb1701926
Author: plat1ko <[email protected]>
AuthorDate: Fri Jul 5 22:41:48 2024 +0800
[fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169)
## Proposed changes
In the original implementation of `PriorTaskWorkerPool`, although
multiple threads were launched in both the normal pool and the
high-priority pool, only one thread in each pool actually did any work,
because `normal_loop` and `high_prior_loop` were each submitted to their
pool exactly once. This PR fixes the issue by creating one dedicated
thread per configured worker, each running the corresponding loop.
---
be/src/agent/task_worker_pool.cpp | 38 +++++++++++++++-----------------------
be/src/agent/task_worker_pool.h | 6 +++---
2 files changed, 18 insertions(+), 26 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 9b0a9592950..0e851fba17a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -540,26 +540,20 @@ Status TaskWorkerPool::submit_task(const
TAgentTaskRequest& task) {
}
PriorTaskWorkerPool::PriorTaskWorkerPool(
- std::string_view name, int normal_worker_count, int
high_prior_worker_count,
+ const std::string& name, int normal_worker_count, int
high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
- auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
- .set_min_threads(normal_worker_count)
- .set_max_threads(normal_worker_count)
- .build(&_normal_pool);
- CHECK(st.ok()) << name << ": " << st;
-
- st = _normal_pool->submit_func([this] { normal_loop(); });
- CHECK(st.ok()) << name << ": " << st;
-
- st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
- .set_min_threads(high_prior_worker_count)
- .set_max_threads(high_prior_worker_count)
- .build(&_high_prior_pool);
- CHECK(st.ok()) << name << ": " << st;
+ for (int i = 0; i < normal_worker_count; ++i) {
+ auto st = Thread::create(
+ "Normal", name, [this] { normal_loop(); },
&_workers.emplace_back());
+ CHECK(st.ok()) << name << ": " << st;
+ }
- st = _high_prior_pool->submit_func([this] { high_prior_loop(); });
- CHECK(st.ok()) << name << ": " << st;
+ for (int i = 0; i < high_prior_worker_count; ++i) {
+ auto st = Thread::create(
+ "HighPrior", name, [this] { high_prior_loop(); },
&_workers.emplace_back());
+ CHECK(st.ok()) << name << ": " << st;
+ }
}
PriorTaskWorkerPool::~PriorTaskWorkerPool() {
@@ -578,12 +572,10 @@ void PriorTaskWorkerPool::stop() {
_normal_condv.notify_all();
_high_prior_condv.notify_all();
- if (_normal_pool) {
- _normal_pool->shutdown();
- }
-
- if (_high_prior_pool) {
- _high_prior_pool->shutdown();
+ for (auto&& w : _workers) {
+ if (w) {
+ w->join();
+ }
}
}
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 514692968b4..f51d6c2a4c0 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -79,7 +79,8 @@ private:
class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
- PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int
high_prior_worker_count,
+ PriorTaskWorkerPool(const std::string& name, int normal_worker_count,
+ int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)>
callback);
~PriorTaskWorkerPool() override;
@@ -101,8 +102,7 @@ private:
std::condition_variable _high_prior_condv;
std::deque<std::unique_ptr<TAgentTaskRequest>> _high_prior_queue;
- std::unique_ptr<ThreadPool> _normal_pool;
- std::unique_ptr<ThreadPool> _high_prior_pool;
+ std::vector<scoped_refptr<Thread>> _workers;
std::function<void(const TAgentTaskRequest&)> _callback;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org