IMPALA-2831: Bound the number of scanner threads per scan node. Our current code base allows a scan node to spin up as many scanner threads as 3x the number of logical CPU cores. However, scanner threads are CPU bound, so there are diminishing returns from starting more scanner threads than there are logical cores. In fact, starting more threads than cores may be detrimental due to context-switching overhead.
This change bounds the number of scanner threads spun up by a scan node to the number of logical CPU cores unless the query option 'num_scanner_threads' is set. The total number of available thread tokens is unchanged. With this change, the peak memory usage of the following query on a single-node Impala cluster running on a machine with 8 logical cores drops from 287MB to 101MB: `select count(*) from tpch100_parquet.lineitem where l_orderkey > 20`. The reduction comes mostly from having fewer outstanding IO buffers. The IO for each scan range is scheduled by the scanner thread that picks it up, and there is at least one IO buffer of 8 to 16MB associated with each scan range. So, the more threads we start up, the more memory is consumed by IO buffers, leading to higher peak memory usage. Change-Id: I191988ad18d6b4caf892fc967258823edcf9681f Reviewed-on: http://gerrit.cloudera.org:8080/4174 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ecd78fb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ecd78fb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ecd78fb6 Branch: refs/heads/master Commit: ecd78fb67d240485c96d60e745c674e0157d9263 Parents: df83090 Author: Michael Ho <[email protected]> Authored: Tue Aug 30 10:36:28 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Wed Aug 31 06:58:44 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-scan-node.cc | 29 +++++++++++++++++------------ be/src/exec/hdfs-scan-node.h | 6 ++++++ 2 files changed, 23 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.cc 
---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 9ed78de..4846004 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -123,7 +123,8 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, all_ranges_started_(false), counters_running_(false), thread_avail_cb_id_(-1), - rm_callback_id_(-1) { + rm_callback_id_(-1), + max_num_scanner_threads_(CpuInfo::num_cores()) { max_materialized_row_batches_ = FLAGS_max_row_batches; if (max_materialized_row_batches_ <= 0) { // TODO: This parameter has an U-shaped effect on performance: increasing the value @@ -259,6 +260,8 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos } // Issue initial ranges for all file types. + RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, + matching_per_type_files[THdfsFileFormat::PARQUET])); RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, matching_per_type_files[THdfsFileFormat::TEXT])); RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, @@ -267,8 +270,6 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos matching_per_type_files[THdfsFileFormat::RC_FILE])); RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, matching_per_type_files[THdfsFileFormat::AVRO])); - RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, - matching_per_type_files[THdfsFileFormat::PARQUET])); // Release the scanner threads ranges_issued_barrier_.Notify(); @@ -730,9 +731,9 @@ Status HdfsScanNode::Open(RuntimeState* state) { // reservation before any ranges are issued. 
runtime_state_->resource_pool()->ReserveOptionalTokens(1); if (runtime_state_->query_options().num_scanner_threads > 0) { - runtime_state_->resource_pool()->set_max_quota( - runtime_state_->query_options().num_scanner_threads); + max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads; } + DCHECK_GT(max_num_scanner_threads_, 0); thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb( bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1)); @@ -897,7 +898,7 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges)); num_unqueued_files_.Add(-num_files_queued); DCHECK_GE(num_unqueued_files_.Load(), 0); - ThreadTokenAvailableCb(runtime_state_->resource_pool()); + if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool()); return Status::OK(); } @@ -983,8 +984,9 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) // 5. Don't start up a ScannerThread if materialized_row_batches_ is full since // we are not scanner bound. // 6. Don't start up a thread if there isn't enough memory left to run it. - // 7. Don't start up if there are no thread tokens. - // 8. Don't start up if we are running too many threads for our vcore allocation + // 7. Don't start up more than maximum number of scanner threads configured. + // 8. Don't start up if there are no thread tokens. + // 9. Don't start up if we are running too many threads for our vcore allocation // (unless the thread is reserved, in which case it has to run). // Case 4. We have not issued the initial ranges so don't start a scanner thread. @@ -1000,7 +1002,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) unique_lock<mutex> lock(lock_); // Cases 1, 2, 3. 
if (done_ || all_ranges_started_ || - active_scanner_thread_counter_.value() >= progress_.remaining()) { + active_scanner_thread_counter_.value() >= progress_.remaining()) { break; } @@ -1011,11 +1013,14 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) break; } - // Case 7. + // Case 7 and 8. bool is_reserved = false; - if (!pool->TryAcquireThreadToken(&is_reserved)) break; + if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ || + !pool->TryAcquireThreadToken(&is_reserved)) { + break; + } - // Case 8. + // Case 9. if (!is_reserved) { if (runtime_state_->query_resource_mgr() != NULL && runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index ae38856..36e95b5 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -509,6 +509,12 @@ class HdfsScanNode : public ScanNode { /// -1 if no callback is registered. int32_t rm_callback_id_; + /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query + /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads + /// are generally cpu bound so there is no benefit in spinning up more threads than + /// the number of cores. + int max_num_scanner_threads_; + /// Tries to spin up as many scanner threads as the quota allows. Called explicitly /// (e.g., when adding new ranges) or when threads are available for this scan node. void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
