IMPALA-6920: fix inconsistencies with scanner thread tokens

The first scanner thread to start now takes a "required" token,
which always succeeds. Only additional threads try to get
"optional" tokens, which can fail. Previously threads always
requested optional tokens, which could fail and leave the scan
node without any running threads until its callback is invoked.

This allows us to remove the "reserved optional token" and
set_max_quota() interfaces from ThreadResourceManager. There should
be no behavioural changes in ThreadResourceMgr in cases when those
features are not used.

Also switch Kudu to using the same logic for implementing
NUM_SCANNER_THREADS (it was not switched over to the improved
HDFS scanner logic added in IMPALA-2831).

Do some cleanup in ThreadResourceMgr code while we're here:
* Fix some benign data races in ThreadResourceMgr by switching to
  AtomicInt* classes.
* Remove pointless object caching (TCMalloc will do better).
* Reduce dependencies on the thread-resource-mgr.h header.

Testing:
Ran core tests.

Ran a few queries under TSAN, checked that it didn't report any more
races in this code after fixing those data races.

I couldn't construct a regression test because there are no easily
testable consequences of the change - the main difference is that
some scanner threads start earlier when there is pressure on scanner
thread tokens but that is hard to construct a robust test around.

Change-Id: I16d31d72441aff7293759281d0248e641df43704
Reviewed-on: http://gerrit.cloudera.org:8080/10186
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/789c5aac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/789c5aac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/789c5aac

Branch: refs/heads/master
Commit: 789c5aac23480acc6e18c057b767b65fdd791c97
Parents: d0f838b
Author: Tim Armstrong <[email protected]>
Authored: Tue Apr 24 15:36:41 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Sat Apr 28 04:30:55 2018 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc          |   1 +
 be/src/exec/hdfs-scan-node.cc              |  25 +-
 be/src/exec/hdfs-scan-node.h               |   7 +-
 be/src/exec/kudu-scan-node.cc              |  31 +-
 be/src/exec/kudu-scan-node.h               |  21 +-
 be/src/runtime/fragment-instance-state.cc  |   5 +-
 be/src/runtime/io/disk-io-mgr-internal.h   |   1 -
 be/src/runtime/io/disk-io-mgr.h            |   1 -
 be/src/runtime/runtime-state.cc            |   7 +-
 be/src/runtime/runtime-state.h             |   6 +-
 be/src/runtime/thread-resource-mgr-test.cc |  36 +--
 be/src/runtime/thread-resource-mgr.cc      | 112 +++----
 be/src/runtime/thread-resource-mgr.h       | 374 ++++++++++--------------
 13 files changed, 290 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc 
b/be/src/exec/blocking-join-node.cc
index cfaf91a..57b7723 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -26,6 +26,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
+#include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 7c64338..045ae18 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -30,6 +30,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
@@ -211,9 +212,6 @@ Status HdfsScanNode::Open(RuntimeState* state) {
 
   if (file_descs_.empty() || progress_.done()) return Status::OK();
 
-  // We need at least one scanner thread to make progress. We need to make this
-  // reservation before any ranges are issued.
-  runtime_state_->resource_pool()->ReserveOptionalTokens(1);
   if (runtime_state_->query_options().num_scanner_threads > 0) {
     max_num_scanner_threads_ = 
runtime_state_->query_options().num_scanner_threads;
   }
@@ -295,7 +293,7 @@ bool HdfsScanNode::EnoughMemoryForScannerThread(bool 
new_thread) {
   return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
 }
 
-void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* 
pool) {
+void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   // This is called to start up new scanner threads. It's not a big deal if we
   // spin up more than strictly necessary since they will go through and 
terminate
   // promptly. However, we want to minimize that by checking a conditions.
@@ -328,17 +326,20 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       break;
     }
 
+    bool first_thread = active_scanner_thread_counter_.value() == 0;
     // Cases 5 and 6.
-    if (active_scanner_thread_counter_.value() > 0 &&
+    if (!first_thread &&
         (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
          !EnoughMemoryForScannerThread(true))) {
       break;
     }
 
     // Case 7 and 8.
-    bool is_reserved = false;
-    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
-        !pool->TryAcquireThreadToken(&is_reserved)) {
+    if (first_thread) {
+      // The first thread is required to make progress on the scan.
+      pool->AcquireThreadToken();
+    } else if (active_scanner_thread_counter_.value() >= 
max_num_scanner_threads_
+        || !pool->TryAcquireThreadToken()) {
       break;
     }
 
@@ -347,7 +348,7 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
 
-    auto fn = [this]() { this->ScannerThread(); };
+    auto fn = [this, first_thread]() { this->ScannerThread(first_thread); };
     std::unique_ptr<Thread> t;
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, 
&t, true);
@@ -357,7 +358,7 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       // serves two purposes. First, it prevents a mutual recursion between 
this function
       // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() 
failed and
       // is likely to continue failing for future callbacks.
-      pool->ReleaseThreadToken(false, true);
+      pool->ReleaseThreadToken(first_thread, true);
 
       // Abort the query. This is still holding the lock_, so done_ is known 
to be
       // false and status_ must be ok.
@@ -372,7 +373,7 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   }
 }
 
-void HdfsScanNode::ScannerThread() {
+void HdfsScanNode::ScannerThread(bool first_thread) {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
 
@@ -474,7 +475,7 @@ void HdfsScanNode::ScannerThread() {
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
+  runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
   expr_results_pool.FreeAll();

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index a1c97cf..a9be94e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -39,6 +39,7 @@ class DescriptorTbl;
 class ObjectPool;
 class RuntimeState;
 class RowBatch;
+class ThreadResourcePool;
 class TPlanNode;
 
 /// Legacy ScanNode implementation used in the non-multi-threaded execution 
mode
@@ -155,12 +156,14 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
   /// Tries to spin up as many scanner threads as the quota allows. Called 
explicitly
   /// (e.g., when adding new ranges) or when threads are available for this 
scan node.
-  void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
+  void ThreadTokenAvailableCb(ThreadResourcePool* pool);
 
   /// Main function for scanner thread. This thread pulls the next range to be
   /// processed from the IoMgr and then processes the entire range end to end.
   /// This thread terminates when all scan ranges are complete or an error 
occurred.
-  void ScannerThread();
+  /// 'first_thread' is true if this was the first scanner thread to start and
+  /// it acquired a "required" thread token from ThreadResourceMgr.
+  void ScannerThread(bool first_thread);
 
   /// Process the entire scan range with a new scanner object. Executed in 
scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to 
filter rows

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 6d5e085..16e0633 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
 #include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
@@ -43,6 +44,7 @@ KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& 
tnode,
     : KuduScanNodeBase(pool, tnode, descs),
       num_active_scanners_(0),
       done_(false),
+      max_num_scanner_threads_(CpuInfo::num_cores()),
       thread_avail_cb_id_(-1) {
   DCHECK(KuduIsAvailable());
 
@@ -68,12 +70,10 @@ Status KuduScanNode::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  // Reserve one thread.
-  state->resource_pool()->ReserveOptionalTokens(1);
   if (state->query_options().num_scanner_threads > 0) {
-    state->resource_pool()->set_max_quota(
-        state->query_options().num_scanner_threads);
+    max_num_scanner_threads_ = 
runtime_state_->query_options().num_scanner_threads;
   }
+  DCHECK_GT(max_num_scanner_threads_, 0);
 
   if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
 
@@ -138,14 +138,20 @@ void KuduScanNode::Close(RuntimeState* state) {
   KuduScanNodeBase::Close(state);
 }
 
-void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
+void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
   while (true) {
     unique_lock<mutex> lock(lock_);
     // All done or all tokens are assigned.
     if (done_ || !HasScanToken()) break;
+    bool first_thread = active_scanner_thread_counter_.value() == 0;
 
-    // Check if we can get a token.
-    if (!pool->TryAcquireThreadToken()) break;
+    // Check if we can get a token. We need at least one thread to run.
+    if (first_thread) {
+      pool->AcquireThreadToken();
+    } else if (active_scanner_thread_counter_.value() >= 
max_num_scanner_threads_
+        || !pool->TryAcquireThreadToken()) {
+      break;
+    }
 
     string name = Substitute(
         "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
@@ -154,7 +160,9 @@ void 
KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
 
     // Reserve the first token so no other thread picks it up.
     const string* token = GetNextScanToken();
-    auto fn = [this, token, name]() { this->RunScannerThread(name, token); };
+    auto fn = [this, first_thread, token, name]() {
+      this->RunScannerThread(first_thread, name, token);
+    };
     std::unique_ptr<Thread> t;
     Status status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, 
&t, true);
@@ -163,7 +171,7 @@ void 
KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
       // serves two purposes. First, it prevents a mutual recursion between 
this function
       // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() 
failed and
       // is likely to continue failing for future callbacks.
-      pool->ReleaseThreadToken(false, true);
+      pool->ReleaseThreadToken(first_thread, true);
 
       // Abort the query. This is still holding the lock_, so done_ is known 
to be
       // false and status_ must be ok.
@@ -201,7 +209,8 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, 
const string& scan_t
   return Status::OK();
 }
 
-void KuduScanNode::RunScannerThread(const string& name, const string* 
initial_token) {
+void KuduScanNode::RunScannerThread(
+    bool first_thread, const string& name, const string* initial_token) {
   DCHECK(initial_token != NULL);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -257,7 +266,7 @@ void KuduScanNode::RunScannerThread(const string& name, 
const string* initial_to
   // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() 
which
   // invokes ThreadAvailableCb() which attempts to take the same lock.
   VLOG_RPC << "Thread done: " << name;
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
+  runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
 }
 
 void KuduScanNode::SetDoneInternal() {

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 4759f0a..2f8d808 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -23,13 +23,13 @@
 #include <kudu/client/client.h>
 
 #include "exec/kudu-scan-node-base.h"
-#include "runtime/thread-resource-mgr.h"
 #include "gutil/gscoped_ptr.h"
 #include "util/thread.h"
 
 namespace impala {
 
 class KuduScanner;
+class ThreadResourcePool;
 
 /// A scan node that scans a Kudu table.
 ///
@@ -77,6 +77,12 @@ class KuduScanNode : public KuduScanNodeBase {
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
 
+  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that 
query
+  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner 
threads
+  /// are generally cpu bound so there is no benefit in spinning up more 
threads than
+  /// the number of cores.
+  int max_num_scanner_threads_;
+
   /// The id of the callback added to the thread resource manager when a thread
   /// is available. Used to remove the callback before this scan node is 
destroyed.
   /// -1 if no callback is registered.
@@ -84,13 +90,16 @@ class KuduScanNode : public KuduScanNodeBase {
 
   /// Called when scanner threads are available for this scan node. This will
   /// try to spin up as many scanner threads as the quota allows.
-  void ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool);
+  void ThreadAvailableCb(ThreadResourcePool* pool);
 
   /// Main function for scanner thread which executes a KuduScanner. Begins by 
processing
-  /// 'initial_token', and continues processing scan tokens returned by
-  /// 'GetNextScanToken()' until there are none left, an error occurs, or the 
limit is
-  /// reached.
-  void RunScannerThread(const std::string& name, const std::string* 
initial_token);
+  /// 'initial_token', and continues processing scan tokens returned by 
GetNextScanToken()
+  /// until there are none left, an error occurs, or the limit is reached. The 
caller must
+  /// have acquired a thread token from the ThreadResourceMgr for this thread. 
The token
+  /// is released before this function returns. 'first_thread' is true if this 
was the
+  /// first scanner thread to start and it acquired a "required" thread token.
+  void RunScannerThread(
+      bool first_thread, const std::string& name, const std::string* 
initial_token);
 
   /// Processes a single scan token. Row batches are fetched using 'scanner' 
and enqueued
   /// in 'materialized_row_batches_' until the scanner reports eos, an error 
occurs, or

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index a6ae1ff..a14bf31 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -42,6 +42,7 @@
 #include "runtime/query-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/thread-resource-mgr.h"
 #include "scheduling/query-schedule.h"
 #include "util/debug-util.h"
 #include "util/container-util.h"
@@ -141,7 +142,7 @@ Status FragmentInstanceState::Prepare() {
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();
   avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
-      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
+      bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
   mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
       TUnit::BYTES,
@@ -149,7 +150,7 @@ Status FragmentInstanceState::Prepare() {
           runtime_state_->instance_mem_tracker()));
   thread_usage_sampled_counter_ = 
profile()->AddTimeSeriesCounter("ThreadUsage",
       TUnit::UNIT,
-      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
+      bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
 
   // set up plan

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h 
b/be/src/runtime/io/disk-io-mgr-internal.h
index 3fc3895..e6962ea 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -27,7 +27,6 @@
 #include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/thread-resource-mgr.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index cfac328..52d6993 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -33,7 +33,6 @@
 #include "runtime/io/handle-cache.h"
 #include "runtime/io/local-file-system.h"
 #include "runtime/io/request-ranges.h"
-#include "runtime/thread-resource-mgr.h"
 #include "util/aligned-new.h"
 #include "util/bit-util.h"
 #include "util/condition-variable.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 29ea737..c8776ac 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -40,6 +40,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
+#include "runtime/thread-resource-mgr.h"
 #include "runtime/timestamp-value.h"
 #include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/bitmap.h"
@@ -106,8 +107,8 @@ void RuntimeState::Init() {
   SCOPED_TIMER(profile_->total_time_counter());
 
   // Register with the thread mgr
-  resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
-  DCHECK(resource_pool_ != NULL);
+  resource_pool_ = exec_env_->thread_mgr()->CreatePool();
+  DCHECK(resource_pool_ != nullptr);
 
   total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), 
"TotalThreads");
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), 
"TotalStorageWaitTime");
@@ -229,7 +230,7 @@ void RuntimeState::ReleaseResources() {
   DCHECK(!released_resources_);
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
-    exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
+    exec_env_->thread_mgr()->DestroyPool(move(resource_pool_));
   }
   // Release any memory associated with codegen.
   if (codegen_ != nullptr) codegen_->Close();

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 4b005b2..359afc5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -26,7 +26,6 @@
 // NOTE: try not to add more headers here: runtime-state.h is included in many 
many files.
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
-#include "runtime/thread-resource-mgr.h"
 #include "runtime/dml-exec-state.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -45,6 +44,7 @@ class RuntimeFilterBank;
 class ScalarFnCall;
 class Status;
 class TimestampValue;
+class ThreadResourcePool;
 class TUniqueId;
 class ExecEnv;
 class DataStreamMgrBase;
@@ -116,7 +116,7 @@ class RuntimeState {
   ReservationTracker* instance_buffer_reservation() {
     return instance_buffer_reservation_.get();
   }
-  ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
+  ThreadResourcePool* resource_pool() { return resource_pool_.get(); }
 
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
@@ -322,7 +322,7 @@ class RuntimeState {
 
   /// Thread resource management object for this fragment's execution.  The 
runtime
   /// state is responsible for returning this pool to the thread mgr.
-  ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
+  std::unique_ptr<ThreadResourcePool> resource_pool_;
 
   /// Execution state for DML statements.
   DmlExecState dml_exec_state_;

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/thread-resource-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr-test.cc 
b/be/src/runtime/thread-resource-mgr-test.cc
index f480ecd..66c6d14 100644
--- a/be/src/runtime/thread-resource-mgr-test.cc
+++ b/be/src/runtime/thread-resource-mgr-test.cc
@@ -31,7 +31,7 @@ class NotifiedCounter {
   NotifiedCounter() : counter_(0) {
   }
 
-  void Notify(ThreadResourceMgr::ResourcePool* consumer) {
+  void Notify(ThreadResourcePool* consumer) {
     ASSERT_TRUE(consumer != NULL);
     ASSERT_LT(consumer->num_threads(), consumer->quota());
     ++counter_;
@@ -47,7 +47,7 @@ TEST(ThreadResourceMgr, BasicTest) {
   ThreadResourceMgr mgr(5);
   NotifiedCounter counter1, counter2;
 
-  ThreadResourceMgr::ResourcePool* c1 = mgr.RegisterPool();
+  unique_ptr<ThreadResourcePool> c1 = mgr.CreatePool();
   int callback1 = 
c1->AddThreadAvailableCb(bind<void>(mem_fn(&NotifiedCounter::Notify),
       &counter1, _1));
   c1->AcquireThreadToken();
@@ -62,16 +62,10 @@ TEST(ThreadResourceMgr, BasicTest) {
   EXPECT_EQ(c1->num_required_threads(), 2);
   EXPECT_EQ(c1->num_optional_threads(), 0);
   EXPECT_EQ(counter1.counter(), 1);
-  bool is_reserved = false;
-  c1->ReserveOptionalTokens(1);
-  EXPECT_TRUE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_TRUE(is_reserved);
-  EXPECT_TRUE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_FALSE(is_reserved);
-  EXPECT_TRUE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_FALSE(is_reserved);
-  EXPECT_FALSE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_FALSE(is_reserved);
+  EXPECT_TRUE(c1->TryAcquireThreadToken());
+  EXPECT_TRUE(c1->TryAcquireThreadToken());
+  EXPECT_TRUE(c1->TryAcquireThreadToken());
+  EXPECT_FALSE(c1->TryAcquireThreadToken());
   EXPECT_EQ(c1->num_threads(), 5);
   EXPECT_EQ(c1->num_required_threads(), 2);
   EXPECT_EQ(c1->num_optional_threads(), 3);
@@ -80,7 +74,7 @@ TEST(ThreadResourceMgr, BasicTest) {
   EXPECT_EQ(counter1.counter(), 3);
 
   // Register a new consumer, quota is cut in half
-  ThreadResourceMgr::ResourcePool* c2 = mgr.RegisterPool();
+  unique_ptr<ThreadResourcePool> c2 = mgr.CreatePool();
   int callback2 = 
c2->AddThreadAvailableCb(bind<void>(mem_fn(&NotifiedCounter::Notify),
       &counter2, _1));
   EXPECT_FALSE(c1->TryAcquireThreadToken());
@@ -91,9 +85,9 @@ TEST(ThreadResourceMgr, BasicTest) {
   EXPECT_EQ(c1->num_optional_threads(), 2);
 
   c1->RemoveThreadAvailableCb(callback1);
-  mgr.UnregisterPool(c1);
+  mgr.DestroyPool(move(c1));
   c2->RemoveThreadAvailableCb(callback2);
-  mgr.UnregisterPool(c2);
+  mgr.DestroyPool(move(c2));
   EXPECT_EQ(counter1.counter(), 3);
   EXPECT_EQ(counter2.counter(), 1);
 }
@@ -102,7 +96,7 @@ TEST(ThreadResourceMgr, MultiCallbacks) {
   ThreadResourceMgr mgr(6);
   NotifiedCounter counter1, counter2, counter3;
 
-  ThreadResourceMgr::ResourcePool* c1 = mgr.RegisterPool();
+  unique_ptr<ThreadResourcePool> c1 = mgr.CreatePool();
   int callback1 = c1->AddThreadAvailableCb(
       bind<void>(mem_fn(&NotifiedCounter::Notify), &counter1, _1));
   int callback2 = c1->AddThreadAvailableCb(
@@ -155,13 +149,15 @@ TEST(ThreadResourceMgr, MultiCallbacks) {
   EXPECT_EQ(counter1.counter(), 6);
   EXPECT_EQ(counter2.counter(), 3);
 
-  // Also verify UnregisterPool() will invoke the callback.
-  ThreadResourceMgr::ResourcePool* c2 = mgr.RegisterPool();
-  c2->AddThreadAvailableCb(
+  // Also verify DestroyPool() will invoke the callback.
+  unique_ptr<ThreadResourcePool> c2 = mgr.CreatePool();
+  int callback3 = c2->AddThreadAvailableCb(
       bind<void>(mem_fn(&NotifiedCounter::Notify), &counter3, _1));
   EXPECT_EQ(counter3.counter(), 0);
-  mgr.UnregisterPool(c1);
+  mgr.DestroyPool(move(c1));
   EXPECT_EQ(counter3.counter(), 1);
+  c2->RemoveThreadAvailableCb(callback3);
+  mgr.DestroyPool(move(c2));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/thread-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.cc 
b/be/src/runtime/thread-resource-mgr.cc
index 72fdf42..86ea794 100644
--- a/be/src/runtime/thread-resource-mgr.cc
+++ b/be/src/runtime/thread-resource-mgr.cc
@@ -42,87 +42,65 @@ ThreadResourceMgr::ThreadResourceMgr(int threads_quota) {
   } else {
     system_threads_quota_ = threads_quota;
   }
-  per_pool_quota_ = 0;
 }
 
-ThreadResourceMgr::ResourcePool::ResourcePool(ThreadResourceMgr* parent)
+ThreadResourcePool::ThreadResourcePool(ThreadResourceMgr* parent)
   : parent_(parent) {
 }
 
-void ThreadResourceMgr::ResourcePool::Reset() {
-  num_threads_ = 0;
-  num_reserved_optional_threads_ = 0;
-  thread_callbacks_.clear();
-  num_callbacks_ = 0;
-  next_callback_idx_ = 0;
-  max_quota_ = INT_MAX;
-}
-
-void ThreadResourceMgr::ResourcePool::ReserveOptionalTokens(int num) {
-  DCHECK_GE(num, 0);
-  num_reserved_optional_threads_ = num;
-}
-
-ThreadResourceMgr::ResourcePool* ThreadResourceMgr::RegisterPool() {
+unique_ptr<ThreadResourcePool> ThreadResourceMgr::CreatePool() {
   unique_lock<mutex> l(lock_);
-  ResourcePool* pool = NULL;
-  if (free_pool_objs_.empty()) {
-    pool = new ResourcePool(this);
-  } else {
-    pool = free_pool_objs_.front();
-    free_pool_objs_.pop_front();
-  }
-
-  DCHECK(pool != NULL);
-  DCHECK(pools_.find(pool) == pools_.end());
-  pools_.insert(pool);
-  pool->Reset();
+  unique_ptr<ThreadResourcePool> pool =
+      unique_ptr<ThreadResourcePool>(new ThreadResourcePool(this));
+  pools_.insert(pool.get());
 
   // Added a new pool, update the quotas for each pool.
-  UpdatePoolQuotas(pool);
+  UpdatePoolQuotas(pool.get());
   return pool;
 }
 
-void ThreadResourceMgr::UnregisterPool(ResourcePool* pool) {
-  DCHECK(pool != NULL);
-  DCHECK_EQ(pool->num_callbacks_, 0);
+void ThreadResourceMgr::DestroyPool(unique_ptr<ThreadResourcePool> pool) {
+  DCHECK(pool != nullptr);
+  DCHECK(pool->parent_ != nullptr) << "Already unregistered";
+  DCHECK_EQ(pool->num_callbacks_.Load(), 0);
   unique_lock<mutex> l(lock_);
-  DCHECK(pools_.find(pool) != pools_.end());
-  pools_.erase(pool);
-  free_pool_objs_.push_back(pool);
+  DCHECK(pools_.find(pool.get()) != pools_.end());
+  pools_.erase(pool.get());
+  pool->parent_ = nullptr;
+  pool.reset();
   UpdatePoolQuotas();
 }
 
-int ThreadResourceMgr::ResourcePool::AddThreadAvailableCb(ThreadAvailableCb 
fn) {
+int ThreadResourcePool::AddThreadAvailableCb(ThreadAvailableCb fn) {
   unique_lock<mutex> l(lock_);
   // The id is unique for each callback and is monotonically increasing.
   int id = thread_callbacks_.size();
   thread_callbacks_.push_back(fn);
-  ++num_callbacks_;
+  num_callbacks_.Add(1);
   return id;
 }
 
-void ThreadResourceMgr::ResourcePool::RemoveThreadAvailableCb(int id) {
+void ThreadResourcePool::RemoveThreadAvailableCb(int id) {
   unique_lock<mutex> l(lock_);
-  DCHECK(thread_callbacks_[id] != NULL);
-  DCHECK_GT(num_callbacks_, 0);
-  thread_callbacks_[id] = NULL;
-  --num_callbacks_;
+  DCHECK(!thread_callbacks_[id].empty());
+  DCHECK_GT(num_callbacks_.Load(), 0);
+  thread_callbacks_[id].clear();
+  num_callbacks_.Add(-1);
 }
 
-void ThreadResourceMgr::ResourcePool::InvokeCallbacks() {
+void ThreadResourcePool::InvokeCallbacks() {
   // We need to grab a lock before issuing the callbacks to prevent the
   // them from being removed while it is happening.
   // Note: this is unlikely to be a big deal for performance currently
   // since this is only called with any frequency on (1) the scanner thread
   // completion path and (2) pool unregistration.
-  if (num_available_threads() > 0 && num_callbacks_ > 0) {
+  if (num_available_threads() > 0 && num_callbacks_.Load() > 0) {
     int num_invoked = 0;
     unique_lock<mutex> l(lock_);
-    while (num_available_threads() > 0 && num_invoked < num_callbacks_) {
+    while (num_available_threads() > 0 && num_invoked < num_callbacks_.Load()) 
{
       DCHECK_LT(next_callback_idx_, thread_callbacks_.size());
       ThreadAvailableCb fn = thread_callbacks_[next_callback_idx_];
-      if (LIKELY(fn != NULL)) {
+      if (LIKELY(!fn.empty())) {
         ++num_invoked;
         fn(this);
       }
@@ -132,15 +110,45 @@ void ThreadResourceMgr::ResourcePool::InvokeCallbacks() {
   }
 }
 
-void ThreadResourceMgr::UpdatePoolQuotas(ResourcePool* new_pool) {
+void ThreadResourceMgr::UpdatePoolQuotas(ThreadResourcePool* new_pool) {
   if (pools_.empty()) return;
-  per_pool_quota_ =
-      ceil(static_cast<double>(system_threads_quota_) / pools_.size());
+  per_pool_quota_.Store(
+      ceil(static_cast<double>(system_threads_quota_) / pools_.size()));
   // Only invoke callbacks on pool unregistration.
   if (new_pool == NULL) {
-    for (Pools::iterator it = pools_.begin(); it != pools_.end(); ++it) {
-      ResourcePool* pool = *it;
+    for (ThreadResourcePool* pool : pools_) {
       pool->InvokeCallbacks();
     }
   }
 }
+
+bool ThreadResourcePool::TryAcquireThreadToken() {
+  while (true) {
+    int64_t previous_num_threads = num_threads_.Load();
+    int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
+    int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
+    if (new_optional_threads + new_required_threads > quota()) return false;
+    int64_t new_value = new_optional_threads << 32 | new_required_threads;
+    // Atomically swap the new value if no one updated num_threads_.  We do not
+    // care about the ABA problem here.
+    if (num_threads_.CompareAndSwap(previous_num_threads, new_value)) return 
true;
+  }
+}
+
+void ThreadResourcePool::ReleaseThreadToken(
+    bool required, bool skip_callbacks) {
+  if (required) {
+    DCHECK_GT(num_required_threads(), 0);
+    num_threads_.Add(-1);
+  } else {
+    DCHECK_GT(num_optional_threads(), 0);
+    while (true) {
+      int64_t previous_num_threads = num_threads_.Load();
+      int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
+      int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
+      int64_t new_value = new_optional_threads << 32 | new_required_threads;
+      if (num_threads_.CompareAndSwap(previous_num_threads, new_value)) break;
+    }
+  }
+  if (!skip_callbacks) InvokeCallbacks();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h 
b/be/src/runtime/thread-resource-mgr.h
index bf601bd..e6c73a6 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -25,269 +25,195 @@
 
 #include <list>
 
+#include "common/atomic.h"
 #include "common/status.h"
 
 namespace impala {
 
 /// Singleton object to manage CPU (aka thread) resources for the process.
-/// Conceptually, there is a fixed pool of threads that are shared between
-/// query fragments.  If there is only one fragment running, it can use the
+/// Implements a soft limit on the total number of threads being used across 
running
+/// fragment instances. If there is only one fragment instance running, it can 
use the
 /// entire pool, spinning up the maximum number of threads to saturate the
-/// hardware.  If there are multiple fragments, the CPU pool must be shared
-/// between them.  Currently, the total system pool is split evenly between
-/// all consumers.  Each consumer gets ceil(total_system_threads / 
num_consumers).
+/// hardware. If there are multiple fragment instances, we try to share evenly
+/// between them. Currently, the total system pool is split evenly between
+/// all consumers. Each consumer gets ceil(total_system_threads / 
num_consumers).
 //
-/// Each fragment must register with the ThreadResourceMgr to request threads
-/// (in the form of tokens).  The fragment has required threads (it can't run
-/// with fewer threads) and optional threads.  If the fragment is running on 
its
-/// own, it will be able to spin up more optional threads.  When the system
-/// is under load, the ThreadResourceMgr will stop giving out tokens for 
optional
-/// threads.
-/// Pools should not use this for threads that are almost always idle (e.g.
+/// Each fragment instance must register with the ThreadResourceMgr to request 
threads
+/// (in the form of tokens). The fragment instance has required threads (it 
can't run
+/// with fewer threads) and optional threads. If the fragment instance is 
running on its
+/// own, it will be able to spin up more optional threads. When the system is 
under load,
+/// the ThreadResourceMgr will stop giving out tokens for optional threads.
+///
+/// ThreadResourcePools should not be used for threads that are almost always 
idle (e.g.
 /// periodic reporting threads).
-/// Pools will temporarily go over the quota regularly and this is very
-/// much by design.  For example, if a pool is running on its own with
-/// 4 required threads and 28 optional and another pool is added to the
-/// system, the first pool's quota is then cut by half (16 total) and will
-/// over time drop the optional threads.
+/// ThreadResourcePools will temporarily go over the quota regularly and this 
is very
+/// much by design. For example, if a fragment instance is running on its own 
with
+/// 4 required threads and 28 optional and another fragment instance starts, 
the first
+/// pool's quota is then cut by half (16 total) and will over time drop the 
optional
+/// threads.
+///
 /// This class is thread safe.
-/// TODO: this is an initial simple version to improve the behavior with
-/// concurrency.  This will need to be expanded post GA.  These include:
-///  - More places where threads are optional (e.g. hash table build side,
-///    data stream threads, etc).
-///  - Admission control
-///  - Integration with other nodes/statestore
-///  - Priorities for different pools
-/// If both the mgr and pool locks need to be taken, the mgr lock must
-/// be taken first.
 ///
-/// TODO: make ResourcePool a stand-alone class
+/// Note: this is a fairly limited way to manage CPU consumption and has 
flaws, including:
+/// * non-deterministic decisions about resource allocation
+/// * lack of integration with admission control
+/// * lack of any non-trivial policies such as hierachical limits or 
priorities.
+
+class ThreadResourcePool;
+
 class ThreadResourceMgr {
  public:
-  class ResourcePool;
-
-  /// This function will be called whenever the pool has more threads it can 
run on.
-  /// This can happen on ReleaseThreadToken or if the quota for this pool 
increases.
-  /// This is a good place, for example, to wake up anything blocked on 
available threads.
-  /// This callback must not block.
-  /// Note that this is not called once for each available thread or even 
guaranteed that
-  /// when it is called, a thread is available (the quota could have changed 
again in
-  /// between).  It is simply that something might have happened (similar to 
condition
-  /// variable semantics).
-  typedef boost::function<void (ResourcePool*)> ThreadAvailableCb;
-
-  /// Pool abstraction for a single resource pool.
-  /// TODO: this is not quite sufficient going forward.  We need a hierarchy 
of pools,
-  /// one for the entire query, and a sub pool for each component that needs 
threads,
-  /// all of which share a quota.  Currently, the way state is tracked here, 
it would
-  /// be impossible to have two components both want optional threads (e.g. 
two things
-  /// that have 1+ thread usage).
-  class ResourcePool {
-   public:
-    /// Acquire a thread for the pool.  This will always succeed; the
-    /// pool will go over the quota.
-    /// Pools should use this API to reserve threads they need in order
-    /// to make progress.
-    void AcquireThreadToken();
-
-    /// Try to acquire a thread for this pool.  If the pool is at
-    /// the quota, this will return false and the pool should not run.
-    /// Pools should use this API for resources they can use but don't
-    /// need (e.g. scanner threads).
-    bool TryAcquireThreadToken(bool* is_reserved = NULL);
-
-    /// Set a reserved optional number of threads for this pool.  This can be
-    /// used to implement that a component needs n+ number of threads.  The
-    /// first 'num' threads are guaranteed to be acquirable (via 
TryAcquireThreadToken)
-    /// but anything beyond can fail.
-    /// This can also be done with:
-    ///  if (pool->num_optional_threads() < num) AcquireThreadToken();
-    ///  else TryAcquireThreadToken();
-    /// and similar tracking on the Release side but this is common enough to
-    /// abstract it away.
-    void ReserveOptionalTokens(int num);
-
-    /// Release a thread for the pool.  This must be called once for
-    /// each call to AcquireThreadToken and each successful call to 
TryAcquireThreadToken
-    /// If the thread token is from AcquireThreadToken, required must be true; 
false
-    /// if from TryAcquireThreadToken.
-    /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run 
callbacks to find
-    /// a replacement for this thread. This is dangerous and can lead to 
underutilization
-    /// of the system.
-    void ReleaseThreadToken(bool required, bool skip_callbacks = false);
-
-    /// Register a callback to be notified when a thread is available.
-    /// Returns a unique id to be used when removing the callback.
-    /// TODO: rethink this.  How we do coordinate when we have multiple places 
in
-    /// the execution that all need threads (e.g. do we use that thread for
-    /// the scanner or for the join).
-    int AddThreadAvailableCb(ThreadAvailableCb fn);
-
-    /// Unregister the callback corresponding to 'id'.
-    void RemoveThreadAvailableCb(int id);
-
-    /// Returns the number of threads that are from AcquireThreadToken.
-    int num_required_threads() const { return num_threads_ & 0xFFFFFFFF; }
-
-    /// Returns the number of thread resources returned by successful calls
-    /// to TryAcquireThreadToken.
-    int num_optional_threads() const { return num_threads_ >> 32; }
-
-    /// Returns the total number of thread resources for this pool
-    /// (i.e. num_optional_threads + num_required_threads).
-    int64_t num_threads() const {
-      return num_required_threads() + num_optional_threads();
-    }
-
-    int num_reserved_optional_threads() { return 
num_reserved_optional_threads_; }
-
-    /// Returns true if the number of optional threads has now exceeded the 
quota.
-    bool optional_exceeded() {
-      // Cache this so optional/required are computed based on the same value.
-      volatile int64_t num_threads = num_threads_;
-      int64_t optional_threads = num_threads >> 32;
-      int64_t required_threads = num_threads & 0xFFFFFFFF;
-      return optional_threads > num_reserved_optional_threads_ &&
-             optional_threads + required_threads > quota();
-    }
-
-    /// Returns the number of optional threads that can still be used.
-    int num_available_threads() const {
-      int value = std::max(quota() - static_cast<int>(num_threads()),
-          num_reserved_optional_threads_ - num_optional_threads());
-      return std::max(0, value);
-    }
-
-    /// Returns the quota for this pool.  Note this changes dynamically
-    /// based on system load.
-    int quota() const { return std::min(max_quota_, parent_->per_pool_quota_); 
}
-
-    /// Sets the max thread quota for this pool.
-    /// The actual quota is the min of this value and the dynamic value.
-    void set_max_quota(int quota) { max_quota_ = quota; }
-
-   private:
-    friend class ThreadResourceMgr;
-
-    ResourcePool(ThreadResourceMgr* parent);
-
-    /// Resets internal state.
-    void Reset();
-
-    /// Invoke registered callbacks in round-robin manner until the quota is 
exhausted.
-    void InvokeCallbacks();
-
-    ThreadResourceMgr* parent_;
-
-    int max_quota_;
-    int num_reserved_optional_threads_;
-
-    /// A single 64 bit value to store both the number of optional and
-    /// required threads.  This is combined to allow using compare and
-    /// swap operations.  The number of required threads is the lower
-    /// 32 bits and the number of optional threads is the upper 32 bits.
-    int64_t num_threads_;
-
-    /// Lock for the fields below.  This lock is taken when the callback
-    /// function is called.
-    /// TODO: reconsider this.
-    boost::mutex lock_;
-
-    /// A vector of registered callback functions. Entries will be NULL
-    /// for unregistered functions.
-    std::vector<ThreadAvailableCb> thread_callbacks_;
-
-    /// The number of registered callbacks (i.e. the number of non-NULL 
entries in
-    /// thread_callbacks_).
-    int num_callbacks_;
-
-    /// The index into thread_callbacks_ of the next callback to invoke.
-    int next_callback_idx_;
-  };
-
-  /// Create a thread mgr object.  If threads_quota is non-zero, it will be
+  /// Create a thread mgr object. If threads_quota is non-zero, it will be
   /// the number of threads for the system, otherwise it will be determined
   /// based on the hardware.
   ThreadResourceMgr(int threads_quota = 0);
 
   int system_threads_quota() const { return system_threads_quota_; }
 
-  /// Register a new pool with the thread mgr.  Registering a pool
+  /// Create a new pool and register with the thread mgr. Registering a pool
   /// will update the quotas for all existing pools.
-  ResourcePool* RegisterPool();
+  std::unique_ptr<ThreadResourcePool> CreatePool();
 
-  /// Unregisters the pool.  'pool' is no longer valid after this.
-  /// This updates the quotas for the remaining pools.
-  void UnregisterPool(ResourcePool* pool);
+  /// Destroy the pool and unregister with the thread mgr. This updates the 
quotas for
+  /// the remaining pools.
+  void DestroyPool(std::unique_ptr<ThreadResourcePool> pool);
 
  private:
+  friend class ThreadResourcePool;
+
   /// 'Optimal' number of threads for the entire process.
   int system_threads_quota_;
 
-  /// Lock for the entire object.  Protects all fields below.
+  /// Lock for the entire object. Protects all fields below. Must be acquired 
before
+  /// ThreadResourcePool::lock_ if both are held at the same time.
   boost::mutex lock_;
 
   /// Pools currently being managed
-  typedef std::set<ResourcePool*> Pools;
+  typedef std::set<ThreadResourcePool*> Pools;
   Pools pools_;
 
-  /// Each pool currently gets the same share.  This is the ceil of the
+  /// Each pool currently gets the same share. This is the ceil of the
   /// system quota divided by the number of pools.
-  int per_pool_quota_;
-
-  /// Recycled list of pool objects
-  std::list<ResourcePool*> free_pool_objs_;
+  AtomicInt32 per_pool_quota_{0};
 
   /// Updates the per pool quota and notifies any pools that now have
-  /// more threads they can use.  Must be called with lock_ taken.
+  /// more threads they can use. Must be called with lock_ taken.
   /// If new_pool is non-null, new_pool will *not* be notified.
-  void UpdatePoolQuotas(ResourcePool* new_pool = NULL);
+  void UpdatePoolQuotas(ThreadResourcePool* new_pool = nullptr);
 };
 
-inline void ThreadResourceMgr::ResourcePool::AcquireThreadToken() {
-  __sync_fetch_and_add(&num_threads_, 1);
-}
-
-inline bool ThreadResourceMgr::ResourcePool::TryAcquireThreadToken(bool* 
is_reserved) {
-  while (true) {
-    int64_t previous_num_threads = num_threads_;
-    int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
-    int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
-    if (new_optional_threads > num_reserved_optional_threads_ &&
-        new_optional_threads + new_required_threads > quota()) {
-      return false;
-    }
-    bool thread_is_reserved = new_optional_threads <= 
num_reserved_optional_threads_;
-    int64_t new_value = new_optional_threads << 32 | new_required_threads;
-    // Atomically swap the new value if no one updated num_threads_.  We do not
-    // not care about the ABA problem here.
-    if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, 
new_value)) {
-      if (is_reserved != NULL) *is_reserved = thread_is_reserved;
-      return true;
-    }
+/// Pool abstraction for a single resource pool.
+/// Note; there is no concept of hierarchy - all pools are treated equally 
even if
+/// they belong to the same query..
+class ThreadResourcePool {
+ public:
+  /// This function will be called whenever the pool has more threads it can 
run on.
+  /// This can happen on ReleaseThreadToken or if the quota for this pool 
increases.
+  /// This is a good place, for example, to wake up anything blocked on 
available threads.
+  /// This callback must not block.
+  /// Note that this is not called once for each available thread or even 
guaranteed that
+  /// when it is called, a thread is available (the quota could have changed 
again in
+  /// between). It is simply that something might have happened (similar to 
condition
+  /// variable semantics).
+  typedef boost::function<void (ThreadResourcePool*)> ThreadAvailableCb;
+
+  ~ThreadResourcePool() { DCHECK(parent_ == nullptr) << "Must unregister 
pool"; }
+
+  /// Acquire a thread for the pool. This will always succeed; the pool will 
go over the
+  /// quota if needed. Pools should use this API to reserve threads they need 
in order to
+  /// make progress.
+  void AcquireThreadToken() { num_threads_.Add(1); }
+
+  /// Try to acquire a thread for this pool. If the pool is at the quota, this 
will
+  /// return false and the pool should not run. Pools should use this API for 
resources
+  /// they can use but don't need (e.g. extra scanner threads).
+  bool TryAcquireThreadToken();
+
+  /// Release a thread for the pool. This must be called once for each call to
+  /// AcquireThreadToken() and each successful call to TryAcquireThreadToken()
+  /// If the thread token is from AcquireThreadToken(), required must be true; 
false
+  /// if from TryAcquireThreadToken().
+  /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run callbacks 
to find
+  /// a replacement for this thread. This is dangerous and can lead to 
underutilization
+  /// of the system.
+  void ReleaseThreadToken(bool required, bool skip_callbacks = false);
+
+  /// Register a callback to be notified when a thread is available.
+  /// Returns a unique id to be used when removing the callback.
+  /// Note: this is limited because we can't coordinate between multiple 
places in
+  /// execution that could use extra threads (e.g. do we use that thread for a
+  /// scanner or for a join).
+  int AddThreadAvailableCb(ThreadAvailableCb fn);
+
+  /// Unregister the callback corresponding to 'id'.
+  void RemoveThreadAvailableCb(int id);
+
+  /// Returns the number of threads that are from AcquireThreadToken.
+  int num_required_threads() const { return num_threads_.Load() & 0xFFFFFFFF; }
+
+  /// Returns the number of thread resources returned by successful calls
+  /// to TryAcquireThreadToken.
+  int num_optional_threads() const { return num_threads_.Load() >> 32; }
+
+  /// Returns the total number of thread resources for this pool
+  /// (i.e. num_optional_threads + num_required_threads).
+  int64_t num_threads() const {
+    return num_required_threads() + num_optional_threads();
+  }
+
+  /// Returns true if the number of optional threads has now exceeded the 
quota.
+  bool optional_exceeded() {
+    // Cache this so optional/required are computed based on the same value.
+    int64_t num_threads = num_threads_.Load();
+    int64_t optional_threads = num_threads >> 32;
+    int64_t required_threads = num_threads & 0xFFFFFFFF;
+    return optional_threads + required_threads > quota();
   }
-}
-
-inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(
-    bool required, bool skip_callbacks) {
-  if (required) {
-    DCHECK_GT(num_required_threads(), 0);
-    __sync_fetch_and_add(&num_threads_, -1);
-  } else {
-    DCHECK_GT(num_optional_threads(), 0);
-    while (true) {
-      int64_t previous_num_threads = num_threads_;
-      int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
-      int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
-      int64_t new_value = new_optional_threads << 32 | new_required_threads;
-      if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, 
new_value)) {
-        break;
-      }
-    }
+
+  /// Returns the number of optional threads that can still be used.
+  int num_available_threads() const {
+    return std::max(0, quota() - static_cast<int>(num_threads()));
   }
-  if (!skip_callbacks) InvokeCallbacks();
-}
 
+  /// Returns the quota for this pool. Note this changes dynamically based on 
the global
+  /// number of registered resource pools.
+  int quota() const { return parent_->per_pool_quota_.Load(); }
+
+ private:
+  friend class ThreadResourceMgr;
+
+  ThreadResourcePool(ThreadResourceMgr* parent);
+
+  /// Invoke registered callbacks in round-robin manner until the quota is 
exhausted.
+  void InvokeCallbacks();
+
+  /// The parent resource manager. Set to NULL when unregistered.
+  ThreadResourceMgr* parent_;
+
+  /// A single 64 bit value to store both the number of optional and required 
threads.
+  /// This is combined to allow atomic compare-and-swap of both fields. The 
number of
+  /// required threads is the lower 32 bits and the number of optional threads 
is the
+  /// upper 32 bits.
+  AtomicInt64 num_threads_{0};
+
+  /// Lock for the fields below. This lock is taken when the callback function 
is called.
+  /// Must be acquired after ThreadResourceMgr::lock_ if both are held at the 
same time.
+  boost::mutex lock_;
+
+  /// A vector of registered callback functions. Entries will be set to 
"empty" function
+  /// objects, which can be constructed with the default ThreadAvailableCb() 
constructor,
+  /// when the function is unregistered.
+  std::vector<ThreadAvailableCb> thread_callbacks_;
+
+  /// The number of registered callbacks (i.e. the number of non-NULL entries 
in
+  /// 'thread_callbacks_'). Must hold 'lock_' to write, but can read without 
holding
+  /// 'lock_'.
+  AtomicInt32 num_callbacks_{0};
+
+  /// The index of the next callback to invoke in 'thread_callbacks_'. 
Protected by
+  /// 'lock_'.
+  int next_callback_idx_ = 0;
+};
 } // namespace impala
 
 #endif

Reply via email to