This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 327e31c [Feature] Support setting concurrency for thread pool token
(#6237)
327e31c is described below
commit 327e31c227dbb23362b332835fc7be5d60951a0f
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Jul 21 12:30:43 2021 +0800
[Feature] Support setting concurrency for thread pool token (#6237)
A group of tasks can now be submitted through a thread pool token, with a
limit on the maximum concurrency of that task group.
---
be/src/util/threadpool.cpp | 47 +++++++++++++++++++---
be/src/util/threadpool.h | 22 +++++++++--
be/test/util/threadpool_test.cpp | 85 ++++++++++++++++++++++++----------------
3 files changed, 110 insertions(+), 44 deletions(-)
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index f2a0f18..b06a494 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -80,12 +80,20 @@ Status
ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const {
return Status::OK();
}
-ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode
mode)
+ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode
mode, int max_concurrency)
: _mode(mode),
_pool(pool),
_state(State::IDLE),
_not_running_cond(&pool->_lock),
- _active_threads(0) {}
+ _active_threads(0),
+ _max_concurrency(max_concurrency),
+ _num_submitted_tasks(0),
+ _num_unsubmitted_tasks(0) {
+
+ if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
+ _mode = ThreadPool::ExecutionMode::SERIAL;
+ }
+}
ThreadPoolToken::~ThreadPoolToken() {
shutdown();
@@ -240,6 +248,11 @@ const char* ThreadPoolToken::state_to_string(State s) {
return "<cannot reach here>";
}
+bool ThreadPoolToken::need_dispatch() {
+ return _state == ThreadPoolToken::State::IDLE
+ || (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
_num_submitted_tasks < _max_concurrency);
+}
+
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
_min_threads(builder._min_threads),
@@ -294,6 +307,7 @@ void ThreadPool::shutdown() {
// wanting to access the ThreadPool. The task's destructors may acquire
// locks, etc, so this also prevents lock inversions.
_queue.clear();
+
std::deque<std::deque<Task>> to_release;
for (auto* t : _tokens) {
if (!t->_entries.empty()) {
@@ -336,9 +350,9 @@ void ThreadPool::shutdown() {
}
}
-std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode) {
+std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int
max_concurrency) {
MutexLock unique_lock(&_lock);
- std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode));
+ std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode,
max_concurrency));
InsertOrDie(&_tokens, t.get());
return t;
}
@@ -416,11 +430,22 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r,
ThreadPoolToken* token
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::IDLE || state ==
ThreadPoolToken::State::RUNNING);
token->_entries.emplace_back(std::move(task));
- if (state == ThreadPoolToken::State::IDLE || token->mode() ==
ExecutionMode::CONCURRENT) {
+ // When we need to execute the task in the token, we submit the token
object to the queue.
+ // There are currently two places where tokens will be submitted to the
queue:
+ // 1. When submitting a new task, if the token is still in the IDLE state,
+ // or the concurrency of the token has not reached its upper limit, it
will be added to the queue.
+ // 2. When the dispatch thread finishes executing a task:
+ // 1. If it is a SERIAL token, and there are unsubmitted tasks, submit
them to the queue.
+ // 2. If it is a CONCURRENT token, and there are still unsubmitted
tasks, and the upper limit of concurrency is not reached,
+ // then submit the token to the queue.
+ if (token->need_dispatch()) {
_queue.emplace_back(token);
+ ++token->_num_submitted_tasks;
if (state == ThreadPoolToken::State::IDLE) {
token->transition(ThreadPoolToken::State::RUNNING);
}
+ } else {
+ ++token->_num_unsubmitted_tasks;
}
_total_queued_tasks++;
@@ -563,7 +588,9 @@ void ThreadPool::dispatch_thread() {
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::RUNNING ||
state == ThreadPoolToken::State::QUIESCING);
- if (--token->_active_threads == 0) {
+ --token->_active_threads;
+ --token->_num_submitted_tasks;
+ if (token->_active_threads == 0) {
if (state == ThreadPoolToken::State::QUIESCING) {
DCHECK(token->_entries.empty());
token->transition(ThreadPoolToken::State::QUIESCED);
@@ -571,8 +598,16 @@ void ThreadPool::dispatch_thread() {
token->transition(ThreadPoolToken::State::IDLE);
} else if (token->mode() == ExecutionMode::SERIAL) {
_queue.emplace_back(token);
+ ++token->_num_submitted_tasks;
+ --token->_num_unsubmitted_tasks;
}
+ } else if (token->mode() == ExecutionMode::CONCURRENT &&
token->_num_submitted_tasks < token->_max_concurrency
+ && token->_num_unsubmitted_tasks > 0) {
+ _queue.emplace_back(token);
+ ++token->_num_submitted_tasks;
+ --token->_num_unsubmitted_tasks;
}
+
if (--_active_threads == 0) {
_idle_cond.notify_all();
}
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 51acc9d..c228e15 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -188,9 +188,9 @@ public:
SERIAL,
// Tasks submitted via this token may be executed concurrently.
- CONCURRENT,
+ CONCURRENT
};
- std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode);
+ std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int
max_concurrency = INT_MAX);
// Return the number of threads currently running (or in the process of
starting up)
// for this thread pool.
@@ -362,6 +362,13 @@ public:
// Returns true if all submissions are complete, false otherwise.
bool wait_for(const MonoDelta& delta);
+ bool need_dispatch();
+
+ size_t num_tasks() {
+ MutexLock l(&_pool->_lock);
+ return _entries.size();
+ }
+
private:
// All possible token states. Legal state transitions:
// IDLE -> RUNNING: task is submitted via token
@@ -400,7 +407,7 @@ private:
// Constructs a new token.
//
// The token may not outlive its thread pool ('pool').
- ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode);
+ ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int
max_concurrency = INT_MAX);
// Changes this token's state to 'new_state' taking actions as needed.
void transition(State new_state);
@@ -418,7 +425,7 @@ private:
ThreadPool::ExecutionMode mode() const { return _mode; }
// Token's configured execution mode.
- const ThreadPool::ExecutionMode _mode;
+ ThreadPool::ExecutionMode _mode;
// Pointer to the token's thread pool.
ThreadPool* _pool;
@@ -436,6 +443,13 @@ private:
// Number of worker threads currently executing tasks belonging to this
// token.
int _active_threads;
+ // The max number of tasks that can be run concurrently. This is to limit
+ // the concurrency of a thread pool token, and the default is INT_MAX (no
limit).
+ int _max_concurrency;
+ // Number of tasks that have been submitted to the thread pool's queue.
+ int _num_submitted_tasks;
+ // Number of tasks that have not yet been submitted to the thread pool's queue.
+ int _num_unsubmitted_tasks;
DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
};
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index f9866a7..b8ebebd 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -739,44 +739,61 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
total_num_tokens_submitted.load());
}
-/*
-TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
- const int kNumThreads = 10;
+static void MyFunc(int idx, int n) {
+ std::cout << idx << ", " << std::this_thread::get_id() << " before sleep "
<< n << " seconds" << std::endl;
+ sleep(n);
+ std::cout << idx << ", " << std::this_thread::get_id() << " after sleep "
<< n << " seconds" << std::endl;
+}
- // Test with a pool that allows for kNumThreads concurrent threads.
- ASSERT_OK(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
- .set_max_threads(kNumThreads)).ok());
+TEST_F(ThreadPoolTest, TestNormal) {
+ std::unique_ptr<ThreadPool> thread_pool;
+ ThreadPoolBuilder("my_pool")
+ .set_min_threads(0)
+ .set_max_threads(5)
+ .set_max_queue_size(10)
+ .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+ .build(&thread_pool);
+
+ std::unique_ptr<ThreadPoolToken> token1 =
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 2);
+ for (int i = 0; i < 10; i++) {
+ token1->submit_func(std::bind(&MyFunc, i, 1));
+ }
+ std::cout << "after submit 1" << std::endl;
+ token1->wait();
+ ASSERT_EQ(0, token1->num_tasks());
- // Submit kNumThreads slow tasks and unblock them, in order to produce
- // kNumThreads worker threads.
- CountDownLatch latch(1);
- SCOPED_CLEANUP({
- latch.CountDown();
- });
- for (int i = 0; i < kNumThreads; i++) {
- ASSERT_OK(pool_->submit(SlowTask::new_slow_task(&latch)).ok());
+ std::unique_ptr<ThreadPoolToken> token2 =
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 20);
+ for (int i = 0; i < 10; i++) {
+ token2->submit_func(std::bind(&MyFunc, i, 1));
}
- ASSERT_EQ(kNumThreads, _pool->num_threads());
- latch.count_down();
- pool_->wait();
-
- // The kNumThreads threads are idle and waiting for the idle timeout.
-
- // Submit a slow trickle of lightning fast tasks.
- //
- // If the threads are woken up in FIFO order, this trickle is enough to
- // prevent all of them from idling and the AssertEventually will time out.
- //
- // If LIFO order is used, the same thread will be reused for each task and
- // the other threads will eventually time out.
- AssertEventually([&]() {
- ASSERT_OK(_pool->submit_func([](){}).ok());
- SleepFor(MonoDelta::FromMilliseconds(10));
- ASSERT_EQ(1, _pool->num_threads());
- }, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
- NO_PENDING_FATALS();
+ std::cout << "after submit 2" << std::endl;
+ token2->wait();
+ ASSERT_EQ(0, token2->num_tasks());
+
+ std::unique_ptr<ThreadPoolToken> token3 =
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 1);
+ for (int i = 0; i < 10; i++) {
+ token3->submit_func(std::bind(&MyFunc, i, 1));
+ }
+ std::cout << "after submit 3" << std::endl;
+ token3->wait();
+ ASSERT_EQ(0, token3->num_tasks());
+
+ std::unique_ptr<ThreadPoolToken> token4 =
thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
+ for (int i = 0; i < 10; i++) {
+ token4->submit_func(std::bind(&MyFunc, i, 1));
+ }
+ std::cout << "after submit 4" << std::endl;
+ token4->wait();
+ ASSERT_EQ(0, token4->num_tasks());
+
+ std::unique_ptr<ThreadPoolToken> token5 =
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 20);
+ for (int i = 0; i < 10; i++) {
+ token5->submit_func(std::bind(&MyFunc, i, 1));
+ }
+ std::cout << "after submit 5" << std::endl;
+ token5->shutdown();
+ ASSERT_EQ(0, token5->num_tasks());
}
-*/
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]