Repository: incubator-impala
Updated Branches:
  refs/heads/master 858f5c219 -> b9034ea0d
IMPALA-4577: Adjust maximum size of row batch queue with MT_DOP.

When MT_DOP is set, non-Parquet scans are run with the old scan node
with a fixed NUM_SCANNER_THREADS=1. This patch adjusts the maximum
size of the row batch queue based on MT_DOP for each such scan
instance to avoid a significant increase in the memory consumption of
such scans. The max queued batches per scan-node instance is at least
2 to always allow for some parallelism between the producer/consumer.

Decreases the maximum allowed value for MT_DOP from 128 to 64.

Change-Id: Ic2aa260f9265ec21173fb703c41934964ece6485
Reviewed-on: http://gerrit.cloudera.org:8080/5330
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b237d131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b237d131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b237d131

Branch: refs/heads/master
Commit: b237d1316eac6ccde4ce395b4c238997be3e2f8b
Parents: 858f5c2
Author: Alex Behm <[email protected]>
Authored: Fri Dec 2 10:17:22 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Sat Dec 3 06:46:01 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc   | 30 +++++++++++++++++++++---------
 be/src/exec/hdfs-scan-node.h    |  1 +
 be/src/service/query-options.cc |  4 ++--
 3 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b237d131/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index d5e0fc8..305fa28 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -64,15 +64,6 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
     all_ranges_started_(false),
     thread_avail_cb_id_(-1),
     max_num_scanner_threads_(CpuInfo::num_cores()) {
-  max_materialized_row_batches_ = FLAGS_max_row_batches;
-  if (max_materialized_row_batches_ <= 0) {
-    // TODO: This parameter has an U-shaped effect on performance: increasing the value
-    // would first improve performance, but further increasing would degrade performance.
-    // Investigate and tune this.
-    max_materialized_row_batches_ =
-        10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
-  }
-  materialized_row_batches_.reset(new RowBatchQueue(max_materialized_row_batches_));
 }
 
 HdfsScanNode::~HdfsScanNode() {
@@ -150,6 +141,27 @@ Status HdfsScanNode::GetNextInternal(
   return status_;
 }
 
+Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  int default_max_row_batches = FLAGS_max_row_batches;
+  if (default_max_row_batches <= 0) {
+    default_max_row_batches = 10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
+  }
+  if (state->query_options().__isset.mt_dop && state->query_options().mt_dop > 0) {
+    // To avoid a significant memory increase, adjust the number of maximally queued
+    // row batches per scan instance based on MT_DOP. The max materialized row batches
+    // is at least 2 to allow for some parallelism between the producer/consumer.
+    max_materialized_row_batches_ =
+        max(2, default_max_row_batches / state->query_options().mt_dop);
+  } else {
+    max_materialized_row_batches_ = default_max_row_batches;
+  }
+  VLOG_QUERY << "Max row batch queue size for scan node '" << id_
+      << "' in fragment instance '" << state->fragment_instance_id()
+      << "': " << max_materialized_row_batches_;
+  materialized_row_batches_.reset(new RowBatchQueue(max_materialized_row_batches_));
+  return HdfsScanNodeBase::Init(tnode, state);
+}
+
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b237d131/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 0fc2ba7..f8064b1 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -66,6 +66,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~HdfsScanNode();
 
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b237d131/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d6d2cb2..8be6b19 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -400,10 +400,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
       StringParser::ParseResult result;
       const int32_t dop =
           StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-      if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 128) {
+      if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 64) {
         return Status(
             Substitute("$0 is not valid for mt_dop. Valid values are in "
-                "[0, 128].", value));
+                "[0, 64].", value));
       }
       query_options->__set_mt_dop(dop);
       break;
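A minimal standalone sketch of the queue-sizing rule this patch introduces,
for illustration only. The constants standing in for DiskInfo::num_disks(),
DiskIoMgr::REMOTE_NUM_DISKS, and FLAGS_max_row_batches below are assumptions,
not Impala's real runtime values:

#include <algorithm>
#include <cstdio>
#include <initializer_list>

// Hypothetical stand-ins for values Impala derives at runtime.
static const int kNumLocalDisks = 12;      // stands in for DiskInfo::num_disks()
static const int kRemoteNumDisks = 1;      // stands in for DiskIoMgr::REMOTE_NUM_DISKS
static const int kFlagsMaxRowBatches = 0;  // <= 0 selects the derived default

// Mirrors the logic added in HdfsScanNode::Init(): derive a default queue
// size from the disk count, then divide by MT_DOP, clamping at a floor of 2
// so the producer/consumer sides keep some parallelism.
int MaxMaterializedRowBatches(int mt_dop) {
  int default_max = kFlagsMaxRowBatches;
  if (default_max <= 0) default_max = 10 * (kNumLocalDisks + kRemoteNumDisks);
  if (mt_dop > 0) return std::max(2, default_max / mt_dop);
  return default_max;
}

int main() {
  // With 12 local disks + 1 remote "disk", the derived default is 130.
  for (int dop : {0, 1, 4, 64}) {
    printf("mt_dop=%d -> max queued row batches=%d\n", dop,
           MaxMaterializedRowBatches(dop));
  }
  return 0;
}

For example, MT_DOP=4 yields max(2, 130 / 4) = 32 queued batches per scan-node
instance, while MT_DOP=64 hits the floor of 2.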

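Similarly, a self-contained sketch of the tightened MT_DOP validation, using a
plain strtol-based parse in place of Impala's StringParser (an assumption made
here for self-containment):

#include <cstdio>
#include <cstdlib>

// Returns true iff 'value' parses as an integer in the new [0, 64] range.
bool IsValidMtDop(const char* value, int* dop) {
  char* end = nullptr;
  long v = strtol(value, &end, 10);
  if (end == value || *end != '\0') return false;  // not a clean integer
  if (v < 0 || v > 64) return false;               // outside [0, 64]
  *dop = static_cast<int>(v);
  return true;
}

int main() {
  int dop = 0;
  printf("64  -> %s\n", IsValidMtDop("64", &dop) ? "ok" : "rejected");
  printf("128 -> %s\n", IsValidMtDop("128", &dop) ? "ok" : "rejected");  // rejected as of this patch
  return 0;
}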