This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 327e31c  [Feature] Support setting concurrency for thread pool token 
(#6237)
327e31c is described below

commit 327e31c227dbb23362b332835fc7be5d60951a0f
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Jul 21 12:30:43 2021 +0800

    [Feature] Support setting concurrency for thread pool token (#6237)
    
    Now we can submit a group of tasks using a thread pool token and limit
    the max concurrency of this task group.
---
 be/src/util/threadpool.cpp       | 47 +++++++++++++++++++---
 be/src/util/threadpool.h         | 22 +++++++++--
 be/test/util/threadpool_test.cpp | 85 ++++++++++++++++++++++++----------------
 3 files changed, 110 insertions(+), 44 deletions(-)

diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index f2a0f18..b06a494 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -80,12 +80,20 @@ Status 
ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const {
     return Status::OK();
 }
 
-ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode 
mode)
+ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode 
mode, int max_concurrency) // max_concurrency is the bound need_dispatch() compares _num_submitted_tasks against
         : _mode(mode),
           _pool(pool),
           _state(State::IDLE),
           _not_running_cond(&pool->_lock),
-          _active_threads(0) {}
+          _active_threads(0),
+          _max_concurrency(max_concurrency),
+          _num_submitted_tasks(0),
+          _num_unsubmitted_tasks(0) { // both task counters start at zero for a fresh token
+
+    if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) { // a limit of one task at a time is handled as a SERIAL token
+        _mode = ThreadPool::ExecutionMode::SERIAL; // overrides the mode requested by the caller
+    }
+}
 
 ThreadPoolToken::~ThreadPoolToken() {
     shutdown();
@@ -240,6 +248,11 @@ const char* ThreadPoolToken::state_to_string(State s) {
     return "<cannot reach here>";
 }
 
+bool ThreadPoolToken::need_dispatch() { // Returns true when a newly submitted task should push this token onto the pool queue right away.
+    return _state == ThreadPoolToken::State::IDLE // an IDLE token is always dispatched
+        || (_mode == ThreadPool::ExecutionMode::CONCURRENT && // a CONCURRENT token is dispatched only while below its concurrency limit
_num_submitted_tasks < _max_concurrency);
+}
+
 ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
         : _name(builder._name),
           _min_threads(builder._min_threads),
@@ -294,6 +307,7 @@ void ThreadPool::shutdown() {
     // wanting to access the ThreadPool. The task's destructors may acquire
     // locks, etc, so this also prevents lock inversions.
     _queue.clear();
+
     std::deque<std::deque<Task>> to_release;
     for (auto* t : _tokens) {
         if (!t->_entries.empty()) {
@@ -336,9 +350,9 @@ void ThreadPool::shutdown() {
     }
 }
 
-std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode) {
+std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int 
max_concurrency) { // Creates a token bound to this pool, forwarding mode and max_concurrency to its constructor.
     MutexLock unique_lock(&_lock); // _tokens is modified below, so hold the pool lock
-    std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode));
+    std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, 
max_concurrency));
     InsertOrDie(&_tokens, t.get()); // the pool records a raw pointer; ownership is returned to the caller
     return t;
 }
@@ -416,11 +430,22 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, 
ThreadPoolToken* token
     ThreadPoolToken::State state = token->state();
     DCHECK(state == ThreadPoolToken::State::IDLE || state == 
ThreadPoolToken::State::RUNNING);
     token->_entries.emplace_back(std::move(task));
-    if (state == ThreadPoolToken::State::IDLE || token->mode() == 
ExecutionMode::CONCURRENT) {
+    // When we need to execute the task in the token, we submit the token 
object to the queue.
+    // There are currently two places where tokens will be submitted to the 
queue:
+    // 1. When submitting a new task, if the token is still in the IDLE state, 
+    //    or the concurrency of the token has not reached its upper limit, it 
will be added to the queue.
+    // 2. When the dispatch thread finishes executing a task:
+    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit 
them to the queue.
+    //    2. If it is a CONCURRENT token, and there are still unsubmitted 
tasks, and the upper limit of concurrency is not reached,
+    //       then submit the token to the queue.
+    if (token->need_dispatch()) {
         _queue.emplace_back(token);
+        ++token->_num_submitted_tasks;
         if (state == ThreadPoolToken::State::IDLE) {
             token->transition(ThreadPoolToken::State::RUNNING);
         }
+    } else {
+        ++token->_num_unsubmitted_tasks;
     }
     _total_queued_tasks++;
 
@@ -563,7 +588,9 @@ void ThreadPool::dispatch_thread() {
         ThreadPoolToken::State state = token->state();
         DCHECK(state == ThreadPoolToken::State::RUNNING ||
                state == ThreadPoolToken::State::QUIESCING);
-        if (--token->_active_threads == 0) {
+        --token->_active_threads;
+        --token->_num_submitted_tasks;
+        if (token->_active_threads == 0) {
             if (state == ThreadPoolToken::State::QUIESCING) {
                 DCHECK(token->_entries.empty());
                 token->transition(ThreadPoolToken::State::QUIESCED);
@@ -571,8 +598,16 @@ void ThreadPool::dispatch_thread() {
                 token->transition(ThreadPoolToken::State::IDLE);
             } else if (token->mode() == ExecutionMode::SERIAL) {
                 _queue.emplace_back(token);
+                ++token->_num_submitted_tasks;
+                --token->_num_unsubmitted_tasks;
             }
+        } else if (token->mode() == ExecutionMode::CONCURRENT && 
token->_num_submitted_tasks < token->_max_concurrency
+                && token->_num_unsubmitted_tasks > 0) {
+            _queue.emplace_back(token);
+            ++token->_num_submitted_tasks;
+            --token->_num_unsubmitted_tasks;
         }
+
         if (--_active_threads == 0) {
             _idle_cond.notify_all();
         }
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 51acc9d..c228e15 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -188,9 +188,9 @@ public:
         SERIAL,
 
         // Tasks submitted via this token may be executed concurrently.
-        CONCURRENT,
+        CONCURRENT
     };
-    std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode);
+    std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int 
max_concurrency = INT_MAX);
 
     // Return the number of threads currently running (or in the process of 
starting up)
     // for this thread pool.
@@ -362,6 +362,13 @@ public:
     // Returns true if all submissions are complete, false otherwise.
     bool wait_for(const MonoDelta& delta);
 
+    bool need_dispatch();
+
+    size_t num_tasks() { // Returns the number of task entries currently stored in this token's _entries.
+        MutexLock l(&_pool->_lock); // the size is read while holding the owning pool's lock
+        return _entries.size();
+    }
+
 private:
     // All possible token states. Legal state transitions:
     //   IDLE      -> RUNNING: task is submitted via token
@@ -400,7 +407,7 @@ private:
     // Constructs a new token.
     //
     // The token may not outlive its thread pool ('pool').
-    ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode);
+    ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int 
max_concurrency = INT_MAX);
 
     // Changes this token's state to 'new_state' taking actions as needed.
     void transition(State new_state);
@@ -418,7 +425,7 @@ private:
     ThreadPool::ExecutionMode mode() const { return _mode; }
 
     // Token's configured execution mode.
-    const ThreadPool::ExecutionMode _mode;
+    ThreadPool::ExecutionMode _mode;
 
     // Pointer to the token's thread pool.
     ThreadPool* _pool;
@@ -436,6 +443,13 @@ private:
     // Number of worker threads currently executing tasks belonging to this
     // token.
     int _active_threads;
+    // The max number of tasks that can be run concurrently. This is to limit
+    // the concurrency of a thread pool token; the default is INT_MAX (no 
limit).
+    int _max_concurrency;
+    // Number of tasks which have been submitted to the thread pool's queue and have not yet finished.
+    int _num_submitted_tasks;
+    // Number of tasks which have not yet been submitted to the thread pool's queue.
+    int _num_unsubmitted_tasks;
 
     DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
 };
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index f9866a7..b8ebebd 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -739,44 +739,61 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
                                      total_num_tokens_submitted.load());
 }
 
-/*
-TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
-    const int kNumThreads = 10;
+static void MyFunc(int idx, int n) { // Test task: prints a line, sleeps n seconds, prints again; idx labels the task in the output.
+    std::cout << idx << ", " << std::this_thread::get_id() << " before sleep " 
<< n << " seconds" << std::endl;
+    sleep(n); // blocks the worker thread for n seconds
+    std::cout << idx << ", " << std::this_thread::get_id() << " after sleep " 
<< n << " seconds" << std::endl;
+}
 
-    // Test with a pool that allows for kNumThreads concurrent threads.
-    ASSERT_OK(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
-                .set_max_threads(kNumThreads)).ok());
+TEST_F(ThreadPoolTest, TestNormal) {
+    std::unique_ptr<ThreadPool> thread_pool;
+    ThreadPoolBuilder("my_pool")
+        .set_min_threads(0)
+        .set_max_threads(5)
+        .set_max_queue_size(10)
+        .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+        .build(&thread_pool);
+
+    std::unique_ptr<ThreadPoolToken> token1 = 
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 2);
+    for (int i = 0; i < 10; i++) {
+        token1->submit_func(std::bind(&MyFunc, i, 1));
+    }
+    std::cout << "after submit 1" << std::endl;
+    token1->wait();
+    ASSERT_EQ(0, token1->num_tasks());
 
-    // Submit kNumThreads slow tasks and unblock them, in order to produce
-    // kNumThreads worker threads.
-    CountDownLatch latch(1);
-    SCOPED_CLEANUP({
-            latch.CountDown();
-    });
-    for (int i = 0; i < kNumThreads; i++) {
-        ASSERT_OK(pool_->submit(SlowTask::new_slow_task(&latch)).ok());
+    std::unique_ptr<ThreadPoolToken> token2 = 
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 20);
+    for (int i = 0; i < 10; i++) {
+        token2->submit_func(std::bind(&MyFunc, i, 1));
     }
-    ASSERT_EQ(kNumThreads, _pool->num_threads());
-    latch.count_down();
-    pool_->wait();
-
-    // The kNumThreads threads are idle and waiting for the idle timeout.
-
-    // Submit a slow trickle of lightning fast tasks.
-    //
-    // If the threads are woken up in FIFO order, this trickle is enough to
-    // prevent all of them from idling and the AssertEventually will time out.
-    //
-    // If LIFO order is used, the same thread will be reused for each task and
-    // the other threads will eventually time out.
-    AssertEventually([&]() {
-            ASSERT_OK(_pool->submit_func([](){}).ok());
-            SleepFor(MonoDelta::FromMilliseconds(10));
-            ASSERT_EQ(1, _pool->num_threads());
-            }, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
-    NO_PENDING_FATALS();
+    std::cout << "after submit 2" << std::endl;
+    token2->wait();
+    ASSERT_EQ(0, token2->num_tasks());
+
+    std::unique_ptr<ThreadPoolToken> token3 = 
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 1);
+    for (int i = 0; i < 10; i++) {
+        token3->submit_func(std::bind(&MyFunc, i, 1));
+    }
+    std::cout << "after submit 3" << std::endl;
+    token3->wait();
+    ASSERT_EQ(0, token3->num_tasks());
+
+    std::unique_ptr<ThreadPoolToken> token4 = 
thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
+    for (int i = 0; i < 10; i++) {
+        token4->submit_func(std::bind(&MyFunc, i, 1));
+    }
+    std::cout << "after submit 4" << std::endl;
+    token4->wait();
+    ASSERT_EQ(0, token4->num_tasks());
+
+    std::unique_ptr<ThreadPoolToken> token5 = 
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 20);
+    for (int i = 0; i < 10; i++) {
+        token5->submit_func(std::bind(&MyFunc, i, 1));
+    }
+    std::cout << "after submit 5" << std::endl;
+    token5->shutdown();
+    ASSERT_EQ(0, token5->num_tasks());
 }
-*/
 
 } // namespace doris
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to