Repository: incubator-impala
Updated Branches:
  refs/heads/master 858f5c219 -> b9034ea0d


IMPALA-4577: Adjust maximum size of row batch queue with MT_DOP.

When MT_DOP is set, non-Parquet scans are run with the old
scan node with a fixed NUM_SCANNER_THREADS=1. This patch
adjusts the maximum size of the row batch queue based on MT_DOP
for each such scan instance to avoid a significant increase in
the memory consumption of such scans. The max queued batches
per scan-node instance is at least 2 to always allow for some
parallelism between the producer/consumer.

Decreases the maximum allowed value for MT_DOP from 128 to 64.

Change-Id: Ic2aa260f9265ec21173fb703c41934964ece6485
Reviewed-on: http://gerrit.cloudera.org:8080/5330
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b237d131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b237d131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b237d131

Branch: refs/heads/master
Commit: b237d1316eac6ccde4ce395b4c238997be3e2f8b
Parents: 858f5c2
Author: Alex Behm <[email protected]>
Authored: Fri Dec 2 10:17:22 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Sat Dec 3 06:46:01 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc   | 30 +++++++++++++++++++++---------
 be/src/exec/hdfs-scan-node.h    |  1 +
 be/src/service/query-options.cc |  4 ++--
 3 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b237d131/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index d5e0fc8..305fa28 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -64,15 +64,6 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
       all_ranges_started_(false),
       thread_avail_cb_id_(-1),
       max_num_scanner_threads_(CpuInfo::num_cores()) {
-  max_materialized_row_batches_ = FLAGS_max_row_batches;
-  if (max_materialized_row_batches_ <= 0) {
-    // TODO: This parameter has an U-shaped effect on performance: increasing 
the value
-    // would first improve performance, but further increasing would degrade 
performance.
-    // Investigate and tune this.
-    max_materialized_row_batches_ =
-        10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
-  }
-  materialized_row_batches_.reset(new 
RowBatchQueue(max_materialized_row_batches_));
 }
 
 HdfsScanNode::~HdfsScanNode() {
@@ -150,6 +141,27 @@ Status HdfsScanNode::GetNextInternal(
   return status_;
 }
 
+Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  int default_max_row_batches = FLAGS_max_row_batches;
+  if (default_max_row_batches <= 0) {
+    default_max_row_batches = 10 * (DiskInfo::num_disks() + 
DiskIoMgr::REMOTE_NUM_DISKS);
+  }
+  if (state->query_options().__isset.mt_dop && state->query_options().mt_dop > 
0) {
+    // To avoid a significant memory increase, adjust the number of maximally 
queued
+    // row batches per scan instance based on MT_DOP. The max materialized row 
batches
+    // is at least 2 to allow for some parallelism between the 
producer/consumer.
+    max_materialized_row_batches_ =
+        max(2, default_max_row_batches / state->query_options().mt_dop);
+  } else {
+    max_materialized_row_batches_ = default_max_row_batches;
+  }
+  VLOG_QUERY << "Max row batch queue size for scan node '" << id_
+      << "' in fragment instance '" << state->fragment_instance_id()
+      << "': " << max_materialized_row_batches_;
+  materialized_row_batches_.reset(new 
RowBatchQueue(max_materialized_row_batches_));
+  return HdfsScanNodeBase::Init(tnode, state);
+}
+
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b237d131/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 0fc2ba7..f8064b1 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -66,6 +66,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs);
   ~HdfsScanNode();
 
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b237d131/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d6d2cb2..8be6b19 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -400,10 +400,10 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         StringParser::ParseResult result;
         const int32_t dop =
             StringParser::StringToInt<int32_t>(value.c_str(), value.length(), 
&result);
-        if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 128) {
+        if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 64) {
           return Status(
               Substitute("$0 is not valid for mt_dop. Valid values are in "
-                "[0, 128].", value));
+                "[0, 64].", value));
         }
         query_options->__set_mt_dop(dop);
         break;

Reply via email to