IMPALA-2831: Bound the number of scanner threads per scan node.

Our current code base allows a scan node to spin up as many scanner
threads as 3x the number of logical cpu cores. However, the scanner
threads are cpu bound, so there are diminishing returns from starting
more scanner threads than the number of logical cores. In fact, it
may be detrimental due to context switching overhead.

This change bounds the number of scanner threads spun up by a scan
node to the number of logical cpu cores unless the query option
'num_scanner_threads' is set. The total number of available thread
tokens is unchanged. With this change, the peak memory usage of the
following query on a single-node Impala cluster running on a machine
with 8 logical cores drops from 287MB to 101MB.

select count(*) from tpch100_parquet.lineitem where l_orderkey > 20

The reduction comes mostly from having fewer outstanding IO buffers.
The IO for scan ranges is scheduled by the scanner threads which
pick them up, and at least one IO buffer of 8 to 16MB is associated
with each scan range. So the more threads we start up, the more
memory is consumed by the IO buffers, leading to higher peak memory
usage.

Change-Id: I191988ad18d6b4caf892fc967258823edcf9681f
Reviewed-on: http://gerrit.cloudera.org:8080/4174
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ecd78fb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ecd78fb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ecd78fb6

Branch: refs/heads/master
Commit: ecd78fb67d240485c96d60e745c674e0157d9263
Parents: df83090
Author: Michael Ho <[email protected]>
Authored: Tue Aug 30 10:36:28 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Wed Aug 31 06:58:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc | 29 +++++++++++++++++------------
 be/src/exec/hdfs-scan-node.h  |  6 ++++++
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 9ed78de..4846004 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -123,7 +123,8 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
       all_ranges_started_(false),
       counters_running_(false),
       thread_avail_cb_id_(-1),
-      rm_callback_id_(-1) {
+      rm_callback_id_(-1),
+      max_num_scanner_threads_(CpuInfo::num_cores()) {
   max_materialized_row_batches_ = FLAGS_max_row_batches;
   if (max_materialized_row_batches_ <= 0) {
     // TODO: This parameter has an U-shaped effect on performance: increasing 
the value
@@ -259,6 +260,8 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos
     }
 
     // Issue initial ranges for all file types.
+    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
+        matching_per_type_files[THdfsFileFormat::PARQUET]));
     RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
         matching_per_type_files[THdfsFileFormat::TEXT]));
     RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
@@ -267,8 +270,6 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos
         matching_per_type_files[THdfsFileFormat::RC_FILE]));
     RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
         matching_per_type_files[THdfsFileFormat::AVRO]));
-    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::PARQUET]));
 
     // Release the scanner threads
     ranges_issued_barrier_.Notify();
@@ -730,9 +731,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
   // reservation before any ranges are issued.
   runtime_state_->resource_pool()->ReserveOptionalTokens(1);
   if (runtime_state_->query_options().num_scanner_threads > 0) {
-    runtime_state_->resource_pool()->set_max_quota(
-        runtime_state_->query_options().num_scanner_threads);
+    max_num_scanner_threads_ = 
runtime_state_->query_options().num_scanner_threads;
   }
+  DCHECK_GT(max_num_scanner_threads_, 0);
 
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
@@ -897,7 +898,7 @@ Status HdfsScanNode::AddDiskIoRanges(const 
vector<DiskIoMgr::ScanRange*>& ranges
       runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
-  ThreadTokenAvailableCb(runtime_state_->resource_pool());
+  if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
   return Status::OK();
 }
 
@@ -983,8 +984,9 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   //  5. Don't start up a ScannerThread if materialized_row_batches_ is full 
since
   //     we are not scanner bound.
   //  6. Don't start up a thread if there isn't enough memory left to run it.
-  //  7. Don't start up if there are no thread tokens.
-  //  8. Don't start up if we are running too many threads for our vcore 
allocation
+  //  7. Don't start up more than maximum number of scanner threads configured.
+  //  8. Don't start up if there are no thread tokens.
+  //  9. Don't start up if we are running too many threads for our vcore 
allocation
   //  (unless the thread is reserved, in which case it has to run).
 
   // Case 4. We have not issued the initial ranges so don't start a scanner 
thread.
@@ -1000,7 +1002,7 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     unique_lock<mutex> lock(lock_);
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
-      active_scanner_thread_counter_.value() >= progress_.remaining()) {
+        active_scanner_thread_counter_.value() >= progress_.remaining()) {
       break;
     }
 
@@ -1011,11 +1013,14 @@ void 
HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       break;
     }
 
-    // Case 7.
+    // Case 7 and 8.
     bool is_reserved = false;
-    if (!pool->TryAcquireThreadToken(&is_reserved)) break;
+    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
+        !pool->TryAcquireThreadToken(&is_reserved)) {
+      break;
+    }
 
-    // Case 8.
+    // Case 9.
     if (!is_reserved) {
       if (runtime_state_->query_resource_mgr() != NULL &&
           runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index ae38856..36e95b5 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -509,6 +509,12 @@ class HdfsScanNode : public ScanNode {
   /// -1 if no callback is registered.
   int32_t rm_callback_id_;
 
+  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that 
query
+  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner 
threads
+  /// are generally cpu bound so there is no benefit in spinning up more 
threads than
+  /// the number of cores.
+  int max_num_scanner_threads_;
+
   /// Tries to spin up as many scanner threads as the quota allows. Called 
explicitly
   /// (e.g., when adding new ranges) or when threads are available for this 
scan node.
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);

Reply via email to