This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e5cd00b6 [util] introduce ThreadPoolToken::Close()
4e5cd00b6 is described below

commit 4e5cd00b62e6be94e410abf98dda6fb066c6e1bb
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Mar 11 23:27:19 2025 -0700

    [util] introduce ThreadPoolToken::Close()
    
    This patch introduces a means to protect a thread pool token from
    submission of new tasks, while allowing all of its queued tasks to
    complete before the token reaches terminal QUIESCED state.  That's the
    main difference from ThreadPoolToken::Shutdown(), where all the token's
    queued but not-yet-in-flight tasks are thrown out of the thread pool's
    queue.  From this perspective, the newly introduced Close() method might
    be considered as a graceful and asynchronous version of its Shutdown()
    counterpart.
    
    This patch also adds a few test scenarios to cover the crucial parts
    of the newly introduced functionality.
    
    Change-Id: Ic226319c191af666233ec735c1787389d051b039
    Reviewed-on: http://gerrit.cloudera.org:8080/22614
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/util/threadpool-test.cc | 233 ++++++++++++++++++++++++++++++++++++++-
 src/kudu/util/threadpool.cc      |  78 ++++++++++---
 src/kudu/util/threadpool.h       |  68 +++++++++---
 3 files changed, 351 insertions(+), 28 deletions(-)

diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 35708ee0c..449cc672c 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -21,7 +21,9 @@
 
 #include <algorithm>
 #include <atomic>
+#include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <functional>
 #include <iterator>
 #include <limits>
@@ -1009,7 +1011,25 @@ TEST_F(ThreadPoolTest, TestSlowDestructor) {
 
 // For test cases that should run with both kinds of tokens.
 class ThreadPoolTestTokenTypes : public ThreadPoolTest,
-                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {};
+                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {
+ protected:
+  static bool IsTokenActive(const ThreadPoolToken& t) {
+    std::lock_guard unique_lock(t.pool_->lock_);
+    return t.IsActive();
+  }
+  static bool IsTokenQueueEmpty(const ThreadPoolToken& t) {
+    std::lock_guard unique_lock(t.pool_->lock_);
+    return t.entries_.empty();
+  }
+  static bool IsTokenClosed(const ThreadPoolToken& t) {
+    std::lock_guard unique_lock(t.pool_->lock_);
+    return t.state() == ThreadPoolToken::State::GRACEFUL_QUIESCING;
+  }
+  static bool IsTokenShutDown(const ThreadPoolToken& t) {
+    std::lock_guard unique_lock(t.pool_->lock_);
+    return t.state() == ThreadPoolToken::State::QUIESCED;
+  }
+};
 
 INSTANTIATE_TEST_SUITE_P(Tokens, ThreadPoolTestTokenTypes,
                          ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
@@ -1133,6 +1153,217 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
   t2->Shutdown();
 }
 
+TEST_P(ThreadPoolTestTokenTypes, QueueIsEmptyAfterWaitingOnClosedToken) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  ASSERT_OK(RebuildPoolWithMinMax(0, 2));
+  for (auto iteration = 0; iteration < 10; ++iteration) {
+    ThreadSafeRandom tsr(SeedRandom());
+
+    unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+    atomic<int32_t> counter{0};
+    constexpr const size_t kTaskNum = 32;
+    for (size_t i = 0; i < kTaskNum; ++i) {
+      ASSERT_OK(t->Submit([&]() {
+        SleepFor(MonoDelta::FromMilliseconds(tsr.Uniform(25)));
+        ++counter;
+      }));
+    }
+
+    // The token should be active now.
+    ASSERT_TRUE(IsTokenActive(*t));
+
+    // Close the token after some random pause. Usually, the Close() request
+    // arrives when the token is RUNNING, but sometimes it may arrive when
+    // the token is IDLE.
+    SleepFor(MonoDelta::FromMilliseconds(tsr.Uniform(100)));
+    t->Close();
+
+    // Wait for the tasks to complete.
+    t->Wait();
+    ASSERT_FALSE(IsTokenActive(*t));
+    ASSERT_TRUE(IsTokenShutDown(*t));
+    ASSERT_TRUE(IsTokenQueueEmpty(*t));
+    ASSERT_EQ(kTaskNum, counter);
+  }
+}
+
+TEST_P(ThreadPoolTestTokenTypes, CloseIdleToken) {
+  unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+  ASSERT_FALSE(IsTokenClosed(*t));
+  t->Close();
+  ASSERT_FALSE(IsTokenActive(*t));
+  ASSERT_TRUE(IsTokenQueueEmpty(*t));
+  ASSERT_TRUE(IsTokenShutDown(*t));
+  // It's not possible to submit new tasks on a closed token.
+  ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+}
+
+TEST_P(ThreadPoolTestTokenTypes, CloseTokenBasic) {
+  constexpr const size_t kTaskNum = 64;
+
+  CountDownLatch l(1);
+  atomic<int32_t> counter{0};
+  unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+  for (size_t i = 0; i < kTaskNum; ++i) {
+    ASSERT_OK(t->Submit([&]() {
+      l.Wait();
+      ++counter;
+    }));
+  }
+
+  // The token should be active now.
+  ASSERT_TRUE(IsTokenActive(*t));
+  // Close the token.
+  t->Close();
+  // The token should be still active after calling Close().
+  ASSERT_TRUE(IsTokenActive(*t));
+  ASSERT_TRUE(IsTokenClosed(*t));
+  // It's not possible to submit new tasks on a token that's been closed.
+  ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+
+  // Unblock all of the in-flight tasks.
+  l.CountDown();
+  // Wait for the tasks to complete.
+  t->Wait();
+  // ThreadPoolToken::Close() drains the queue of the token's scheduled tasks.
+  ASSERT_TRUE(IsTokenQueueEmpty(*t));
+  ASSERT_FALSE(IsTokenActive(*t));
+  ASSERT_TRUE(IsTokenShutDown(*t));
+  ASSERT_EQ(kTaskNum, counter);
+
+  // Try submitting a task once more after all the pending tasks are complete.
+  ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+
+  // Call Close() again: it's a no-op at this point.
+  t->Close();
+  ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+  // There should be no active tasks at this point.
+  ASSERT_FALSE(IsTokenActive(*t));
+}
+
+TEST_P(ThreadPoolTestTokenTypes, ShutdownClosedToken) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+  unique_ptr<ThreadPoolToken> t(pool_->NewToken(GetParam()));
+
+  CountDownLatch l(1);
+  atomic<int32_t> counter{0};
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(t->Submit([&]() {
+      l.Wait();
+      ++counter;
+    }));
+  }
+
+  // The token should be active now.
+  ASSERT_TRUE(IsTokenActive(*t));
+
+  // Close the token.
+  t->Close();
+  ASSERT_TRUE(IsTokenClosed(*t));
+  // It's not possible to submit new tasks on a token that's been closed.
+  ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+  // Worker thread(s) should still be busy with the in-flight task(s), if any.
+  ASSERT_TRUE(IsTokenActive(*t));
+
+  // This thread unblocks the in-flight tasks, so t->Shutdown() below eventually
+  // returns.
+  //
+  // NOTE: a relatively long delay is used to avoid flakiness
+  //       if main test thread is scheduled off CPU for a long time
+  //       before it runs t->Shutdown() below
+  thread unblocker([&]{
+    SleepFor(MonoDelta::FromSeconds(3));
+    // Unblock all of the tasks.
+    l.CountDown();
+  });
+  SCOPED_CLEANUP({
+    unblocker.join();
+  });
+
+  // Shutdown the closed token.
+  t->Shutdown();
+  ASSERT_TRUE(IsTokenShutDown(*t));
+  // The token's queue must be empty after it was shut down.
+  ASSERT_TRUE(IsTokenQueueEmpty(*t));
+  // There should be no active tasks after the token is shut down.
+  ASSERT_FALSE(IsTokenActive(*t));
+  // Shutting down the token after closing it should keep the token unavailable
+  // for the submission of new tasks.
+  ASSERT_TRUE(t->Submit([](){}).IsServiceUnavailable());
+
+  // All but maybe the very first task should have been removed from the queue,
+  // so not more than one task might be completed.
+  ASSERT_LE(counter, 1);
+}
+
+TEST_P(ThreadPoolTestTokenTypes, CloseMultipleIndependentTokens) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(4)));
+
+  unique_ptr<ThreadPoolToken> t1(pool_->NewToken(GetParam()));
+  CountDownLatch l1(1);
+  atomic<int32_t> c1{0};
+  for (int i = 0; i < 8; i++) {
+    ASSERT_OK(t1->Submit([&]() {
+      l1.Wait();
+      ++c1;
+    }));
+  }
+  auto l1_unblock = MakeScopedCleanup([&]() {
+    l1.CountDown();
+  });
+  ASSERT_TRUE(IsTokenActive(*t1));
+
+  unique_ptr<ThreadPoolToken> t2(pool_->NewToken(GetParam()));
+  CountDownLatch l2(1);
+  atomic<int32_t> c2{0};
+  for (int i = 0; i < 8; i++) {
+    ASSERT_OK(t2->Submit([&]() {
+      l2.Wait();
+      ++c2;
+    }));
+  }
+  auto l2_unblock = MakeScopedCleanup([&]() {
+    l2.CountDown();
+  });
+  ASSERT_TRUE(IsTokenActive(*t2));
+
+  // Unblock all of t1's tasks, but not t2's tasks.
+  l1_unblock.run();
+
+  // Close the first token.
+  t1->Close();
+
+  // We can no longer submit to t1 but we can still submit to t2.
+  ASSERT_TRUE(t1->Submit([](){}).IsServiceUnavailable());
+  ASSERT_OK(t2->Submit([](){}));
+
+  t1->Wait();
+  ASSERT_FALSE(IsTokenActive(*t1));
+  ASSERT_EQ(8, c1);
+  t1->Shutdown();
+  ASSERT_TRUE(IsTokenShutDown(*t1));
+  ASSERT_FALSE(IsTokenActive(*t1));
+
+  ASSERT_TRUE(IsTokenActive(*t2));
+  ASSERT_EQ(0, c2);
+  t2->Close();
+  ASSERT_TRUE(IsTokenClosed(*t2));
+  ASSERT_TRUE(t2->Submit([](){}).IsServiceUnavailable());
+  ASSERT_TRUE(IsTokenActive(*t2));
+  ASSERT_EQ(0, c2);
+
+  // Unblock t2's tasks.
+  l2_unblock.run();
+  t2->Wait();
+  ASSERT_FALSE(IsTokenActive(*t2));
+  ASSERT_TRUE(IsTokenShutDown(*t2));
+  ASSERT_EQ(8, c2);
+}
+
 TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
   const int kNumTokens = 3;
   const int kNumSubmissions = 20;
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 09e2c4ad2..b3c57d630 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -197,6 +197,32 @@ Status ThreadPoolToken::Submit(std::function<void()> f) {
   return pool_->DoSubmit(std::move(f), this);
 }
 
+void ThreadPoolToken::Close() {
+  std::unique_lock lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+
+  switch (state_) {
+    case State::IDLE:
+      // If there aren't any outstanding tasks, quiesce the token immediately.
+      // Otherwise, transition from IDLE into GRACEFUL_QUIESCING state.
+      Transition(entries_.empty() ? State::QUIESCED : State::GRACEFUL_QUIESCING);
+      return;
+    case State::RUNNING:
+      // Unconditionally transition the token into GRACEFUL_QUIESCING state.
+      // The state machine of the thread pool takes care of the rest,
+      // i.e. waiting for the in-flight task to complete and then draining
+      // the queue if it's not empty.
+      Transition(State::GRACEFUL_QUIESCING);
+      return;
+    case State::GRACEFUL_QUIESCING:
+    case State::QUIESCING:
+    case State::QUIESCED:
+      // Nothing to do -- the token is already in (graceful) quiescing state or
+      // shut down.
+      return;
+  }
+}
+
 void ThreadPoolToken::Shutdown() {
   std::unique_lock lock(pool_->lock_);
   pool_->CheckNotPoolThreadUnlocked();
@@ -208,12 +234,13 @@ void ThreadPoolToken::Shutdown() {
   std::deque<ThreadPool::Task> to_release = std::move(entries_);
   pool_->total_queued_tasks_ -= to_release.size();
 
-  switch (state()) {
+  switch (state_) {
     case State::IDLE:
       // There were no tasks outstanding; we can quiesce the token immediately.
       Transition(State::QUIESCED);
       break;
     case State::RUNNING:
+    case State::GRACEFUL_QUIESCING:
       // There were outstanding tasks. If any are still running, switch to
       // QUIESCING and wait for them to finish (the worker thread executing
       // the token's last task will switch the token to QUIESCED). Otherwise,
@@ -240,7 +267,7 @@ void ThreadPoolToken::Shutdown() {
     case State::QUIESCING:
       // The token is already quiescing. Just wait for a worker thread to
       // switch it to QUIESCED.
-      while (state() != State::QUIESCED) {
+      while (state_ != State::QUIESCED) {
         not_running_cond_.Wait();
       }
       break;
@@ -295,8 +322,10 @@ void ThreadPoolToken::Transition(State new_state) {
   switch (state_) {
     case State::IDLE:
       CHECK(new_state == State::RUNNING ||
+            new_state == State::GRACEFUL_QUIESCING ||
             new_state == State::QUIESCED);
-      if (new_state == State::RUNNING) {
+      if (new_state == State::RUNNING ||
+          new_state == State::GRACEFUL_QUIESCING) {
         CHECK(!entries_.empty());
       } else {
         CHECK(entries_.empty());
@@ -305,13 +334,20 @@ void ThreadPoolToken::Transition(State new_state) {
       break;
     case State::RUNNING:
       CHECK(new_state == State::IDLE ||
+            new_state == State::GRACEFUL_QUIESCING ||
             new_state == State::QUIESCING ||
             new_state == State::QUIESCED);
-      CHECK(entries_.empty());
-      if (new_state == State::QUIESCING) {
+      if (new_state != State::GRACEFUL_QUIESCING) {
+        CHECK(entries_.empty());
+      }
+      if (new_state == State::GRACEFUL_QUIESCING || new_state == State::QUIESCING) {
         CHECK_GT(active_threads_, 0);
       }
       break;
+    case State::GRACEFUL_QUIESCING:
+      CHECK(new_state == State::QUIESCING ||
+            new_state == State::QUIESCED);
+      break;
     case State::QUIESCING:
       CHECK(new_state == State::QUIESCED);
       CHECK_EQ(active_threads_, 0);
@@ -339,10 +375,16 @@ void ThreadPoolToken::Transition(State new_state) {
 
 const char* ThreadPoolToken::StateToString(State s) {
   switch (s) {
-    case State::IDLE: return "IDLE"; break;
-    case State::RUNNING: return "RUNNING"; break;
-    case State::QUIESCING: return "QUIESCING"; break;
-    case State::QUIESCED: return "QUIESCED"; break;
+    case State::IDLE:
+      return "IDLE";
+    case State::RUNNING:
+      return "RUNNING";
+    case State::GRACEFUL_QUIESCING:
+      return "GRACEFUL_QUIESCING";
+    case State::QUIESCING:
+      return "QUIESCING";
+    case State::QUIESCED:
+      return "QUIESCED";
   }
   return "<cannot reach here>";
 }
@@ -451,6 +493,7 @@ void ThreadPool::Shutdown() {
         t->Transition(ThreadPoolToken::State::QUIESCED);
         break;
       case ThreadPoolToken::State::RUNNING:
+      case ThreadPoolToken::State::GRACEFUL_QUIESCING:
         // The token has tasks associated with it. If they're merely queued
         // (i.e. there are no active threads), the tasks will have been removed
         // above and we can quiesce immediately. Otherwise, we need to wait for
@@ -604,9 +647,10 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
   task.submit_time = submit_time;
 
   // Add the task to the token's queue.
-  ThreadPoolToken::State state = token->state();
+  const ThreadPoolToken::State state = token->state();
   DCHECK(state == ThreadPoolToken::State::IDLE ||
-         state == ThreadPoolToken::State::RUNNING);
+         state == ThreadPoolToken::State::RUNNING ||
+         state == ThreadPoolToken::State::GRACEFUL_QUIESCING);
   token->entries_.emplace_back(std::move(task));
   if (state == ThreadPoolToken::State::IDLE ||
       token->mode() == ExecutionMode::CONCURRENT) {
@@ -744,7 +788,8 @@ void ThreadPool::DispatchThread() {
     // Get the next token and task to execute.
     ThreadPoolToken* token = queue_.front();
     queue_.pop_front();
-    DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
+    DCHECK(token->state() == ThreadPoolToken::State::RUNNING ||
+           token->state() == ThreadPoolToken::State::GRACEFUL_QUIESCING);
     DCHECK(!token->entries_.empty());
     Task task = std::move(token->entries_.front());
     token->entries_.pop_front();
@@ -806,15 +851,20 @@ void ThreadPool::DispatchThread() {
     // 1. The token was shut down while we ran its task. Transition to QUIESCED.
     // 2. The token has no more queued tasks. Transition back to IDLE.
     // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
-    ThreadPoolToken::State state = token->state();
+    const ThreadPoolToken::State state = token->state();
     DCHECK(state == ThreadPoolToken::State::RUNNING ||
+           state == ThreadPoolToken::State::GRACEFUL_QUIESCING ||
            state == ThreadPoolToken::State::QUIESCING);
     if (--token->active_threads_ == 0) {
       if (state == ThreadPoolToken::State::QUIESCING) {
         DCHECK(token->entries_.empty());
         token->Transition(ThreadPoolToken::State::QUIESCED);
       } else if (token->entries_.empty()) {
-        token->Transition(ThreadPoolToken::State::IDLE);
+        if (state == ThreadPoolToken::State::GRACEFUL_QUIESCING) {
+          token->Transition(ThreadPoolToken::State::QUIESCED);
+        } else {
+          token->Transition(ThreadPoolToken::State::IDLE);
+        }
       } else if (token->mode() == ExecutionMode::SERIAL) {
         queue_.emplace_back(token);
       }
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 887594423..b855e0e3b 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -335,6 +335,7 @@ class ThreadPool {
   FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
 
   friend class ThreadPoolBuilder;
+  friend class ThreadPoolTestTokenTypes;
   friend class ThreadPoolToken;
 
   // Client-provided task to be executed by this pool.
@@ -632,7 +633,7 @@ class ThreadPool {
 // Entry point for token-based task submission and blocking for a particular
 // thread pool. Tokens can only be created via ThreadPool::NewToken().
 //
-// All functions are thread-safe. Mutable members are protected via the
+// All public methods are thread-safe. Mutable members are protected via the
 // ThreadPool's lock.
 class ThreadPoolToken {
  public:
@@ -648,6 +649,13 @@ class ThreadPoolToken {
   // Submit a task, execute the task after delay_ms later.
   Status Schedule(std::function<void()> f, int64_t delay_ms) WARN_UNUSED_RESULT;
 
+  // Marks the token as unusable for future submissions, returning immediately.
+  // This lets all the in-flight and scheduled tasks on this token complete,
+  // unless Shutdown() is called in the middle of the process.
+  // If some tasks were in-flight or queued upon calling Close(), use Wait()
+  // to wait until all the tasks submitted via this token have completed.
+  void Close();
+
   // Marks the token as unusable for future submissions. Any queued tasks not
   // yet running are destroyed. If tasks are in flight, Shutdown() will wait
   // on their completion before returning.
@@ -670,16 +678,41 @@ class ThreadPoolToken {
 
  private:
   friend class SchedulerThread;
-  // All possible token states. Legal state transitions:
-  //   IDLE      -> RUNNING: task is submitted via token
-  //   IDLE      -> QUIESCED: token or pool is shut down
-  //   RUNNING   -> IDLE: worker thread finishes executing a task and
-  //                      there are no more tasks queued to the token
-  //   RUNNING   -> QUIESCING: token or pool is shut down while worker thread
-  //                           is executing a task
-  //   RUNNING   -> QUIESCED: token or pool is shut down
-  //   QUIESCING -> QUIESCED:  worker thread finishes executing a task
-  //                           belonging to a shut down token or pool
+  friend class ThreadPoolTestTokenTypes;
+
+  // The 'State' enumerates all possible token states.
+  //
+  // Legal state transitions:
+  //  IDLE               -> RUNNING
+  //    task is submitted via token
+  //
+  //  IDLE               -> QUIESCED
+  //    token or pool is closed or shut down
+  //
+  //  RUNNING            -> IDLE
+  //    worker thread finishes executing a task and there are no more tasks
+  //    queued to the token
+  //
+  //  RUNNING            -> GRACEFUL_QUIESCING
+  //    token is being closed while worker thread is executing a task
+  //
+  //  RUNNING            -> QUIESCING
+  //    token or pool is shut down while worker thread is executing a task
+  //
+  //  RUNNING            -> QUIESCED
+  //    token or pool is shut down
+  //
+  //  GRACEFUL_QUIESCING -> QUIESCING
+  //    token is being shut down while worker thread is running a queued task,
+  //    draining the queue since the token has been closed
+  //
+  //  GRACEFUL_QUIESCING -> QUIESCED
+  //    worker thread finishes executing last queued task belonging
+  //    to a closed token
+  //
+  //  QUIESCING          -> QUIESCED
+  //    worker thread finishes executing a task belonging
+  //    to a shut down token or pool
   enum class State {
     // Token has no queued tasks.
     IDLE,
@@ -688,7 +721,14 @@ class ThreadPoolToken {
     RUNNING,
 
     // No new tasks may be submitted to the token. A worker thread is still
-    // running a previously queued task.
+    // running one of the token's previously queued tasks, and will continue
+    // running all the rest of the scheduled tasks if any are still present
+    // in the queue unless the token is shut down in the process of doing so.
+    GRACEFUL_QUIESCING,
+
+    // No new tasks may be submitted to the token. A worker thread is still
+    // running a previously queued task, but the rest of already queued
+    // but not yet running tasks will be removed from the queue.
     QUIESCING,
 
     // No new tasks may be submitted to the token. There are no active tasks
@@ -718,12 +758,14 @@ class ThreadPoolToken {
   // task belonging to this token is already running.
   bool IsActive() const {
     return state_ == State::RUNNING ||
+           state_ == State::GRACEFUL_QUIESCING ||
            state_ == State::QUIESCING;
   }
 
   // Returns true if new tasks may be submitted to this token.
   bool MaySubmitNewTasks() const {
-    return state_ != State::QUIESCING &&
+    return state_ != State::GRACEFUL_QUIESCING &&
+           state_ != State::QUIESCING &&
            state_ != State::QUIESCED;
   }
 

Reply via email to