IMPALA-3748: minimum buffer requirements in planner

Compute the minimum buffer requirement for spilling nodes and
per-host estimates for the entire plan tree.

This builds on top of the existing resource estimation code, which
computes the sets of plan nodes that can execute concurrently. This is
cleaned up so that the process of producing resource requirements is
clearer. It also removes the unused VCore estimates.

Fixes various bugs and other issues:
* computeCosts() was not called for unpartitioned fragments, so
  the per-operator memory estimate was not visible.
* Nested loop join was not treated as a blocking join.
* The TODO comment about union was misleading
* Fix the computation for mt_dop > 1 by distinguishing per-instance and
  per-host estimates.
* Always generate an estimate instead of unpredictably returning
  -1/"unavailable" in many circumstances - there was little rhyme or
  reason to when this happened.
* Remove the special "trivial plan" estimates. With the rest of the
  cleanup we generate estimates <= 10MB for those trivial plans through
  the normal code path.

I left one bug (IMPALA-4862) unfixed because it is subtle, will affect
estimates for many plans and will be easier to review once we have the
test infra in place.

Testing:
Added basic planner tests for resource requirements in both the MT and
non-MT cases.

Re-enabled the explain_level tests, which appears to be the only
coverage for many of these estimates. Removed the complex and
brittle test cases and replaced with a couple of much simpler
end-to-end tests.

Change-Id: I1e358182bcf2bc5fe5c73883eb97878735b12d37
Reviewed-on: http://gerrit.cloudera.org:8080/5847
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9a29dfc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9a29dfc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9a29dfc9

Branch: refs/heads/master
Commit: 9a29dfc91b1ff8bbae3c94b53bf2b6ac81a271e0
Parents: c0e3060
Author: Tim Armstrong <[email protected]>
Authored: Wed Jan 25 15:19:35 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Apr 18 20:36:08 2017 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               |    4 +-
 be/src/exec/partitioned-aggregation-node.h      |    1 +
 be/src/exec/partitioned-hash-join-builder.h     |    1 +
 be/src/runtime/sorter.cc                        |    1 +
 be/src/scheduling/query-schedule.cc             |   15 +-
 be/src/service/query-exec-state.cc              |   14 +-
 common/thrift/Frontend.thrift                   |    8 +-
 .../apache/impala/analysis/AggregateInfo.java   |   13 +-
 .../apache/impala/analysis/TupleDescriptor.java |   10 +
 .../org/apache/impala/common/PrintUtils.java    |   12 +-
 .../apache/impala/planner/AggregationNode.java  |   38 +-
 .../apache/impala/planner/AnalyticEvalNode.java |   12 +-
 .../org/apache/impala/planner/DataSink.java     |   44 +-
 .../impala/planner/DataSourceScanNode.java      |    4 +-
 .../apache/impala/planner/DataStreamSink.java   |   14 +-
 .../impala/planner/DistributedPlanner.java      |   10 +-
 .../org/apache/impala/planner/EmptySetNode.java |   10 +-
 .../org/apache/impala/planner/ExchangeNode.java |    7 +
 .../apache/impala/planner/HBaseScanNode.java    |    4 +-
 .../apache/impala/planner/HBaseTableSink.java   |   18 +-
 .../org/apache/impala/planner/HashJoinNode.java |   29 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   59 +-
 .../apache/impala/planner/HdfsTableSink.java    |   67 +-
 .../apache/impala/planner/JoinBuildSink.java    |   19 +-
 .../org/apache/impala/planner/JoinNode.java     |    6 +-
 .../org/apache/impala/planner/KuduScanNode.java |    6 +
 .../apache/impala/planner/KuduTableSink.java    |   19 +-
 .../impala/planner/NestedLoopJoinNode.java      |   15 +-
 .../apache/impala/planner/ParallelPlanner.java  |    8 +-
 .../impala/planner/PipelinedPlanNodeSet.java    |  141 +--
 .../org/apache/impala/planner/PlanFragment.java |   96 +-
 .../org/apache/impala/planner/PlanNode.java     |   60 +-
 .../org/apache/impala/planner/PlanRootSink.java |   13 +-
 .../java/org/apache/impala/planner/Planner.java |  123 +-
 .../apache/impala/planner/ResourceProfile.java  |   83 ++
 .../org/apache/impala/planner/SelectNode.java   |    8 +
 .../impala/planner/SingularRowSrcNode.java      |    8 +
 .../org/apache/impala/planner/SortNode.java     |   16 +-
 .../org/apache/impala/planner/SubplanNode.java  |    8 +
 .../org/apache/impala/planner/UnionNode.java    |    7 +
 .../org/apache/impala/planner/UnnestNode.java   |    7 +
 .../org/apache/impala/service/Frontend.java     |   17 +-
 .../org/apache/impala/planner/PlannerTest.java  |   10 +
 .../apache/impala/planner/PlannerTestBase.java  |   48 +-
 .../queries/PlannerTest/constant-folding.test   |   91 +-
 .../queries/PlannerTest/kudu-selectivity.test   |   70 +-
 .../queries/PlannerTest/mt-dop-validation.test  |  126 +-
 .../queries/PlannerTest/parquet-filtering.test  |   16 +-
 .../PlannerTest/resource-requirements.test      | 1121 ++++++++++++++++++
 .../queries/QueryTest/explain-level0.test       |  136 +--
 .../queries/QueryTest/explain-level1.test       |  224 +---
 .../queries/QueryTest/explain-level2.test       |  388 +-----
 .../queries/QueryTest/explain-level3.test       |  436 +------
 .../custom_cluster/test_admission_controller.py |    6 +-
 tests/metadata/test_explain.py                  |   12 +-
 55 files changed, 2119 insertions(+), 1620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc 
b/be/src/exec/analytic-eval-node.cc
index 16dce50..083ab1a 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -177,9 +177,11 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
     }
   }
 
+  // Must be kept in sync with AnalyticEvalNode.computeResourceProfile() in fe.
+  const int MIN_REQUIRED_BUFFERS = 2;
   RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
       Substitute("AnalyticEvalNode id=$0 ptr=$1", id_, this),
-      2, false, mem_tracker(), state, &client_));
+      MIN_REQUIRED_BUFFERS, false, mem_tracker(), state, &client_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h 
b/be/src/exec/partitioned-aggregation-node.h
index f26a252..2155473 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -679,6 +679,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// as the partitions aggregate stream needs to be serialized and rewritten.
   /// We do not spill streaming preaggregations, so we do not need to reserve 
any buffers.
   int MinRequiredBuffers() const {
+    // Must be kept in sync with AggregationNode.computeResourceProfile() in 
fe.
     if (is_streaming_preagg_) return 0;
     return 2 * PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h
index 48f4c88..b133be6 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -247,6 +247,7 @@ class PhjBuilder : public DataSink {
   /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
   /// 'null_aware_probe_partition_' and 'null_probe_rows_'.
   int MinRequiredBuffers() const {
+    // Must be kept in sync with HashJoinNode.computeResourceProfile() in fe.
     int num_reserved_buffers = PARTITION_FANOUT + 1;
     if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers 
+= 3;
     return num_reserved_buffers;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 01554b8..1acdf15 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1376,6 +1376,7 @@ Status Sorter::Init() {
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
 
+  // Must be kept in sync with SortNode.computeResourceProfile() in fe.
   int min_buffers_required = MIN_BUFFERS_PER_MERGE;
   // Fixed and var-length blocks are separate, so we need MIN_BUFFERS_PER_MERGE
   // blocks for both if there is var-length data.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc 
b/be/src/scheduling/query-schedule.cc
index 668c9c8..bb44145 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -190,13 +190,6 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
     has_query_option = true;
   }
 
-  int64_t estimate_limit = numeric_limits<int64_t>::max();
-  bool has_estimate = false;
-  if (request_.__isset.per_host_mem_req && request_.per_host_mem_req > 0) {
-    estimate_limit = request_.per_host_mem_req;
-    has_estimate = true;
-  }
-
   int64_t per_host_mem = 0L;
   // TODO: Remove rm_initial_mem and associated logic when we're sure that 
clients won't
   // be affected.
@@ -204,13 +197,9 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
     per_host_mem = query_options_.rm_initial_mem;
   } else if (has_query_option) {
     per_host_mem = query_option_memory_limit;
-  } else if (has_estimate) {
-    per_host_mem = estimate_limit;
   } else {
-    // If no estimate or query option, use the server-side limits anyhow.
-    bool ignored;
-    per_host_mem = ParseUtil::ParseMemSpec(FLAGS_rm_default_memory,
-        &ignored, 0);
+    DCHECK(request_.__isset.per_host_mem_estimate);
+    per_host_mem = request_.per_host_mem_estimate;
   }
   // Cap the memory estimate at the amount of physical memory available. The 
user's
   // provided value or the estimate from planning can each be unreasonable.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc 
b/be/src/service/query-exec-state.cc
index f704676..72dba6f 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -58,7 +58,7 @@ namespace impala {
 // Keys into the info string map of the runtime profile referring to specific
 // items used by CM for monitoring purposes.
 static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
-static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores";
+static const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory 
Reservation";
 static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
 static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table 
Stats";
 static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing 
Disk Ids";
@@ -394,16 +394,16 @@ Status 
ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
             << "----------------";
     summary_profile_.AddInfoString("Plan", plan_ss.str());
   }
-  // Add info strings consumed by CM: Estimated mem/vcores and tables missing 
stats.
-  if (query_exec_request.__isset.per_host_mem_req) {
+  // Add info strings consumed by CM: Estimated mem and tables missing stats.
+  if (query_exec_request.__isset.per_host_mem_estimate) {
     stringstream ss;
-    ss << query_exec_request.per_host_mem_req;
+    ss << query_exec_request.per_host_mem_estimate;
     summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
   }
-  if (query_exec_request.__isset.per_host_vcores) {
+  if (query_exec_request.__isset.per_host_min_reservation) {
     stringstream ss;
-    ss << query_exec_request.per_host_vcores;
-    summary_profile_.AddInfoString(PER_HOST_VCORES_KEY, ss.str());
+    ss << query_exec_request.per_host_min_reservation;
+    summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str());
   }
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
       query_exec_request.query_ctx.__isset.tables_missing_stats &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 4cef592..4337851 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -387,12 +387,10 @@ struct TQueryExecRequest {
   7: required Types.TStmtType stmt_type
 
   // Estimated per-host peak memory consumption in bytes. Used for resource 
management.
-  8: optional i64 per_host_mem_req
+  8: optional i64 per_host_mem_estimate
 
-  // Estimated per-host CPU requirements in YARN virtual cores.
-  // Used for resource management.
-  // TODO: Remove this and associated code in Planner.
-  9: optional i16 per_host_vcores
+  // Minimum buffer reservation required per host in bytes.
+  9: optional i64 per_host_min_reservation;
 
   // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
   10: required list<Types.TNetworkAddress> host_list

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java 
b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
index 3e12ee1..4f8b4fc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
@@ -20,10 +20,10 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.AggregateFunction;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.planner.DataPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -700,6 +700,17 @@ public class AggregateInfo extends AggregateInfoBase {
     }
   }
 
+  /// Return true if any aggregate functions have a serialize function.
+  /// Only valid to call once analyzed.
+  public boolean needsSerialize() {
+    for (FunctionCallExpr aggregateExpr: aggregateExprs_) {
+      Preconditions.checkState(aggregateExpr.isAnalyzed());
+      AggregateFunction fn = (AggregateFunction)aggregateExpr.getFn();
+      if (fn.getSerializeFnSymbol() != null) return true;
+    }
+    return false;
+  }
+
   @Override
   public String debugString() {
     StringBuilder out = new StringBuilder(super.debugString());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 2e501c1..6ec2d26 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -347,4 +347,14 @@ public class TupleDescriptor {
     }
     return partitionSlots;
   }
+
+  /**
+   * Returns true if the tuple has any variable-length slots.
+   */
+  public boolean hasVarLenSlots() {
+    for (SlotDescriptor slot: slots_) {
+      if (!slot.getType().isFixedLengthType()) return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/common/PrintUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java 
b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index 3062161..77d77dd 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -26,6 +26,7 @@ import static org.apache.impala.common.ByteUnits.TERABYTE;
 import java.text.DecimalFormat;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.impala.planner.PlanFragmentId;
 
 /**
  * Utility functions for pretty printing.
@@ -51,20 +52,19 @@ public class PrintUtils {
         ((cardinality != -1) ? String.valueOf(cardinality) : "unavailable");
   }
 
-  public static String printHosts(String prefix, long numHosts) {
+  public static String printNumHosts(String prefix, long numHosts) {
     return prefix + "hosts=" + ((numHosts != -1) ? numHosts : "unavailable");
   }
 
-  public static String printMemCost(String prefix, long perHostMemCost) {
-    return prefix + "per-host-mem=" +
-        ((perHostMemCost != -1) ? printBytes(perHostMemCost) : "unavailable");
+  public static String printNumInstances(String prefix, long numInstances) {
+    return prefix + "instances=" + ((numInstances != -1) ? numInstances : 
"unavailable");
   }
 
   /**
    * Prints the given square matrix into matrixStr. Separates cells by 
cellSpacing.
    */
-  public static void printMatrix(boolean[][] matrix, int cellSpacing,
-      StringBuilder matrixStr) {
+  public static void printMatrix(
+      boolean[][] matrix, int cellSpacing, StringBuilder matrixStr) {
     // Print labels.
     matrixStr.append(StringUtils.repeat(' ', cellSpacing));
     String formatStr = "%Xd".replace("X", String.valueOf(cellSpacing));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 07c51f1..5cfce82 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -48,9 +48,9 @@ import com.google.common.collect.Sets;
 public class AggregationNode extends PlanNode {
   private final static Logger LOG = 
LoggerFactory.getLogger(AggregationNode.class);
 
-  // Default per-host memory requirement used if no valid stats are available.
+  // Default per-instance memory requirement used if no valid stats are 
available.
   // TODO: Come up with a more useful heuristic.
-  private final static long DEFAULT_PER_HOST_MEM = 128L * 1024L * 1024L;
+  private final static long DEFAULT_PER_INSTANCE_MEM = 128L * 1024L * 1024L;
 
   // Conservative minimum size of hash table for low-cardinality aggregations.
   private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
@@ -278,21 +278,33 @@ public class AggregationNode extends PlanNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    Preconditions.checkNotNull(fragment_,
-        "PlanNode must be placed into a fragment before calling this method.");
-    perHostMemCost_ = 0;
-    long perHostCardinality = 
fragment_.getNumDistinctValues(aggInfo_.getGroupingExprs());
-    if (perHostCardinality == -1) {
-      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
-      return;
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    Preconditions.checkNotNull(
+        fragment_, "PlanNode must be placed into a fragment before calling 
this method.");
+    // Must be kept in sync with 
PartitionedAggregationNode::MinRequiredBuffers() in be.
+    long perInstanceMinBuffers;
+    if (aggInfo_.getGroupingExprs().isEmpty() || useStreamingPreagg_) {
+      perInstanceMinBuffers = 0;
+    } else {
+      final int PARTITION_FANOUT = 16;
+      long minBuffers = 2 * PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() 
? 1 : 0);
+      perInstanceMinBuffers = SPILLABLE_BUFFER_BYTES * minBuffers;
     }
 
-    // Per-host cardinality cannot be greater than the total output 
cardinality.
+    long perInstanceCardinality = fragment_.getPerInstanceNdv(
+        queryOptions.getMt_dop(), aggInfo_.getGroupingExprs());
+    if (perInstanceCardinality == -1) {
+      resourceProfile_ =
+          new ResourceProfile(DEFAULT_PER_INSTANCE_MEM, perInstanceMinBuffers);
+      return;
+    }
+    // Per-instance cardinality cannot be greater than the total output 
cardinality.
     if (cardinality_ != -1) {
-      perHostCardinality = Math.min(perHostCardinality, cardinality_);
+      perInstanceCardinality = Math.min(perInstanceCardinality, cardinality_);
     }
-    perHostMemCost_ += Math.max(perHostCardinality * avgRowSize_ *
+    long perInstanceMemEstimate = (long)Math.max(perInstanceCardinality * 
avgRowSize_ *
         PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
+    resourceProfile_ =
+        new ResourceProfile(perInstanceMemEstimate, perInstanceMinBuffers);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java 
b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 408680b..e0981c7 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -242,10 +242,14 @@ public class AnalyticEvalNode extends PlanNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    Preconditions.checkNotNull(fragment_,
-        "PlanNode must be placed into a fragment before calling this method.");
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    Preconditions.checkNotNull(
+        fragment_, "PlanNode must be placed into a fragment before calling 
this method.");
     // TODO: come up with estimate based on window
-    perHostMemCost_ = 0;
+    long perInstanceMemEstimate = 0;
+
+    // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in 
be.
+    long perInstanceMinBufferBytes = 2 * SPILLABLE_BUFFER_BYTES;
+    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 
perInstanceMinBufferBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/DataSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataSink.java
index b1977f4..7fc0c83 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java
@@ -17,15 +17,9 @@
 
 package org.apache.impala.planner;
 
-import java.util.List;
-
-import org.apache.impala.analysis.Expr;
-import org.apache.impala.catalog.HBaseTable;
-import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.KuduTable;
-import org.apache.impala.catalog.Table;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryOptions;
 
 /**
  * A DataSink describes the destination of a plan fragment's output rows.
@@ -35,30 +29,44 @@ import org.apache.impala.thrift.TExplainLevel;
  */
 public abstract class DataSink {
 
-  // estimated per-host memory requirement for sink;
-  // set in computeCosts(); invalid: -1
-  protected long perHostMemCost_ = -1;
-
   // Fragment that this DataSink belongs to. Set by the PlanFragment enclosing 
this sink.
   protected PlanFragment fragment_;
 
+  // resource requirements and estimates for this plan node.
+  // set in computeResourceProfile()
+  protected ResourceProfile resourceProfile_ = null;
+
   /**
    * Return an explain string for the DataSink. Each line of the explain will 
be prefixed
    * by "prefix".
    */
-  public abstract String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel);
+  public final String getExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel) {
+    StringBuilder output = new StringBuilder();
+    appendSinkExplainString(prefix, detailPrefix, queryOptions, explainLevel, 
output);
+    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+      output.append(detailPrefix);
+      output.append(resourceProfile_.getExplainString());
+      output.append("\n");
+    }
+    return output.toString();
+  }
+
+  /**
+   * Append the node-specific lines of the explain string to "output".
+   */
+  abstract protected void appendSinkExplainString(String prefix, String 
detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder 
output);
 
   protected abstract TDataSink toThrift();
 
   public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
   public PlanFragment getFragment() { return fragment_; }
-  public long getPerHostMemCost() { return perHostMemCost_; }
+  public ResourceProfile getResourceProfile() { return resourceProfile_; }
 
   /**
-   * Estimates the cost of executing this DataSink. Currently only sets 
perHostMemCost.
+   * Compute the resource profile for an instance of this DataSink.
    */
-  public void computeCosts() {
-    perHostMemCost_ = 0;
-  }
+  public abstract void computeResourceProfile(TQueryOptions queryOptions);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 89f8377..ab80439 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -331,9 +331,9 @@ public class DataSourceScanNode extends ScanNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
     // TODO: What's a good estimate of memory consumption?
-    perHostMemCost_ = 1024L * 1024L * 1024L;
+    resourceProfile_ = new ResourceProfile(1024L * 1024L * 1024L, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index 952215e..d1369f5 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -21,6 +21,8 @@ import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TDataStreamSink;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -38,14 +40,18 @@ public class DataStreamSink extends DataSink {
   }
 
   @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel detailLevel, StringBuilder 
output) {
     output.append(
         String.format("%sDATASTREAM SINK [FRAGMENT=%s, EXCHANGE=%s, %s]",
         prefix, exchNode_.getFragment().getId().toString(),
         exchNode_.getId().toString(), exchNode_.getDisplayLabelDetail()));
-    return output.toString();
+    output.append("\n");
+  }
+
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    resourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 8fe3cf9..85a8ab8 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -19,7 +19,6 @@ package org.apache.impala.planner;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalysisContext;
@@ -28,7 +27,6 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.QueryStmt;
-import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
@@ -38,8 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 
 /**
  * The distributed planner is responsible for creating an executable, 
distributed plan
@@ -155,8 +151,8 @@ public class DistributedPlanner {
       result = new PlanFragment(
           ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
     } else {
-      throw new InternalException(
-          "Cannot create plan fragment for this node type: " + 
root.getExplainString());
+      throw new InternalException("Cannot create plan fragment for this node 
type: "
+          + root.getExplainString(ctx_.getQueryOptions()));
     }
     // move 'result' to end, it depends on all of its children
     fragments.remove(result);
@@ -478,7 +474,7 @@ public class DistributedPlanner {
           + Float.toString(lhsTree.getAvgRowSize()));
       LOG.trace("rhs card=" + Long.toString(rhsTree.getCardinality()) + " 
row_size="
           + Float.toString(rhsTree.getAvgRowSize()));
-      LOG.trace(rhsTree.getExplainString());
+      LOG.trace(rhsTree.getExplainString(ctx_.getQueryOptions()));
     }
 
     boolean doBroadcast = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java 
b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index 2b46bd9..174051d 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -24,6 +24,8 @@ import org.apache.impala.analysis.TupleId;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -42,7 +44,6 @@ public class EmptySetNode extends PlanNode {
   public void computeStats(Analyzer analyzer) {
     avgRowSize_ = 0;
     cardinality_ = 0;
-    perHostMemCost_ = 0;
     numNodes_ = 1;
   }
 
@@ -59,6 +60,12 @@ public class EmptySetNode extends PlanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     return String.format("%s%s:%s\n", prefix, id_.toString(), displayName_);
@@ -68,4 +75,5 @@ public class EmptySetNode extends PlanNode {
   protected void toThrift(TPlanNode msg) {
     msg.node_type = TPlanNodeType.EMPTY_SET_NODE;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java 
b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 32673ae..d3997b8 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -29,6 +29,7 @@ import org.apache.impala.thrift.TExchangeNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import com.google.common.base.Preconditions;
 
@@ -184,6 +185,12 @@ public class ExchangeNode extends PlanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ =  new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected void toThrift(TPlanNode msg) {
     msg.node_type = TPlanNodeType.EXCHANGE_NODE;
     msg.exchange_node = new TExchangeNode();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index aea3750..223362f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -494,9 +494,9 @@ public class HBaseScanNode extends ScanNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
     // TODO: What's a good estimate of memory consumption?
-    perHostMemCost_ = 1024L * 1024L * 1024L;
+    resourceProfile_ =  new ResourceProfile(1024L * 1024L * 1024L, 0);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
index 1d7994b..947665e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
@@ -20,10 +20,10 @@ package org.apache.impala.planner;
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 
@@ -37,16 +37,14 @@ public class HBaseTableSink extends TableSink {
   }
 
   @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    StringBuilder output = new StringBuilder();
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder 
output) {
     output.append(prefix + "WRITE TO HBASE table=" + 
targetTable_.getFullName() + "\n");
-    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(PrintUtils.printHosts(detailPrefix, 
fragment_.getNumNodes()));
-      output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
-      output.append("\n");
-    }
-    return output.toString();
+  }
+
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    resourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 008cd5c..f819513 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -19,9 +19,6 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.Expr;
@@ -57,6 +54,9 @@ public class HashJoinNode extends JoinNode {
   }
 
   @Override
+  public boolean isBlockingJoinNode() { return true; }
+
+  @Override
   public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; 
}
 
   @Override
@@ -177,15 +177,24 @@ public class HashJoinNode extends JoinNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // Must be kept in sync with 
PartitionedHashJoinBuilder::MinRequiredBuffers() in be.
+    final int PARTITION_FANOUT = 16;
+    long minBuffers = PARTITION_FANOUT + 1
+        + (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0);
+    long perInstanceMinBufferBytes = SPILLABLE_BUFFER_BYTES * minBuffers;
+
+    long perInstanceMemEstimate;
     if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
         || numNodes_ == 0) {
-      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
-      return;
+      perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
+    } else {
+      perInstanceMemEstimate = (long) Math.ceil(getChild(1).cardinality_
+          * getChild(1).avgRowSize_ * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
+      if (distrMode_ == DistributionMode.PARTITIONED) {
+        perInstanceMemEstimate /= 
fragment_.getNumInstances(queryOptions.getMt_dop());
+      }
     }
-    perHostMemCost_ =
-        (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_
-          * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
-    if (distrMode_ == DistributionMode.PARTITIONED) perHostMemCost_ /= 
numNodes_;
+    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 
perInstanceMinBufferBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index cb6627f..9ff1a6b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -32,7 +32,6 @@ import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
@@ -51,6 +50,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.THdfsFileBlock;
@@ -72,7 +72,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -82,7 +81,7 @@ import com.google.common.collect.Sets;
  *
  * It's expected that the creator of this object has already done any necessary
  * partition pruning before creating this object. In other words, the 
'conjuncts'
- * passed to the constructors are conjucts not fully evaluated by partition 
pruning
+ * passed to the constructors are conjuncts not fully evaluated by partition 
pruning
  * and 'partitions' are the remaining partitions after pruning.
  *
  * For scans of tables with Parquet files the class creates an additional list 
of
@@ -95,14 +94,13 @@ import com.google.common.collect.Sets;
 public class HdfsScanNode extends ScanNode {
   private final static Logger LOG = 
LoggerFactory.getLogger(HdfsScanNode.class);
 
-  // Read size of the backend I/O manager. Used in computeCosts().
-  private final static long IO_MGR_BUFFER_SIZE = 8L * 1024L * 1024L;
-
   // Maximum number of I/O buffers per thread executing this scan.
+  // TODO: it's unclear how this was chosen - this seems like a very high 
number
   private final static long MAX_IO_BUFFERS_PER_THREAD = 10;
 
-  // Number of scanner threads per core executing this scan.
-  private final static int THREADS_PER_CORE = 3;
+  // Maximum number of thread tokens per core that may be used to spin up 
extra scanner
+  // threads. Corresponds to the default value of --num_threads_per_core in 
the backend.
+  private final static int MAX_THREAD_TOKENS_PER_CORE = 3;
 
   // Factor capturing the worst-case deviation from a uniform distribution of 
scan ranges
   // among nodes. The factor of 1.2 means that a particular node may have 20% 
more
@@ -808,10 +806,10 @@ public class HdfsScanNode extends ScanNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan 
ranges.");
     if (scanRanges_.isEmpty()) {
-      perHostMemCost_ = 0;
+      resourceProfile_ = new ResourceProfile(0, 0);
       return;
     }
     Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
@@ -834,34 +832,37 @@ public class HdfsScanNode extends ScanNode {
           (double) scanRanges_.size() / (double) numNodes_) * 
SCAN_RANGE_SKEW_FACTOR);
     }
 
-    // TODO: The total memory consumption for a particular query depends on 
the number
-    // of *available* cores, i.e., it depends the resource consumption of other
-    // concurrent queries. Figure out how to account for that.
-    int maxScannerThreads = Math.min(perHostScanRanges,
-        RuntimeEnv.INSTANCE.getNumCores() * THREADS_PER_CORE);
-    // Account for the max scanner threads query option.
-    if (queryOptions.isSetNum_scanner_threads() &&
-        queryOptions.getNum_scanner_threads() > 0) {
-      maxScannerThreads =
-          Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
+    int maxScannerThreads;
+    if (queryOptions.getMt_dop() >= 1) {
+      maxScannerThreads = 1;
+    } else {
+      maxScannerThreads = Math.min(perHostScanRanges, 
RuntimeEnv.INSTANCE.getNumCores());
+      // Account for the max scanner threads query option.
+      if (queryOptions.isSetNum_scanner_threads() &&
+          queryOptions.getNum_scanner_threads() > 0) {
+        maxScannerThreads =
+            Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
+      }
     }
 
     long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) 
scanRanges_.size());
     // The +1 accounts for an extra I/O buffer to read past the scan range due 
to a
     // trailing record spanning Hdfs blocks.
+    long readSize = BackendConfig.INSTANCE.getReadSize();
     long perThreadIoBuffers =
-        Math.min((long) Math.ceil(avgScanRangeBytes / (double) 
IO_MGR_BUFFER_SIZE),
+        Math.min((long) Math.ceil(avgScanRangeBytes / (double) readSize),
             MAX_IO_BUFFERS_PER_THREAD) + 1;
-    perHostMemCost_ = maxScannerThreads * perThreadIoBuffers * 
IO_MGR_BUFFER_SIZE;
+    long perInstanceMemEstimate = maxScannerThreads * perThreadIoBuffers * 
readSize;
 
     // Sanity check: the tighter estimation should not exceed the per-host 
maximum.
     long perHostUpperBound = getPerHostMemUpperBound();
-    if (perHostMemCost_ > perHostUpperBound) {
-      LOG.warn(String.format("Per-host mem cost %s exceeded per-host upper 
bound %s.",
-          PrintUtils.printBytes(perHostMemCost_),
+    if (perInstanceMemEstimate > perHostUpperBound) {
+      LOG.warn(String.format("Per-instance mem cost %s exceeded per-host upper 
bound %s.",
+          PrintUtils.printBytes(perInstanceMemEstimate),
           PrintUtils.printBytes(perHostUpperBound)));
-      perHostMemCost_ = perHostUpperBound;
+      perInstanceMemEstimate = perHostUpperBound;
     }
+    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
   }
 
   /**
@@ -873,9 +874,9 @@ public class HdfsScanNode extends ScanNode {
    */
   public static long getPerHostMemUpperBound() {
     // THREADS_PER_CORE each using a default of
-    // MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE bytes.
-    return (long) RuntimeEnv.INSTANCE.getNumCores() * (long) THREADS_PER_CORE *
-        MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE;
+    // MAX_IO_BUFFERS_PER_THREAD * read_size bytes.
+    return (long) RuntimeEnv.INSTANCE.getNumCores() * (long) 
MAX_THREAD_TOKENS_PER_CORE *
+        MAX_IO_BUFFERS_PER_THREAD * BackendConfig.INSTANCE.getReadSize();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 996f981..8dc9f62 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -24,11 +24,11 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.THdfsTableSink;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 import com.google.common.base.Preconditions;
@@ -39,7 +39,8 @@ import com.google.common.collect.Lists;
  *
  */
 public class HdfsTableSink extends TableSink {
-  // Default number of partitions used for computeCosts() in the absence of 
column stats.
+  // Default number of partitions used for computeResourceProfile() in the 
absence of
+  // column stats.
   protected final long DEFAULT_NUM_PARTITIONS = 10;
 
   // Exprs for computing the output partition(s).
@@ -67,31 +68,37 @@ public class HdfsTableSink extends TableSink {
   }
 
   @Override
-  public void computeCosts() {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
     HdfsTable table = (HdfsTable) targetTable_;
     // TODO: Estimate the memory requirements more accurately by partition 
type.
     HdfsFileFormat format = table.getMajorityFormat();
     PlanNode inputNode = fragment_.getPlanRoot();
-    int numNodes = fragment_.getNumNodes();
-    // Compute the per-host number of partitions, taking the number of nodes
+    int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
+    // Compute the per-instance number of partitions, taking the number of 
nodes
     // and the data partition of the fragment executing this sink into account.
-    long numPartitions = fragment_.getNumDistinctValues(partitionKeyExprs_);
-    if (numPartitions == -1) numPartitions = DEFAULT_NUM_PARTITIONS;
+    long numPartitionsPerInstance =
+        fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), 
partitionKeyExprs_);
+    if (numPartitionsPerInstance == -1) {
+      numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
+    }
     long perPartitionMemReq = getPerPartitionMemReq(format);
 
+    long perInstanceMemEstimate;
     // The estimate is based purely on the per-partition mem req if the input 
cardinality_
     // or the avg row size is unknown.
     if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
-      perHostMemCost_ = numPartitions * perPartitionMemReq;
-      return;
+      perInstanceMemEstimate = numPartitionsPerInstance * perPartitionMemReq;
+    } else {
+      // The per-partition estimate may be higher than the memory required to 
buffer
+      // the entire input data.
+      long perInstanceInputCardinality =
+          Math.max(1L, inputNode.getCardinality() / numInstances);
+      long perInstanceInputBytes =
+          (long) Math.ceil(perInstanceInputCardinality * 
inputNode.getAvgRowSize());
+      perInstanceMemEstimate =
+          Math.min(perInstanceInputBytes, numPartitionsPerInstance * 
perPartitionMemReq);
     }
-
-    // The per-partition estimate may be higher than the memory required to 
buffer
-    // the entire input data.
-    long perHostInputCardinality = Math.max(1L, inputNode.getCardinality() / 
numNodes);
-    long perHostInputBytes =
-        (long) Math.ceil(perHostInputCardinality * inputNode.getAvgRowSize());
-    perHostMemCost_ = Math.min(perHostInputBytes, numPartitions * 
perPartitionMemReq);
+    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
   }
 
   /**
@@ -100,10 +107,19 @@ public class HdfsTableSink extends TableSink {
    */
   private long getPerPartitionMemReq(HdfsFileFormat format) {
     switch (format) {
-      // Writing to a Parquet table requires up to 1GB of buffer per partition.
-      // TODO: The per-partition memory requirement is configurable in the 
QueryOptions.
-      case PARQUET: return 1024L * 1024L * 1024L;
-      case TEXT: return 100L * 1024L;
+      case PARQUET:
+        // Writing to a Parquet table requires up to 1GB of buffer per 
partition.
+        // TODO: The per-partition memory requirement is configurable in the 
QueryOptions.
+        return 1024L * 1024L * 1024L;
+      case TEXT:
+      case LZO_TEXT:
+        // Very approximate estimate of amount of data buffered.
+        return 100L * 1024L;
+      case RC_FILE:
+      case SEQUENCE_FILE:
+      case AVRO:
+        // Very approximate estimate of amount of data buffered.
+        return 100L * 1024L;
       default:
         Preconditions.checkState(false, "Unsupported TableSink format " +
             format.toString());
@@ -112,9 +128,8 @@ public class HdfsTableSink extends TableSink {
   }
 
   @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    StringBuilder output = new StringBuilder();
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder 
output) {
     String overwriteStr = ", OVERWRITE=" + (overwrite_ ? "true" : "false");
     String partitionKeyStr = "";
     if (!partitionKeyExprs_.isEmpty()) {
@@ -139,13 +154,7 @@ public class HdfsTableSink extends TableSink {
             + (totalNumPartitions == 0 ? 1 : totalNumPartitions));
       }
       output.append("\n");
-      if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-        output.append(PrintUtils.printHosts(detailPrefix, 
fragment_.getNumNodes()));
-        output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
-        output.append("\n");
-      }
     }
-    return output.toString();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java 
b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 7afced6..69cc133 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -19,21 +19,13 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TJoinBuildSink;
-import org.apache.impala.thrift.TPlanNode;
-import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
-import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -77,9 +69,8 @@ public class JoinBuildSink extends DataSink {
   }
 
   @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel detailLevel, StringBuilder 
output) {
     output.append(String.format("%s%s\n", prefix, "JOIN BUILD"));
     if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
       output.append(
@@ -91,11 +82,11 @@ public class JoinBuildSink extends DataSink {
             .append(Expr.toSql(buildExprs_) + "\n");
       }
     }
-    return output.toString();
   }
 
   @Override
-  public void computeCosts() {
-    // TODO: implement?
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // The memory consumption is counted against the join PlanNode.
+    resourceProfile_ = new ResourceProfile(0, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 8e963ff..b40ef55 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -41,9 +41,9 @@ import com.google.common.base.Preconditions;
 public abstract class JoinNode extends PlanNode {
   private final static Logger LOG = LoggerFactory.getLogger(JoinNode.class);
 
-  // Default per-host memory requirement used if no valid stats are available.
+  // Default per-instance memory requirement used if no valid stats are 
available.
   // TODO: Come up with a more useful heuristic (e.g., based on scanned 
partitions).
-  protected final static long DEFAULT_PER_HOST_MEM = 2L * 1024L * 1024L * 
1024L;
+  protected final static long DEFAULT_PER_INSTANCE_MEM = 2L * 1024L * 1024L * 
1024L;
 
   // Slop in percent allowed when comparing stats for the purpose of 
determining whether
   // an equi-join condition is a foreign/primary key join.
@@ -153,6 +153,8 @@ public abstract class JoinNode extends PlanNode {
   public void setDistributionMode(DistributionMode distrMode) { distrMode_ = 
distrMode; }
   public JoinTableId getJoinTableId() { return joinTableId_; }
   public void setJoinTableId(JoinTableId id) { joinTableId_ = id; }
+  /// True if this consumes all of its right input before outputting any rows.
+  abstract public boolean isBlockingJoinNode();
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 64e5fde..bc3cdc0 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -43,6 +43,7 @@ import org.apache.impala.thrift.TKuduScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
@@ -275,6 +276,11 @@ public class KuduScanNode extends ScanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     StringBuilder result = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index 35b9022..b7dcdd8 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -23,11 +23,11 @@ import java.util.List;
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TKuduTableSink;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 
@@ -51,17 +51,16 @@ public class KuduTableSink extends TableSink {
   }
 
   @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    StringBuilder output = new StringBuilder();
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder 
output) {
     output.append(prefix + sinkOp_.toExplainString());
     output.append(" KUDU [" + targetTable_.getFullName() + "]\n");
-    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(PrintUtils.printHosts(detailPrefix, 
fragment_.getNumNodes()));
-      output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
-      output.append("\n");
-    }
-    return output.toString();
+  }
+
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add a memory estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index 0213c8b..e69f97b 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -58,6 +58,9 @@ public class NestedLoopJoinNode extends JoinNode {
   }
 
   @Override
+  public boolean isBlockingJoinNode() { return true; }
+
+  @Override
   public void init(Analyzer analyzer) throws ImpalaException {
     super.init(analyzer);
     Preconditions.checkState(eqJoinConjuncts_.isEmpty());
@@ -74,14 +77,16 @@ public class NestedLoopJoinNode extends JoinNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    long perInstanceMemEstimate;
     if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
         || numNodes_ == 0) {
-      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
-      return;
+      perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
+    } else {
+      perInstanceMemEstimate =
+          (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
     }
-    perHostMemCost_ =
-        (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
+    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
index 8f2a1a4..f5cc5d8 100644
--- a/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
@@ -17,18 +17,13 @@
 
 package org.apache.impala.planner;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.impala.common.IdGenerator;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * The parallel planner is responsible for breaking up a single distributed 
plan
@@ -183,6 +178,9 @@ public class ParallelPlanner {
       buildFragment.getChildren().add(inputFragments.get(i));
     }
 
+    // compute the resource profile for the newly-added build sink.
+    buildSink.computeResourceProfile(ctx_.getQueryOptions());
+
     // assign plan and cohort id
     buildFragment.setPlanId(planIdGenerator_.getNextId());
     PlanId parentPlanId = join.getFragment().getPlanId();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java 
b/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
index 249987a..c2ae0fd 100644
--- a/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
+++ b/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Sets;
  * set contains all build-side nodes. The second set contains the leftmost
  * scan. Both sets contain all join nodes because they execute and consume
  * resources during the build and probe phases. Similarly, all nodes below a 
'blocking'
- * node (e.g, an AggregationNode) are placed into a differnet plan node set 
than the
+ * node (e.g, an AggregationNode) are placed into a different plan node set 
than the
  * nodes above it, but the blocking node itself belongs to both sets.
  */
 public class PipelinedPlanNodeSet {
@@ -46,39 +46,32 @@ public class PipelinedPlanNodeSet {
 
   // Minimum per-host resource requirements to ensure that no plan node set 
can have
   // estimates of zero, even if the contained PlanNodes have estimates of zero.
-  public static final long MIN_PER_HOST_MEM = 10 * 1024 * 1024;
-  public static final int MIN_PER_HOST_VCORES = 1;
+  public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
 
   // List of plan nodes that execute and consume resources concurrently.
-  private final ArrayList<PlanNode> planNodes = Lists.newArrayList();
+  private final ArrayList<PlanNode> planNodes_ = Lists.newArrayList();
 
   // DataSinks that execute and consume resources concurrently.
   // Primarily used for estimating the cost of insert queries.
-  private final List<DataSink> dataSinks = Lists.newArrayList();
+  private final List<DataSink> dataSinks_ = Lists.newArrayList();
 
-  // Estimated per-host memory and CPU requirements.
-  // Valid after computeResourceEstimates().
-  private long perHostMem = MIN_PER_HOST_MEM;
-  private int perHostVcores = MIN_PER_HOST_VCORES;
-
-  public void add(PlanNode node) {
+  private void addNode(PlanNode node) {
     Preconditions.checkNotNull(node.getFragment());
-    planNodes.add(node);
+    planNodes_.add(node);
   }
 
-  public void addSink(DataSink sink) {
+  private void addSink(DataSink sink) {
     Preconditions.checkNotNull(sink);
-    dataSinks.add(sink);
+    dataSinks_.add(sink);
   }
 
   /**
-   * Computes the estimated per-host memory and CPU requirements of this plan 
node set.
-   * Optionally excludes unpartitioned fragments from the estimation.
-   * Returns true if at least one plan node was included in the estimation.
-   * Otherwise returns false indicating the estimates are invalid.
+   * Computes the per-host resource profile of this plan node set.
+   *
+   * If there are no nodes included in the estimate, the returned estimate 
will not be
+   * valid.
    */
-  public boolean computeResourceEstimates(boolean 
excludeUnpartitionedFragments,
-      TQueryOptions queryOptions) {
+  public ResourceProfile computePerHostResources(TQueryOptions queryOptions) {
     Set<PlanFragment> uniqueFragments = Sets.newHashSet();
 
     // Distinguish the per-host memory estimates for scan nodes and non-scan 
nodes to
@@ -86,74 +79,66 @@ public class PipelinedPlanNodeSet {
     // scans. The memory required by all concurrent scans of the same type 
(Hdfs/Hbase)
     // cannot exceed the per-host upper memory bound for that scan type. 
Intuitively,
     // the amount of I/O buffers is limited by the disk bandwidth.
-    long perHostHbaseScanMem = 0L;
-    long perHostHdfsScanMem = 0L;
-    long perHostNonScanMem = 0L;
+    long hbaseScanMemEstimate = 0L;
+    long hdfsScanMemEstimate = 0L;
+    long nonScanMemEstimate = 0L;
+    long minReservationBytes = 0L;
+    int numNodesIncluded = 0;
 
-    for (int i = 0; i < planNodes.size(); ++i) {
-      PlanNode node = planNodes.get(i);
+    for (PlanNode node : planNodes_) {
       PlanFragment fragment = node.getFragment();
-      if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue;
-      node.computeCosts(queryOptions);
+      // Multiple instances of a partitioned fragment may execute per host
+      int instancesPerHost = 
fragment.getNumInstancesPerHost(queryOptions.getMt_dop());
+
+      ResourceProfile nodeProfile = node.getResourceProfile();
+      Preconditions.checkState(nodeProfile.getMemEstimateBytes() >= 0);
+      long memEstimate = instancesPerHost * nodeProfile.getMemEstimateBytes();
+      ++numNodesIncluded;
       uniqueFragments.add(fragment);
-      if (node.getPerHostMemCost() < 0) {
-        LOG.warn(String.format("Invalid per-host memory requirement %s of node 
%s.\n" +
-            "PlanNode stats are: numNodes_=%s ", node.getPerHostMemCost(),
-            node.getClass().getSimpleName(), node.getNumNodes()));
-      }
       if (node instanceof HBaseScanNode) {
-        perHostHbaseScanMem += node.getPerHostMemCost();
+        hbaseScanMemEstimate += memEstimate;
       } else if (node instanceof HdfsScanNode) {
-        perHostHdfsScanMem += node.getPerHostMemCost();
+        hdfsScanMemEstimate += memEstimate;
       } else {
-        perHostNonScanMem += node.getPerHostMemCost();
+        nonScanMemEstimate += memEstimate;
       }
+      Preconditions.checkState(nodeProfile.getMinReservationBytes() >= 0);
+      minReservationBytes += instancesPerHost * 
nodeProfile.getMinReservationBytes();
+    }
+
+    if (queryOptions.getMt_dop() == 0) {
+      // The thread tokens for the non-MT path impose a limit on the memory 
that can
+      // be consumed by concurrent scans.
+      hbaseScanMemEstimate =
+          Math.min(hbaseScanMemEstimate, 
HBaseScanNode.getPerHostMemUpperBound());
+      hdfsScanMemEstimate =
+          Math.min(hdfsScanMemEstimate, 
HdfsScanNode.getPerHostMemUpperBound());
     }
 
-    // The memory required by concurrent scans cannot exceed the upper memory 
bound
-    // for that scan type.
-    // TODO: In the future, we may want to restrict scanner concurrency based 
on a
-    // memory limit. This estimation will need to accoung for that as well.
-    perHostHbaseScanMem =
-        Math.min(perHostHbaseScanMem, HBaseScanNode.getPerHostMemUpperBound());
-    perHostHdfsScanMem =
-        Math.min(perHostHdfsScanMem, HdfsScanNode.getPerHostMemUpperBound());
-
-    long perHostDataSinkMem = 0L;
-    for (int i = 0; i < dataSinks.size(); ++i) {
-      DataSink sink = dataSinks.get(i);
+    long dataSinkMemEstimate = 0L;
+    for (DataSink sink: dataSinks_) {
       PlanFragment fragment = sink.getFragment();
-      if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue;
       // Sanity check that this plan-node set has at least one PlanNode of 
fragment.
       Preconditions.checkState(uniqueFragments.contains(fragment));
-      sink.computeCosts();
-      if (sink.getPerHostMemCost() < 0) {
-        LOG.warn(String.format("Invalid per-host memory requirement %s of sink 
%s.\n",
-            sink.getPerHostMemCost(), sink.getClass().getSimpleName()));
-      }
-      perHostDataSinkMem += sink.getPerHostMemCost();
+      int instancesPerHost = 
fragment.getNumInstancesPerHost(queryOptions.getMt_dop());
+
+      ResourceProfile sinkProfile = sink.getResourceProfile();
+      Preconditions.checkState(sinkProfile.getMemEstimateBytes() >= 0);
+      dataSinkMemEstimate += instancesPerHost * 
sinkProfile.getMemEstimateBytes();
+      Preconditions.checkState(sinkProfile.getMinReservationBytes() >= 0);
+      minReservationBytes += instancesPerHost * 
sinkProfile.getMinReservationBytes();
     }
 
     // Combine the memory estimates of all sinks, scans nodes and non-scan 
nodes.
-    long perHostMem = perHostHdfsScanMem + perHostHbaseScanMem + 
perHostNonScanMem +
-        perHostDataSinkMem;
-
-    // The backend needs at least one thread per fragment.
-    int perHostVcores = uniqueFragments.size();
-
-    // This plan node set might only have unpartitioned fragments.
-    // Only set estimates if they are valid.
-    if (perHostMem >= 0 && perHostVcores >= 0) {
-      this.perHostMem = perHostMem;
-      this.perHostVcores = perHostVcores;
-      return true;
-    }
-    return false;
+    long perHostMemEstimate =
+        Math.max(MIN_PER_HOST_MEM_ESTIMATE_BYTES, hdfsScanMemEstimate
+                + hbaseScanMemEstimate + nonScanMemEstimate + 
dataSinkMemEstimate);
+    // This plan node set might only have unpartitioned fragments and be 
invalid.
+    return numNodesIncluded > 0 ?
+        new ResourceProfile(perHostMemEstimate, minReservationBytes) :
+          ResourceProfile.invalid();
   }
 
-  public long getPerHostMem() { return perHostMem; }
-  public int getPerHostVcores() { return perHostVcores; }
-
   /**
    * Computes and returns the pipelined plan node sets of the given plan.
    */
@@ -175,19 +160,19 @@ public class PipelinedPlanNodeSet {
    */
   private static void computePlanNodeSets(PlanNode node, PipelinedPlanNodeSet 
lhsSet,
       PipelinedPlanNodeSet rhsSet, ArrayList<PipelinedPlanNodeSet> 
planNodeSets) {
-    lhsSet.add(node);
+    lhsSet.addNode(node);
     if (node == node.getFragment().getPlanRoot() && 
node.getFragment().hasSink()) {
       lhsSet.addSink(node.getFragment().getSink());
     }
 
-    if (node instanceof HashJoinNode) {
+    if (node instanceof JoinNode && ((JoinNode)node).isBlockingJoinNode()) {
       // Create a new set for the right-hand sides of joins if necessary.
       if (rhsSet == null) {
         rhsSet = new PipelinedPlanNodeSet();
         planNodeSets.add(rhsSet);
       }
       // The join node itself is added to the lhsSet (above) and the rhsSet.
-      rhsSet.add(node);
+      rhsSet.addNode(node);
       computePlanNodeSets(node.getChild(1), rhsSet, null, planNodeSets);
       computePlanNodeSets(node.getChild(0), lhsSet, rhsSet, planNodeSets);
       return;
@@ -197,8 +182,10 @@ public class PipelinedPlanNodeSet {
       // We add blocking nodes to two plan node sets because they require 
resources while
       // consuming their input (execution of the preceding set) and while they
       // emit their output (execution of the following set).
+      // TODO: IMPALA-4862: this logic does not accurately reflect the 
behaviour of
+      // concurrent join builds in the backend
       lhsSet = new PipelinedPlanNodeSet();
-      lhsSet.add(node);
+      lhsSet.addNode(node);
       planNodeSets.add(lhsSet);
       // Join builds under this blocking node belong in a new rhsSet.
       rhsSet = null;
@@ -207,7 +194,9 @@ public class PipelinedPlanNodeSet {
     // Assume that non-join, non-blocking nodes with multiple children
     // (e.g., ExchangeNodes) consume their inputs in an arbitrary order,
     // i.e., all child subtrees execute concurrently.
-    // TODO: This is not true for UnionNodes anymore. Fix the estimates 
accordingly.
+    // TODO: IMPALA-4862: can overestimate resource consumption of UnionNodes 
- the
+    // execution of union branches is serialised within a fragment (but not 
across
+    // fragment boundaries).
     for (PlanNode child: node.getChildren()) {
       computePlanNodeSets(child, lhsSet, rhsSet, planNodeSets);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index e4888f9..3e89137 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -26,11 +26,13 @@ import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPlanFragmentTree;
+import org.apache.impala.thrift.TQueryOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +76,8 @@ import com.google.common.collect.Sets;
  *   fix that
  */
 public class PlanFragment extends TreeNode<PlanFragment> {
+  private final static Logger LOG = 
LoggerFactory.getLogger(PlanFragment.class);
+
   private final PlanFragmentId fragmentId_;
   private PlanId planId_;
   private CohortId cohortId_;
@@ -145,6 +149,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
   /**
    * Finalize plan tree and create stream sink, if needed.
+   * Computes resource profiles for all nodes and sinks in this fragment.
    * If this fragment is hash partitioned, ensures that the corresponding 
partition
    * exprs of all hash-partitioning senders are cast to identical types.
    * Otherwise, the hashes generated for identical partition values may differ
@@ -159,6 +164,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       streamSink.setFragment(this);
       sink_ = streamSink;
     }
+    computeResourceProfile(analyzer);
 
     if (!dataPartition_.isHashPartitioned()) return;
 
@@ -196,6 +202,19 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   }
 
   /**
+   * Compute the resource profile of the fragment. Must be called after all the
+   * plan nodes and sinks are added to the fragment.
+   */
+  private void computeResourceProfile(Analyzer analyzer) {
+    sink_.computeResourceProfile(analyzer.getQueryOptions());
+    List<PlanNode> nodes = Lists.newArrayList();
+    collectPlanNodes(nodes);
+    for (PlanNode node: nodes) {
+      node.computeResourceProfile(analyzer.getQueryOptions());
+    }
+  }
+
+  /**
    * Return the number of nodes on which the plan fragment will execute.
    * invalid: -1
    */
@@ -203,18 +222,40 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     return dataPartition_ == DataPartition.UNPARTITIONED ? 1 : 
planRoot_.getNumNodes();
   }
 
- /**
-   * Estimates the per-node number of distinct values of exprs based on the 
data
-   * partition of this fragment and its number of nodes. Returns -1 for an 
invalid
-   * estimate, e.g., because getNumDistinctValues() failed on one of the exprs.
+  /**
+   * Return the number of instances of this fragment per host that it executes 
on.
+   * invalid: -1
    */
-  public long getNumDistinctValues(List<Expr> exprs) {
+  public int getNumInstancesPerHost(int mt_dop) {
+    Preconditions.checkState(mt_dop >= 0);
+    if (dataPartition_ == DataPartition.UNPARTITIONED) return 1;
+    return mt_dop == 0 ? 1 : mt_dop;
+  }
+
+  /**
+   * Return the total number of instances of this fragment across all hosts.
+   * invalid: -1
+   */
+  public int getNumInstances(int mt_dop) {
+    if (dataPartition_ == DataPartition.UNPARTITIONED) return 1;
+    int numNodes = planRoot_.getNumNodes();
+    if (numNodes == -1) return -1;
+    return getNumInstancesPerHost(mt_dop) * numNodes;
+  }
+
+  /**
+    * Estimates the number of distinct values of exprs per fragment instance 
based on the
+    * data partition of this fragment, the number of nodes, and the degree of 
parallelism.
+    * Returns -1 for an invalid estimate, e.g., because getNumDistinctValues() 
failed on
+    * one of the exprs.
+    */
+  public long getPerInstanceNdv(int mt_dop, List<Expr> exprs) {
     Preconditions.checkNotNull(dataPartition_);
     long result = 1;
-    int numNodes = getNumNodes();
-    Preconditions.checkState(numNodes >= 0);
+    int numInstances = getNumInstances(mt_dop);
+    Preconditions.checkState(numInstances >= 0);
     // The number of nodes is zero for empty tables.
-    if (numNodes == 0) return 0;
+    if (numInstances == 0) return 0;
     for (Expr expr: exprs) {
       long numDistinct = expr.getNumDistinctValues();
       if (numDistinct == -1) {
@@ -222,7 +263,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         break;
       }
       if (dataPartition_.getPartitionExprs().contains(expr)) {
-        numDistinct = (long)Math.max((double) numDistinct / (double) numNodes, 
1L);
+        numDistinct = (long)Math.max((double) numDistinct / (double) 
numInstances, 1L);
       }
       result = PlanNode.multiplyCardinalities(result, numDistinct);
     }
@@ -254,8 +295,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     }
   }
 
-  public String getExplainString(TExplainLevel detailLevel) {
-    return getExplainString("", "", detailLevel);
+  public String getExplainString(TQueryOptions queryOptions, TExplainLevel 
detailLevel) {
+    return getExplainString("", "", queryOptions, detailLevel);
   }
 
   /**
@@ -263,7 +304,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
    * output will be prefixed by prefix.
    */
   protected final String getExplainString(String rootPrefix, String prefix,
-      TExplainLevel detailLevel) {
+      TQueryOptions queryOptions, TExplainLevel detailLevel) {
     StringBuilder str = new StringBuilder();
     Preconditions.checkState(dataPartition_ != null);
     String detailPrefix = prefix + "|  ";  // sink detail
@@ -272,17 +313,25 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       prefix = "  ";
       rootPrefix = "  ";
       detailPrefix = prefix + "|  ";
-      str.append(String.format("%s:PLAN FRAGMENT [%s]\n", 
fragmentId_.toString(),
-          dataPartition_.getExplainString()));
+      str.append(getFragmentHeaderString(queryOptions.getMt_dop()));
+      str.append("\n");
       if (sink_ != null && sink_ instanceof DataStreamSink) {
-        str.append(sink_.getExplainString(rootPrefix, prefix, detailLevel) + 
"\n");
+        str.append(
+            sink_.getExplainString(rootPrefix, detailPrefix, queryOptions, 
detailLevel));
       }
+    } else if (detailLevel == TExplainLevel.EXTENDED) {
+      // Print a fragment prefix displaying the # nodes and # instances
+      str.append(rootPrefix);
+      str.append(getFragmentHeaderString(queryOptions.getMt_dop()));
+      str.append("\n");
+      rootPrefix = prefix;
     }
 
     String planRootPrefix = rootPrefix;
     // Always print sinks other than DataStreamSinks.
     if (sink_ != null && !(sink_ instanceof DataStreamSink)) {
-      str.append(sink_.getExplainString(rootPrefix, detailPrefix, 
detailLevel));
+      str.append(
+          sink_.getExplainString(rootPrefix, detailPrefix, queryOptions, 
detailLevel));
       if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
         str.append(prefix + "|\n");
       }
@@ -290,11 +339,24 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       planRootPrefix = prefix;
     }
     if (planRoot_ != null) {
-      str.append(planRoot_.getExplainString(planRootPrefix, prefix, 
detailLevel));
+      str.append(
+          planRoot_.getExplainString(planRootPrefix, prefix, queryOptions, 
detailLevel));
     }
     return str.toString();
   }
 
+  /**
+   * Get a header string for a fragment in an explain plan.
+   */
+  public String getFragmentHeaderString(int mt_dop) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(String.format("%s:PLAN FRAGMENT [%s]", 
fragmentId_.toString(),
+        dataPartition_.getExplainString()));
+    builder.append(PrintUtils.printNumHosts(" ", getNumNodes()));
+    builder.append(PrintUtils.printNumInstances(" ", getNumInstances(mt_dop)));
+    return builder.toString();
+  }
+
   /** Returns true if this fragment is partitioned. */
   public boolean isPartitioned() {
     return (dataPartition_.getType() != TPartitionType.UNPARTITIONED);

Reply via email to