http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 03010b8..c18d7de 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -17,31 +17,34 @@
 
 package org.apache.impala.planner;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.CaseExpr;
+import org.apache.impala.analysis.CaseWhenClause;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
-import org.apache.impala.analysis.SlotId;
+import org.apache.impala.analysis.MultiAggregateInfo;
+import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
+import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.analysis.TupleId;
+import org.apache.impala.analysis.ValidTupleIdExpr;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TAggregationNode;
+import org.apache.impala.thrift.TAggregator;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.util.BitUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  * Aggregation computation.
@@ -57,23 +60,33 @@ public class AggregationNode extends PlanNode {
   // Conservative minimum size of hash table for low-cardinality aggregations.
   private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
 
-  private final AggregateInfo aggInfo_;
+  private final MultiAggregateInfo multiAggInfo_;
+  private final AggPhase aggPhase_;
 
-  // Set to true if this aggregation node needs to run the Finalize step. This
-  // node is the root node of a distributed aggregation.
-  private boolean needsFinalize_;
+  // Aggregation-class infos derived from 'multiAggInfo_' and 'aggPhase_' in c'tor.
+  private final List<AggregateInfo> aggInfos_;
 
-  // If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
-  private boolean useStreamingPreagg_;
+  // If true, this node produces intermediate aggregation tuples.
+  private boolean useIntermediateTuple_ = false;
 
-  /**
-   * Create an agg node from aggInfo.
-   */
-  public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) {
-    super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE");
-    aggInfo_ = aggInfo;
+  // If true, this node performs the finalize step.
+  private boolean needsFinalize_ = false;
+
+  // If true, this node uses streaming preaggregation. Invalid if this is a merge agg.
+  private boolean useStreamingPreagg_ = false;
+
+  // Resource profiles for each aggregation class.
+  private List<ResourceProfile> resourceProfiles_;
+
+  public AggregationNode(
+      PlanNodeId id, PlanNode input, MultiAggregateInfo multiAggInfo, AggPhase aggPhase) {
+    super(id, "AGGREGATE");
     children_.add(input);
+    multiAggInfo_ = multiAggInfo;
+    aggInfos_ = multiAggInfo_.getMaterializedAggInfos(aggPhase);
+    aggPhase_ = aggPhase;
     needsFinalize_ = true;
+    computeTupleIds();
   }
 
   /**
@@ -81,11 +94,48 @@ public class AggregationNode extends PlanNode {
    */
   private AggregationNode(PlanNodeId id, AggregationNode src) {
     super(id, src, "AGGREGATE");
-    aggInfo_ = src.aggInfo_;
+    multiAggInfo_ = src.multiAggInfo_;
+    aggPhase_ = src.aggPhase_;
+    aggInfos_ = src.aggInfos_;
     needsFinalize_ = src.needsFinalize_;
+    useIntermediateTuple_ = src.useIntermediateTuple_;
   }
 
-  public AggregateInfo getAggInfo() { return aggInfo_; }
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    for (AggregateInfo aggInfo : aggInfos_) {
+      TupleId aggClassTupleId = null;
+      if (useIntermediateTuple_) {
+        aggClassTupleId = aggInfo.getIntermediateTupleId();
+      } else {
+        aggClassTupleId = aggInfo.getOutputTupleId();
+      }
+      tupleIds_.add(aggClassTupleId);
+      tblRefIds_.add(aggClassTupleId);
+      // Nullable tuples are only required to distinguish between multiple
+      // aggregation classes.
+      if (aggInfos_.size() > 1) {
+        nullableTupleIds_.add(aggClassTupleId);
+      }
+    }
+  }
+
+  /**
+   * Sets this node as a preaggregation.
+   */
+  public void setIsPreagg(PlannerContext ctx) {
+    if (ctx.getQueryOptions().disable_streaming_preaggregations) {
+      useStreamingPreagg_ = false;
+      return;
+    }
+    for (AggregateInfo aggInfo : aggInfos_) {
+      if (aggInfo.getGroupingExprs().size() > 0) {
+        useStreamingPreagg_ = true;
+        return;
+      }
+    }
+  }
 
   /**
    * Unsets this node as requiring finalize. Only valid to call this if it is
@@ -96,25 +146,21 @@ public class AggregationNode extends PlanNode {
     needsFinalize_ = false;
   }
 
-  /**
-   * Sets this node as a preaggregation. Only valid to call this if it is not marked
-   * as a preaggregation
-   */
-  public void setIsPreagg(PlannerContext ctx_) {
-    TQueryOptions query_options = ctx_.getQueryOptions();
-    useStreamingPreagg_ =  !query_options.disable_streaming_preaggregations &&
-        aggInfo_.getGroupingExprs().size() > 0;
+  public void setIntermediateTuple() {
+    useIntermediateTuple_ = true;
+    computeTupleIds();
   }
 
-  /**
-   * Have this node materialize the aggregation's intermediate tuple instead of
-   * the output tuple.
-   */
-  public void setIntermediateTuple() {
-    Preconditions.checkState(!tupleIds_.isEmpty());
-    Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId()));
-    tupleIds_.clear();
-    tupleIds_.add(aggInfo_.getIntermediateTupleId());
+  public MultiAggregateInfo getMultiAggInfo() { return multiAggInfo_; }
+  public AggPhase getAggPhase() { return aggPhase_; }
+  public boolean hasGrouping() { return multiAggInfo_.hasGrouping(); }
+  public boolean isSingleClassAgg() { return aggInfos_.size() == 1; }
+
+  public boolean isDistinctAgg() {
+    for (AggregateInfo aggInfo : aggInfos_) {
+      if (aggInfo.isDistinctAgg()) return true;
+    }
+    return false;
   }
 
   @Override
@@ -122,47 +168,40 @@ public class AggregationNode extends PlanNode {
 
   @Override
   public void init(Analyzer analyzer) throws InternalException {
-    // Assign predicates to the top-most agg in the single-node plan that can evaluate
-    // them, as follows: For non-distinct aggs place them in the 1st phase agg node. For
-    // distinct aggs place them in the 2nd phase agg node. The conjuncts are
-    // transferred to the proper place in the multi-node plan via transferConjuncts().
-    if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) {
-      // Ignore predicates bound by a grouping slot produced by a SlotRef grouping expr.
-      // Those predicates are already evaluated below this agg node (e.g., in a scan),
-      // because the grouping slot must be in the same equivalence class as another slot
-      // below this agg node. We must not ignore other grouping slots in order to retain
-      // conjuncts bound by those grouping slots in createEquivConjuncts() (IMPALA-2089).
-      // Those conjuncts cannot be redundant because our equivalence classes do not
-      // capture dependencies with non-SlotRef exprs.
-      Set<SlotId> groupBySlots = Sets.newHashSet();
-      for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) {
-        if (aggInfo_.getGroupingExprs().get(i).unwrapSlotRef(true) == null) continue;
-        groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId());
-      }
-      ArrayList<Expr> bindingPredicates =
-          analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true);
-      conjuncts_.addAll(bindingPredicates);
-
-      // also add remaining unassigned conjuncts_
-      assignConjuncts(analyzer);
-
-      analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots);
+    Preconditions.checkState(tupleIds_.size() == aggInfos_.size());
+    // Assign conjuncts to the top-most agg in the single-node plan. They are transferred
+    // to the proper place in the distributed plan via transferConjuncts().
+    if (aggPhase_ == multiAggInfo_.getConjunctAssignmentPhase()) {
+      conjuncts_.clear();
+      // TODO: If this is the transposition phase, then we can push conjuncts that
+      // reference a single aggregation class down into the aggregators of the
+      // previous phase.
+      conjuncts_.addAll(multiAggInfo_.collectConjuncts(analyzer, true));
+      conjuncts_ = orderConjunctsByCost(conjuncts_);
     }
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
+
     // Compute the mem layout for both tuples here for simplicity.
-    aggInfo_.getOutputTupleDesc().computeMemLayout();
-    aggInfo_.getIntermediateTupleDesc().computeMemLayout();
+    for (AggregateInfo aggInfo : aggInfos_) {
+      aggInfo.getOutputTupleDesc().computeMemLayout();
+      aggInfo.getIntermediateTupleDesc().computeMemLayout();
+    }
 
-    // do this at the end so it can take all conjuncts into account
+    // Do at the end so it can take all conjuncts into account
     computeStats(analyzer);
 
     // don't call createDefaultSMap(), it would point our conjuncts (= Having clause)
     // to our input; our conjuncts don't get substituted because they already
     // refer to our output
     outputSmap_ = getCombinedChildSmap();
-    aggInfo_.substitute(outputSmap_, analyzer);
-    // assert consistent aggregate expr and slot materialization
-    aggInfo_.checkConsistency();
+
+    // Substitute exprs and check consistency.
+    // All of the AggregationNodes corresponding to a MultiAggregationInfo will have the
+    // same outputSmap_, so just substitute it once.
+    if (aggPhase_ == AggPhase.FIRST) multiAggInfo_.substitute(outputSmap_, analyzer);
+    for (AggregateInfo aggInfo : aggInfos_) {
+      aggInfo.substitute(outputSmap_, analyzer);
+      aggInfo.checkConsistency();
+    }
   }
 
   @Override
@@ -179,21 +218,27 @@ public class AggregationNode extends PlanNode {
     // some others, the estimate doesn't overshoot dramatically)
     // cardinality: product of # of distinct values produced by grouping exprs
 
-    // Any non-grouping aggregation has at least one distinct value
-    cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 :
-      Expr.getNumDistinctValues(aggInfo_.getGroupingExprs());
-    // take HAVING predicate into account
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Agg: cardinality=" + Long.toString(cardinality_));
+    cardinality_ = 0;
+    for (AggregateInfo aggInfo : aggInfos_) {
+      List<Expr> groupingExprs = aggInfo.getGroupingExprs();
+      if (groupingExprs.isEmpty()) {
+        // Non-grouping aggregation class.
+        cardinality_ += 1;
+      } else {
+        long ndvs = Expr.getNumDistinctValues(groupingExprs);
+        // if we ended up with an overflow, the estimate is certain to be wrong
+        if (ndvs < 0) {
+          cardinality_ = -1;
+          break;
+        }
+        cardinality_ = checkedAdd(cardinality_, ndvs);
+      }
     }
+
+    // Take conjuncts into account.
     if (cardinality_ > 0) {
-      cardinality_ = Math.round((double) cardinality_ * computeSelectivity());
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("sel=" + Double.toString(computeSelectivity()));
-      }
+      cardinality_ = (long) Math.round((double) cardinality_ * computeSelectivity());
     }
-    // if we ended up with an overflow, the estimate is certain to be wrong
-    if (cardinality_ < 0) cardinality_ = -1;
     // Sanity check the cardinality_ based on the input cardinality_.
     if (getChild(0).getCardinality() != -1) {
       if (cardinality_ == -1) {
@@ -205,38 +250,141 @@ public class AggregationNode extends PlanNode {
       }
     }
     cardinality_ = capAtLimit(cardinality_);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_));
-    }
   }
 
-  @Override
-  protected String debugString() {
-    return Objects.toStringHelper(this)
-        .add("aggInfo", aggInfo_.debugString())
-        .addValue(super.debugString())
-        .toString();
+  /**
+   * Returns a list of exprs suitable for hash partitioning the output of this node
+   * before the merge aggregation step. Only valid to call if this node is not a merge
+   * or transposing aggregation. The returned exprs are bound by the intermediate tuples.
+   * Takes the SHUFFLE_DISTINCT_EXPRS query option into account.
+   *
+   * For single-class aggregations the returned exprs are typically the grouping exprs.
+   * With SHUFFLE_DISTINCT_EXPRS=true the distinct exprs are also included if this is the
+   * non-merge first-phase aggregation of a distinct aggregation.
+   *
+   * For multi-class aggregations the returned exprs are a list of CASE exprs which you
+   * can think of as a "union" of the merge partition exprs of each class. Each CASE
+   * switches on the valid tuple id of an input row to determine the aggregation class,
+   * and selects the corresponding partition expr.
+   * The main challenges with crafting these exprs are:
+   * 1. Different aggregation classes can have a different number of distinct exprs.
+   *    Solution: The returned list is maximally wide to accommodate the widest
+   *    aggregation class. For classes that have fewer than the max distinct exprs we add
+   *    constant dummy exprs in the corresponding branch of the CASE.
+   * 2. A CASE expr must return a single output type, but different aggregation classes
+   *    may have incompatible distinct exprs, so selecting the distinct exprs directly
+   *    in the CASE branches would not always work (unless we cast everything to STRING,
+   *    which we try to avoid). Instead, we call MURMUR_HASH() on the exprs to produce
+   *    a hash value. That way, all branches of a CASE return the same type.
+   *    Considering that the backend will do another round of hashing, there's an
+   *    unnecessary double hashing here that we deemed acceptable for now and has
+   *    potential for cleanup (maybe the FE should always add a MURMUR_HASH()).
+   * The handling of SHUFFLE_DISTINCT_EXPRS is analogous to the single-class case.
+   *
+   * Example:
+   * SELECT COUNT(DISTINCT a,b), COUNT(DISTINCT c) FROM t GROUP BY d
+   * Suppose the two aggregation classes have intermediate tuple ids 0 and 1.
+   *
+   * Challenges explained on this example:
+   * 1. First class has distinct exprs a,b and second class has c. We need to accommodate
+   *    the widest class (a,b) and also hash on the grouping expr (d), so there will be
+   *    three cases.
+   * 2. The types of a and c might be incompatible.
+   *
+   * The first-phase partition exprs are a list of the following 3 exprs:
+   * CASE valid_tid()
+   *   WHEN 0 THEN murmur_hash(d) <-- d SlotRef into tuple 0
+   *   WHEN 1 THEN murmur_hash(d) <-- d SlotRef into tuple 1
+   * END,
+   * CASE valid_tid()
+   *   WHEN 0 THEN murmur_hash(a)
+   *   WHEN 1 THEN murmur_hash(c)
+   * END,
+   * CASE valid_tid()
+   *   WHEN 0 THEN murmur_hash(b)
+   *   WHEN 1 THEN 0 <-- dummy constant integer
+   * END
+   */
+  public List<Expr> getMergePartitionExprs(Analyzer analyzer) {
+    Preconditions.checkState(!tupleIds_.isEmpty());
+    Preconditions.checkState(!aggPhase_.isMerge() && !aggPhase_.isTranspose());
+
+    boolean shuffleDistinctExprs = analyzer.getQueryOptions().shuffle_distinct_exprs;
+    if (aggInfos_.size() == 1) {
+      AggregateInfo aggInfo = aggInfos_.get(0);
+      List<Expr> groupingExprs = null;
+      if (aggPhase_.isFirstPhase() && hasGrouping() && !shuffleDistinctExprs) {
+        groupingExprs = multiAggInfo_.getSubstGroupingExprs();
+      } else {
+        groupingExprs = aggInfo.getPartitionExprs();
+        if (groupingExprs == null) groupingExprs = aggInfo.getGroupingExprs();
+      }
+      return Expr.substituteList(
+          groupingExprs, aggInfo.getIntermediateSmap(), analyzer, false);
+    }
+
+    int maxNumExprs = 0;
+    for (AggregateInfo aggInfo : aggInfos_) {
+      if (aggInfo.getGroupingExprs() == null) continue;
+      maxNumExprs = Math.max(maxNumExprs, aggInfo.getGroupingExprs().size());
+    }
+    if (maxNumExprs == 0) return Collections.emptyList();
+
+    List<Expr> result = Lists.newArrayList();
+    for (int i = 0; i < maxNumExprs; ++i) {
+      List<CaseWhenClause> caseWhenClauses = Lists.newArrayList();
+      for (AggregateInfo aggInfo : aggInfos_) {
+        TupleId tid;
+        if (aggInfo.isDistinctAgg()) {
+          tid = aggInfo.getOutputTupleId();
+        } else {
+          tid = aggInfo.getIntermediateTupleId();
+        }
+        List<Expr> groupingExprs = aggInfo.getGroupingExprs();
+        if (aggPhase_.isFirstPhase() && hasGrouping() && !shuffleDistinctExprs) {
+          groupingExprs = multiAggInfo_.getSubstGroupingExprs();
+        }
+        Expr whenExpr = NumericLiteral.create(tid.asInt());
+        Expr thenExpr;
+        if (groupingExprs == null || i >= groupingExprs.size()) {
+          thenExpr = NumericLiteral.create(0);
+        } else {
+          thenExpr = new FunctionCallExpr(
+              "murmur_hash", Lists.newArrayList(groupingExprs.get(i).clone()));
+          thenExpr.analyzeNoThrow(analyzer);
+          thenExpr = thenExpr.substitute(aggInfo.getIntermediateSmap(), analyzer, true);
+        }
+        caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
+      }
+      CaseExpr caseExpr =
+          new CaseExpr(new ValidTupleIdExpr(tupleIds_), caseWhenClauses, null);
+      caseExpr.analyzeNoThrow(analyzer);
+      result.add(caseExpr);
+    }
+    return result;
   }
 
   @Override
   protected void toThrift(TPlanNode msg) {
+    msg.agg_node = new TAggregationNode();
     msg.node_type = TPlanNodeType.AGGREGATION_NODE;
-
-    List<TExpr> aggregateFunctions = Lists.newArrayList();
-    // only serialize agg exprs that are being materialized
-    for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) {
-      aggregateFunctions.add(e.treeToThrift());
-    }
-    aggInfo_.checkConsistency();
-    msg.agg_node = new TAggregationNode(
-        aggregateFunctions,
-        aggInfo_.getIntermediateTupleId().asInt(),
-        aggInfo_.getOutputTupleId().asInt(), needsFinalize_,
-        useStreamingPreagg_,
-        getChild(0).getCardinality());
-    List<Expr> groupingExprs = aggInfo_.getGroupingExprs();
-    if (groupingExprs != null) {
-      msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
+    boolean replicateInput = aggPhase_ == AggPhase.FIRST && aggInfos_.size() > 1;
+    msg.agg_node.setReplicate_input(replicateInput);
+    msg.agg_node.setEstimated_input_cardinality(getChild(0).getCardinality());
+    for (int i = 0; i < aggInfos_.size(); ++i) {
+      AggregateInfo aggInfo = aggInfos_.get(i);
+      List<TExpr> aggregateFunctions = Lists.newArrayList();
+      for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
+        aggregateFunctions.add(e.treeToThrift());
+      }
+      TAggregator taggregator = new TAggregator(aggregateFunctions,
+          aggInfo.getIntermediateTupleId().asInt(), aggInfo.getOutputTupleId().asInt(),
+          needsFinalize_, useStreamingPreagg_, resourceProfiles_.get(i).toThrift());
+      List<Expr> groupingExprs = aggInfo.getGroupingExprs();
+      if (!groupingExprs.isEmpty()) {
+        taggregator.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
+      }
+      msg.agg_node.addToAggregators(taggregator);
     }
   }
 
@@ -257,34 +405,67 @@ public class AggregationNode extends PlanNode {
     output.append("\n");
 
     if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-      ArrayList<FunctionCallExpr> aggExprs = aggInfo_.getMaterializedAggregateExprs();
-      if (!aggExprs.isEmpty()) {
-        output.append(detailPrefix + "output: ")
-            .append(getExplainString(aggExprs) + "\n");
-      }
-      // TODO: is this the best way to display this. It currently would
-      // have DISTINCT_PC(DISTINCT_PC(col)) for the merge phase but not
-      // very obvious what that means if you don't already know.
-
-      // TODO: group by can be very long. Break it into multiple lines
-      if (!aggInfo_.getGroupingExprs().isEmpty()) {
-        output.append(detailPrefix + "group by: ")
-            .append(getExplainString(aggInfo_.getGroupingExprs()) + "\n");
+      if (aggInfos_.size() == 1) {
+        output.append(getAggInfoExplainString(detailPrefix, aggInfos_.get(0)));
+      } else {
+        for (int i = 0; i < aggInfos_.size(); ++i) {
+          AggregateInfo aggInfo = aggInfos_.get(i);
+          output.append(String.format("%sClass %d\n", detailPrefix, i));
+          output.append(getAggInfoExplainString(detailPrefix + "  ", aggInfo));
+        }
       }
       if (!conjuncts_.isEmpty()) {
-        output.append(detailPrefix + "having: ")
-            .append(getExplainString(conjuncts_) + "\n");
+        output.append(detailPrefix)
+            .append("having: ")
+            .append(getExplainString(conjuncts_))
+            .append("\n");
       }
     }
     return output.toString();
   }
 
+  private StringBuilder getAggInfoExplainString(String prefix, AggregateInfo aggInfo) {
+    StringBuilder output = new StringBuilder();
+    List<FunctionCallExpr> aggExprs = aggInfo.getMaterializedAggregateExprs();
+    List<Expr> groupingExprs = aggInfo.getGroupingExprs();
+    if (!aggExprs.isEmpty()) {
+      output.append(prefix)
+          .append("output: ")
+          .append(getExplainString(aggExprs))
+          .append("\n");
+    }
+    if (!groupingExprs.isEmpty()) {
+      output.append(prefix)
+          .append("group by: ")
+          .append(getExplainString(groupingExprs))
+          .append("\n");
+    }
+    return output;
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
+    resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size());
+    for (AggregateInfo aggInfo : aggInfos_) {
+      resourceProfiles_.add(computeAggClassResourceProfile(queryOptions, aggInfo));
+    }
+    if (aggInfos_.size() == 1) {
+      nodeResourceProfile_ = resourceProfiles_.get(0);
+    } else {
+      nodeResourceProfile_ = ResourceProfile.noReservation(0);
+      for (ResourceProfile aggProfile : resourceProfiles_) {
+        nodeResourceProfile_ = nodeResourceProfile_.sum(aggProfile);
+      }
+    }
+  }
+
+  private ResourceProfile computeAggClassResourceProfile(
+      TQueryOptions queryOptions, AggregateInfo aggInfo) {
     Preconditions.checkNotNull(
         fragment_, "PlanNode must be placed into a fragment before calling 
this method.");
-    long perInstanceCardinality = fragment_.getPerInstanceNdv(
-        queryOptions.getMt_dop(), aggInfo_.getGroupingExprs());
+    long perInstanceCardinality =
+        fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), aggInfo.getGroupingExprs());
     long perInstanceMemEstimate;
     long perInstanceDataBytes = -1;
     if (perInstanceCardinality == -1) {
@@ -299,12 +480,12 @@ public class AggregationNode extends PlanNode {
           PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
     }
 
-    // Must be kept in sync with PartitionedAggregationNode::MinReservation() in be.
+    // Must be kept in sync with GroupingAggregator::MinReservation() in backend.
     long perInstanceMinMemReservation;
     long bufferSize = queryOptions.getDefault_spillable_buffer_size();
     long maxRowBufferSize =
         computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
-    if (aggInfo_.getGroupingExprs().isEmpty()) {
+    if (aggInfo.getGroupingExprs().isEmpty()) {
       perInstanceMinMemReservation = 0;
     } else {
       // This is a grouping pre-aggregation or merge aggregation.
@@ -330,17 +511,18 @@ public class AggregationNode extends PlanNode {
         perInstanceMinMemReservation = bufferSize * PARTITION_FANOUT +
             Math.max(64 * 1024 * PARTITION_FANOUT, bufferSize);
       } else {
-        long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
+        long minBuffers = PARTITION_FANOUT + 1 + (aggInfo.needsSerialize() ? 1 : 0);
         // Two of the buffers need to be buffers large enough to hold the maximum-sized
         // row to serve as input and output buffers while repartitioning.
         perInstanceMinMemReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
       }
     }
 
-    nodeResourceProfile_ = new ResourceProfileBuilder()
+    return new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
         .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize)
-        .setMaxRowBufferBytes(maxRowBufferSize).build();
+        .setMaxRowBufferBytes(maxRowBufferSize)
+        .build();
   }
 }

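The getMergePartitionExprs() javadoc above is dense, so here is a minimal, self-contained Java sketch of the runtime behavior it describes (illustrative only; Row, hash(), and partitionValue() are hypothetical names, not Impala code). Each input row carries the tuple id of its aggregation class, the widest class fixes the number of partition exprs, and narrower classes fall through to a constant dummy value, mirroring the murmur_hash()-based CASE branches the planner builds:

import java.util.List;

// Illustrative sketch of how the per-class CASE partition exprs behave.
public class MergePartitionSketch {
  // A row tagged with the tuple id of the aggregation class that produced it,
  // plus that class's partition values (grouping exprs first, then distinct exprs).
  record Row(int validTid, List<Object> exprValues) {}

  // Stand-in for the MURMUR_HASH() builtin; any stable hash works for the sketch.
  static long hash(Object v) { return v == null ? 0L : v.hashCode() * 0x9E3779B9L; }

  // Mimics the i-th CASE expr: CASE valid_tid() WHEN tid THEN murmur_hash(expr_i) END,
  // returning the constant 0 for classes with fewer than i+1 exprs (the dummy branch).
  static long partitionValue(Row row, int i) {
    List<Object> exprs = row.exprValues();
    return i < exprs.size() ? hash(exprs.get(i)) : 0L;
  }

  public static void main(String[] args) {
    // Two classes from SELECT COUNT(DISTINCT a,b), COUNT(DISTINCT c) ... GROUP BY d:
    // class 0 shuffles on (d, a, b), class 1 shuffles on (d, c).
    Row class0Row = new Row(0, List.of("d-val", "a-val", "b-val"));
    Row class1Row = new Row(1, List.of("d-val", "c-val"));
    int maxNumExprs = 3; // width of the widest class
    for (Row row : List.of(class0Row, class1Row)) {
      for (int i = 0; i < maxNumExprs; ++i) {
        System.out.printf("tid=%d expr=%d value=%d%n", row.validTid(), i,
            partitionValue(row, i));
      }
    }
  }
}

Rows from the two classes agree on the first partition value whenever they agree on d, which is what keeps all rows of one group on one node regardless of aggregation class.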
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 4302244..ac3115b 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -20,19 +20,17 @@ package org.apache.impala.planner;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
-import static org.apache.impala.analysis.JoinOperator.*;
+import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
-import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -789,21 +787,21 @@ public class DistributedPlanner {
    * Returns a fragment that materializes the aggregation result of 'node'.
   * If the child fragment is partitioned, the result fragment will be partitioned on
    * the grouping exprs of 'node'.
-   * If 'node' is phase 1 of a 2-phase DISTINCT aggregation, this will simply
-   * add 'node' to the child fragment and return the child fragment; the new
-   * fragment will be created by the subsequent call of createAggregationFragment()
-   * for the phase 2 AggregationNode.
+   * If 'node' is phase 1 of a 2-phase DISTINCT aggregation or the 'transpose' phase of a
+   * multiple-distinct aggregation, this will simply add 'node' to the child fragment and
+   * return the child fragment; the new fragment will be created by the call of
+   * createAggregationFragment() for the phase 2 AggregationNode.
    */
   private PlanFragment createAggregationFragment(AggregationNode node,
       PlanFragment childFragment, ArrayList<PlanFragment> fragments)
       throws ImpalaException {
-    if (!childFragment.isPartitioned()) {
+    if (!childFragment.isPartitioned() || node.getAggPhase() == AggPhase.TRANSPOSE) {
       // nothing to distribute; do full aggregation directly within childFragment
       childFragment.addPlanRoot(node);
       return childFragment;
     }
 
-    if (node.getAggInfo().isDistinctAgg()) {
+    if (node.isDistinctAgg()) {
       // 'node' is phase 1 of a DISTINCT aggregation; the actual agg fragment
       // will get created in the next createAggregationFragment() call
       // for the parent AggregationNode
@@ -813,7 +811,7 @@ public class DistributedPlanner {
 
     // Check if 'node' is phase 2 of a DISTINCT aggregation.
     boolean isDistinct = node.getChild(0) instanceof AggregationNode
-          && 
((AggregationNode)(node.getChild(0))).getAggInfo().isDistinctAgg();
+        && ((AggregationNode) (node.getChild(0))).isDistinctAgg();
     if (isDistinct) {
       return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
     } else {
@@ -827,18 +825,15 @@ public class DistributedPlanner {
    * aggregation.
    */
   private PlanFragment createMergeAggregationFragment(
-      AggregationNode node, PlanFragment childFragment)
-      throws ImpalaException {
+      AggregationNode node, PlanFragment childFragment) throws ImpalaException {
     Preconditions.checkArgument(childFragment.isPartitioned());
-    ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
-    boolean hasGrouping = !groupingExprs.isEmpty();
-
+    List<Expr> partitionExprs = node.getMergePartitionExprs(ctx_.getRootAnalyzer());
+    boolean hasGrouping = !partitionExprs.isEmpty();
     DataPartition parentPartition = null;
     if (hasGrouping) {
-      List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs();
-      if (partitionExprs == null) partitionExprs = groupingExprs;
-      boolean childHasCompatPartition = ctx_.getRootAnalyzer().setsHaveValueTransfer(
-          partitionExprs, childFragment.getDataPartition().getPartitionExprs(), true);
+      boolean childHasCompatPartition = node.isSingleClassAgg()
+          && ctx_.getRootAnalyzer().setsHaveValueTransfer(partitionExprs,
+                 childFragment.getDataPartition().getPartitionExprs(), true);
       if (childHasCompatPartition) {
         // The data is already partitioned on the required expressions. We can do the
         // aggregation in the child fragment without an extra merge step.
@@ -848,13 +843,8 @@ public class DistributedPlanner {
         childFragment.addPlanRoot(node);
         return childFragment;
       }
-      // the parent fragment is partitioned on the grouping exprs;
-      // substitute grouping exprs to reference the *output* of the agg, not the input
-      partitionExprs = Expr.substituteList(partitionExprs,
-          node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), false);
       parentPartition = DataPartition.hashPartitioned(partitionExprs);
     } else {
-      // the parent fragment is unpartitioned
       parentPartition = DataPartition.UNPARTITIONED;
     }
 
@@ -874,7 +864,7 @@ public class DistributedPlanner {
     // place a merge aggregation step in a new fragment
     PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition);
     AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
-        mergeFragment.getPlanRoot(), node.getAggInfo().getMergeAggInfo());
+        mergeFragment.getPlanRoot(), node.getMultiAggInfo(), AggPhase.FIRST_MERGE);
     mergeAggNode.init(ctx_.getRootAnalyzer());
     mergeAggNode.setLimit(limit);
     // Merge of non-grouping agg only processes one tuple per Impala daemon - codegen
@@ -904,65 +894,44 @@ public class DistributedPlanner {
   private PlanFragment createPhase2DistinctAggregationFragment(
       AggregationNode phase2AggNode, PlanFragment childFragment,
       ArrayList<PlanFragment> fragments) throws ImpalaException {
-    // When a query has both grouping and distinct exprs, impala can optionally include
-    // the distinct exprs in the hash exchange of the first aggregation phase to spread
-    // the data among more nodes. However, this plan requires another hash exchange on the
-    // grouping exprs in the second phase which is not required when omitting the distinct
-    // exprs in the first phase. Shuffling by both is better if the grouping exprs have
-    // low NDVs.
-    boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs ||
-        phase2AggNode.getAggInfo().getGroupingExprs().isEmpty();
     // The phase-1 aggregation node is already in the child fragment.
     Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot());
-
-    AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0))
-        .getAggInfo();
-    ArrayList<Expr> partitionExprs;
+    // When a query has both grouping and distinct exprs, Impala can optionally include
+    // the distinct exprs in the hash exchange of the first aggregation phase to spread
+    // the data among more nodes. However, this plan requires another hash exchange on
+    // the grouping exprs in the second phase which is not required when omitting the
+    // distinct exprs in the first phase. Shuffling by both is better if the grouping
+    // exprs have low NDVs.
+    boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs;
+    boolean hasGrouping = phase2AggNode.hasGrouping();
+
+    AggregationNode phase1AggNode = ((AggregationNode) phase2AggNode.getChild(0));
     // With grouping, the output partition exprs of the child are the (input) grouping
     // exprs of the parent. The grouping exprs reference the output tuple of phase-1
     // but the partitioning happens on the intermediate tuple of the phase-1.
-    if (shuffleDistinctExprs) {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - first merge fragment, hash-partitioned on grouping and distinct exprs:
-      //   * merge agg of phase-1
-      //   * phase-2 agg
-      // - second merge fragment, partitioned on grouping exprs or unpartitioned
-      //   without grouping exprs
-      //   * merge agg of phase-2
-      partitionExprs = Expr.substituteList(
-          phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
-          ctx_.getRootAnalyzer(), false);
-    } else {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment, hash-partitioned on grouping exprs:
-      //   * merge agg of phase-1
-      //   * phase-2 agg
-      partitionExprs = Expr.substituteList(phase2AggNode.getAggInfo().getGroupingExprs(),
-          phase1AggInfo.getOutputToIntermediateSmap(), ctx_.getRootAnalyzer(), false);
-    }
+    List<Expr> phase1PartitionExprs =
+        phase1AggNode.getMergePartitionExprs(ctx_.getRootAnalyzer());
+
     PlanFragment firstMergeFragment;
-    boolean childHasCompatPartition = ctx_.getRootAnalyzer().setsHaveValueTransfer(
-        partitionExprs, childFragment.getDataPartition().getPartitionExprs(), true);
+    boolean childHasCompatPartition = phase1AggNode.isSingleClassAgg()
+        && ctx_.getRootAnalyzer().setsHaveValueTransfer(phase1PartitionExprs,
+               childFragment.getDataPartition().getPartitionExprs(), true);
     if (childHasCompatPartition) {
       // The data is already partitioned on the required expressions, we can skip the
       // phase-1 merge step.
       childFragment.addPlanRoot(phase2AggNode);
       firstMergeFragment = childFragment;
     } else {
-      DataPartition mergePartition = DataPartition.hashPartitioned(partitionExprs);
-      // Convert the existing node to a preaggregation.
-      AggregationNode preaggNode = (AggregationNode)phase2AggNode.getChild(0);
-      preaggNode.setIsPreagg(ctx_);
+      phase1AggNode.setIntermediateTuple();
+      phase1AggNode.setIsPreagg(ctx_);
+
+      DataPartition phase1MergePartition =
+          DataPartition.hashPartitioned(phase1PartitionExprs);
 
       // place phase-1 merge aggregation step in a new fragment
-      firstMergeFragment = createParentFragment(childFragment, mergePartition);
-      AggregateInfo phase1MergeAggInfo = phase1AggInfo.getMergeAggInfo();
-      AggregationNode phase1MergeAggNode =
-          new AggregationNode(ctx_.getNextNodeId(), preaggNode, phase1MergeAggInfo);
+      firstMergeFragment = createParentFragment(childFragment, phase1MergePartition);
+      AggregationNode phase1MergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
+          phase1AggNode, phase1AggNode.getMultiAggInfo(), AggPhase.FIRST_MERGE);
       phase1MergeAggNode.init(ctx_.getRootAnalyzer());
       phase1MergeAggNode.unsetNeedsFinalize();
       phase1MergeAggNode.setIntermediateTuple();
@@ -972,27 +941,30 @@ public class DistributedPlanner {
       // if there is a limit, it had already been placed with the phase-2 aggregation
       // step (which is where it should be)
       firstMergeFragment.addPlanRoot(phase2AggNode);
-      if (shuffleDistinctExprs) fragments.add(firstMergeFragment);
+      if (shuffleDistinctExprs || !hasGrouping) fragments.add(firstMergeFragment);
     }
-    if (!shuffleDistinctExprs) return firstMergeFragment;
+    if (!shuffleDistinctExprs && hasGrouping) return firstMergeFragment;
+
     phase2AggNode.unsetNeedsFinalize();
     phase2AggNode.setIntermediateTuple();
     // Limit should be applied at the final merge aggregation node
     long limit = phase2AggNode.getLimit();
     phase2AggNode.unsetLimit();
 
-    DataPartition mergePartition;
-    if (phase2AggNode.getAggInfo().getGroupingExprs().isEmpty()) {
-      mergePartition = DataPartition.UNPARTITIONED;
+    DataPartition phase2MergePartition;
+    List<Expr> phase2PartitionExprs =
+        phase2AggNode.getMergePartitionExprs(ctx_.getRootAnalyzer());
+    if (phase2PartitionExprs.isEmpty()) {
+      phase2MergePartition = DataPartition.UNPARTITIONED;
     } else {
       phase2AggNode.setIsPreagg(ctx_);
-      mergePartition = DataPartition.hashPartitioned(
-          phase2AggNode.getAggInfo().getMergeAggInfo().getGroupingExprs());
+      phase2MergePartition = DataPartition.hashPartitioned(phase2PartitionExprs);
     }
     PlanFragment secondMergeFragment =
-        createParentFragment(firstMergeFragment, mergePartition);
+        createParentFragment(firstMergeFragment, phase2MergePartition);
+
     AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
-        phase2AggNode, phase2AggNode.getAggInfo().getMergeAggInfo());
+        phase2AggNode, phase2AggNode.getMultiAggInfo(), AggPhase.SECOND_MERGE);
     phase2MergeAggNode.init(ctx_.getRootAnalyzer());
     phase2MergeAggNode.setLimit(limit);
     // Transfer having predicates to final merge agg node

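As a reading aid for createPhase2DistinctAggregationFragment() above, this small sketch (hypothetical names, not planner code) enumerates the fragment shapes the method can produce; the branching is driven by the SHUFFLE_DISTINCT_EXPRS option and by whether the query has grouping exprs:

// Illustrative sketch of the distributed plan shapes chosen by
// createPhase2DistinctAggregationFragment(). Not planner code.
public class DistinctPlanShapeSketch {
  static String planShape(boolean shuffleDistinctExprs, boolean hasGrouping) {
    StringBuilder sb = new StringBuilder("child fragment: phase-1 preagg\n");
    if (!shuffleDistinctExprs && hasGrouping) {
      // One exchange, hash-partitioned on the grouping exprs only.
      sb.append("merge fragment (grouping exprs): phase-1 merge + phase-2 agg\n");
    } else {
      // Exchange on grouping + distinct exprs, then a second exchange.
      sb.append("merge fragment 1 (grouping + distinct exprs): phase-1 merge + phase-2 preagg\n");
      sb.append(hasGrouping
          ? "merge fragment 2 (grouping exprs): phase-2 merge\n"
          : "merge fragment 2 (unpartitioned): phase-2 merge\n");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(planShape(false, true));  // grouped, no distinct shuffle
    System.out.println(planShape(true, true));   // grouped, distinct shuffle
    System.out.println(planShape(false, false)); // no grouping exprs
  }
}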
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 6b3f032..313597e 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -39,6 +39,8 @@ import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InlineViewRef;
 import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.analysis.MultiAggregateInfo;
+import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SelectStmt;
@@ -261,13 +263,17 @@ public class SingleNodePlanner {
         AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
         AnalyticPlanner analyticPlanner =
             new AnalyticPlanner(analyticInfo, analyzer, ctx_);
+        MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo();
+        List<Expr> groupingExprs = multiAggInfo != null ?
+            multiAggInfo.getGroupingExprs() :
+            Collections.<Expr>emptyList();
         List<Expr> inputPartitionExprs = Lists.newArrayList();
-        AggregateInfo aggInfo = selectStmt.getAggInfo();
-        root = analyticPlanner.createSingleNodePlan(root,
-            aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
-        if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
+        root = analyticPlanner.createSingleNodePlan(
+            root, groupingExprs, inputPartitionExprs);
+        if (multiAggInfo != null && !inputPartitionExprs.isEmpty()
+            && multiAggInfo.getMaterializedAggClasses().size() == 1) {
           // analytic computation will benefit from a partition on inputPartitionExprs
-          aggInfo.setPartitionExprs(inputPartitionExprs);
+          multiAggInfo.getMaterializedAggClass(0).setPartitionExprs(inputPartitionExprs);
         }
       }
     } else {
@@ -610,13 +616,20 @@ public class SingleNodePlanner {
     List<SubplanRef> subplanRefs = Lists.newArrayList();
     computeParentAndSubplanRefs(
         selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs);
-    AggregateInfo aggInfo = selectStmt.getAggInfo();
-    PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, aggInfo, analyzer);
-    // add aggregation, if any
-    if (aggInfo != null) {
-      if (root instanceof HdfsScanNode) {
-        aggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
-        aggInfo.getMergeAggInfo().substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+    MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo();
+    // Only optimize scan/agg plan if there is a single aggregation class.
+    AggregateInfo scanAggInfo = null;
+    if (multiAggInfo != null && 
multiAggInfo.getMaterializedAggClasses().size() == 1) {
+      scanAggInfo = multiAggInfo.getMaterializedAggClass(0);
+    }
+    PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, scanAggInfo, analyzer);
+    // Add aggregation, if any.
+    if (multiAggInfo != null) {
+      // Apply substitution for optimized scan/agg plan.
+      if (scanAggInfo != null && root instanceof HdfsScanNode) {
+        scanAggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+        scanAggInfo.getMergeAggInfo().substitute(
+            ((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
       }
       root = createAggregationPlan(selectStmt, analyzer, root);
     }
@@ -886,28 +899,56 @@ public class SingleNodePlanner {
   * Returns a new AggregationNode that materializes the aggregation of the given stmt.
    * Assigns conjuncts from the Having clause to the returned node.
    */
-  private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
-      PlanNode root) throws ImpalaException {
-    Preconditions.checkState(selectStmt.getAggInfo() != null);
-    // add aggregation, if required
-    AggregateInfo aggInfo = selectStmt.getAggInfo();
-    root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
-    root.init(analyzer);
-    Preconditions.checkState(root.hasValidStats());
-    // if we're computing DISTINCT agg fns, the analyzer already created the
-    // 2nd phase agginfo
-    if (aggInfo.isDistinctAgg()) {
-      ((AggregationNode)root).unsetNeedsFinalize();
-      // The output of the 1st phase agg is the 1st phase intermediate.
-      ((AggregationNode)root).setIntermediateTuple();
-      root = new AggregationNode(ctx_.getNextNodeId(), root,
-          aggInfo.getSecondPhaseDistinctAggInfo());
-      root.init(analyzer);
-      Preconditions.checkState(root.hasValidStats());
+  private AggregationNode createAggregationPlan(
+      SelectStmt selectStmt, Analyzer analyzer, PlanNode root) throws ImpalaException {
+    MultiAggregateInfo multiAggInfo =
+        Preconditions.checkNotNull(selectStmt.getMultiAggInfo());
+    AggregationNode result = createAggregationPlan(root, multiAggInfo, analyzer);
+    ExprSubstitutionMap simplifiedAggSmap = multiAggInfo.getSimplifiedAggSmap();
+    if (simplifiedAggSmap == null) return result;
+
+    // Fix up aggregations that simplified to a single class after
+    // materializeRequiredSlots().
+
+    // Collect conjuncts and then re-assign them to the top-most aggregation node
+    // of the simplified plan.
+    AggregationNode dummyAgg = new AggregationNode(
+        ctx_.getNextNodeId(), result, multiAggInfo, AggPhase.TRANSPOSE);
+    dummyAgg.init(analyzer);
+    List<Expr> conjuncts =
+        Expr.substituteList(dummyAgg.getConjuncts(), simplifiedAggSmap, analyzer, true);
+    // Validate conjuncts after substitution.
+    for (Expr c : conjuncts) {
+      Preconditions.checkState(c.isBound(result.getTupleIds().get(0)));
     }
-    // add Having clause
-    root.assignConjuncts(analyzer);
-    return root;
+    result.getConjuncts().addAll(conjuncts);
+
+    // Apply simplification substitution in ancestors.
+    result.setOutputSmap(
+        ExprSubstitutionMap.compose(result.getOutputSmap(), simplifiedAggSmap, analyzer));
+    return result;
+  }
+
+  private AggregationNode createAggregationPlan(PlanNode root,
+      MultiAggregateInfo multiAggInfo, Analyzer analyzer) throws InternalException {
+    Preconditions.checkNotNull(multiAggInfo);
+    AggregationNode firstPhaseAgg =
+        new AggregationNode(ctx_.getNextNodeId(), root, multiAggInfo, AggPhase.FIRST);
+    firstPhaseAgg.init(analyzer);
+    if (!multiAggInfo.hasSecondPhase()) return firstPhaseAgg;
+
+    firstPhaseAgg.unsetNeedsFinalize();
+    firstPhaseAgg.setIntermediateTuple();
+
+    AggregationNode secondPhaseAgg = new AggregationNode(
+        ctx_.getNextNodeId(), firstPhaseAgg, multiAggInfo, AggPhase.SECOND);
+    secondPhaseAgg.init(analyzer);
+    if (!multiAggInfo.hasTransposePhase()) return secondPhaseAgg;
+
+    AggregationNode transposePhaseAgg = new AggregationNode(
+        ctx_.getNextNodeId(), secondPhaseAgg, multiAggInfo, AggPhase.TRANSPOSE);
+    transposePhaseAgg.init(analyzer);
+    return transposePhaseAgg;
   }
 
  /**
@@ -1621,7 +1662,7 @@ public class SingleNodePlanner {
       result = createUnionPlan(
           analyzer, unionStmt, unionStmt.getDistinctOperands(), null);
       result = new AggregationNode(
-          ctx_.getNextNodeId(), result, unionStmt.getDistinctAggInfo());
+          ctx_.getNextNodeId(), result, unionStmt.getDistinctAggInfo(), AggPhase.FIRST);
       result.init(analyzer);
     }
     // create ALL tree

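The new createAggregationPlan() overload above chains up to three AggregationNodes, one per phase. A minimal sketch of that control flow (the booleans are hypothetical stand-ins for MultiAggregateInfo state, not planner code):

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the FIRST/SECOND/TRANSPOSE phase chaining in
// createAggregationPlan(). Not planner code.
public class AggPhaseChainSketch {
  static List<String> phases(boolean hasSecondPhase, boolean hasTransposePhase) {
    List<String> result = new ArrayList<>();
    result.add("FIRST");                   // always created
    if (!hasSecondPhase) return result;    // no distinct aggs: done
    result.add("SECOND");                  // distinct aggs: two-phase plan
    if (!hasTransposePhase) return result; // single agg class: done
    result.add("TRANSPOSE");               // multiple classes: transpose into one result
    return result;
  }

  public static void main(String[] args) {
    System.out.println(phases(false, false)); // [FIRST]
    System.out.println(phases(true, false));  // [FIRST, SECOND]
    System.out.println(phases(true, true));   // [FIRST, SECOND, TRANSPOSE]
  }
}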
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index a0dec83..905fae6 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -18,6 +18,8 @@
 package org.apache.impala.analysis;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -30,7 +32,6 @@ import org.apache.impala.analysis.TimestampArithmeticExpr.TimeUnit;
 import org.apache.impala.catalog.AggregateFunction;
 import org.apache.impala.catalog.BuiltinsDb;
 import org.apache.impala.catalog.Catalog;
-import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.Function;
@@ -2204,13 +2205,15 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     TupleDescriptor tblRefDesc = stmt.fromClause_.get(0).getDesc();
     tblRefDesc.materializeSlots();
     tblRefDesc.computeMemLayout();
-    if (stmt.hasAggInfo()) {
-      TupleDescriptor intDesc = stmt.getAggInfo().intermediateTupleDesc_;
+    if (stmt.hasMultiAggInfo()) {
+      Preconditions.checkState(stmt.getMultiAggInfo().getAggClasses().size() == 1);
+      AggregateInfo aggInfo = stmt.getMultiAggInfo().getAggClass(0);
+      TupleDescriptor intDesc = aggInfo.intermediateTupleDesc_;
       intDesc.materializeSlots();
       intDesc.computeMemLayout();
-      checkSerializedMTime(stmt.getAggInfo().getAggregateExprs().get(0), expectMTime);
+      checkSerializedMTime(aggInfo.getAggregateExprs().get(0), expectMTime);
       checkSerializedMTime(
-          stmt.getAggInfo().getMergeAggInfo().getAggregateExprs().get(0), expectMTime);
+          aggInfo.getMergeAggInfo().getAggregateExprs().get(0), expectMTime);
     } else {
       checkSerializedMTime(stmt.getSelectList().getItems().get(0).getExpr(), expectMTime);
     }
@@ -2727,7 +2730,7 @@ public class AnalyzeExprsTest extends AnalyzerTest {
   }
 
   @Test
-  public void TestAppxCountDistinctOption() throws AnalysisException, CatalogException {
+  public void TestAppxCountDistinctOption() throws AnalysisException {
     TQueryOptions queryOptions = new TQueryOptions();
     queryOptions.setAppx_count_distinct(true);
 
@@ -2736,20 +2739,24 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     // Accumulates count(distinct) for all columns of both alltypesTbl and decimalTbl.
     List<String> allCountDistinctFns = Lists.newArrayList();
 
-    Table alltypesTbl = catalog_.getTable("functional", "alltypes");
+    Table alltypesTbl = catalog_.getOrLoadTable("functional", "alltypes");
     for (Column col: alltypesTbl.getColumns()) {
       String colName = col.getName();
       // Test a single count(distinct) with some other aggs.
-      AnalyzesOk(String.format(
-          "select count(distinct %s), sum(distinct smallint_col), " +
+      String countDistinctFn = String.format("count(distinct %s)", colName);
+      SelectStmt stmt = (SelectStmt) AnalyzesOk(String.format(
+          "select %s, sum(distinct smallint_col), " +
           "avg(float_col), min(%s) " +
           "from functional.alltypes",
-          colName, colName), createAnalysisCtx(queryOptions));
-      countDistinctFns.add(String.format("count(distinct %s)", colName));
+          countDistinctFn, colName), createAnalysisCtx(queryOptions));
+      validateSingleColAppxCountDistinctRewrite(stmt, colName);
+      countDistinctFns.add(countDistinctFn);
     }
     // Test a single query with a count(distinct) on all columns of alltypesTbl.
-    AnalyzesOk(String.format("select %s from functional.alltypes",
+    SelectStmt alltypesStmt = (SelectStmt) AnalyzesOk(String.format(
+        "select %s from functional.alltypes",
         Joiner.on(",").join(countDistinctFns)), 
createAnalysisCtx(queryOptions));
+    assertAllNdvAggExprs(alltypesStmt, alltypesTbl.getColumns().size());
 
     allCountDistinctFns.addAll(countDistinctFns);
     countDistinctFns.clear();
@@ -2757,38 +2764,85 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     for (Column col: decimalTbl.getColumns()) {
       String colName = col.getName();
       // Test a single count(distinct) with some other aggs.
-      AnalyzesOk(String.format(
+      SelectStmt stmt = (SelectStmt) AnalyzesOk(String.format(
           "select count(distinct %s), sum(distinct d1), " +
           "avg(d2), min(%s) " +
           "from functional.decimal_tbl",
           colName, colName), createAnalysisCtx(queryOptions));
       countDistinctFns.add(String.format("count(distinct %s)", colName));
+      validateSingleColAppxCountDistinctRewrite(stmt, colName);
     }
     // Test a single query with a count(distinct) on all columns of decimalTbl.
-    AnalyzesOk(String.format("select %s from functional.decimal_tbl",
+    SelectStmt decimalTblStmt = (SelectStmt) AnalyzesOk(String.format(
+        "select %s from functional.decimal_tbl",
         Joiner.on(",").join(countDistinctFns)), 
createAnalysisCtx(queryOptions));
+    assertAllNdvAggExprs(decimalTblStmt, decimalTbl.getColumns().size());
 
     allCountDistinctFns.addAll(countDistinctFns);
 
     // Test a single query with a count(distinct) on all columns of both
     // alltypes/decimalTbl.
-    AnalyzesOk(String.format(
+    SelectStmt comboStmt = (SelectStmt) AnalyzesOk(String.format(
         "select %s from functional.alltypes cross join functional.decimal_tbl",
-        Joiner.on(",").join(countDistinctFns)), 
createAnalysisCtx(queryOptions));
+        Joiner.on(",").join(allCountDistinctFns)), 
createAnalysisCtx(queryOptions));
+    assertAllNdvAggExprs(comboStmt, alltypesTbl.getColumns().size() +
+        decimalTbl.getColumns().size());
 
     // The rewrite does not work for multiple count() arguments.
-    AnalysisError("select count(distinct int_col, bigint_col), " +
-        "count(distinct string_col, float_col) from functional.alltypes",
-        createAnalysisCtx(queryOptions),
-        "all DISTINCT aggregate functions need to have the same set of 
parameters as " +
-        "count(DISTINCT int_col, bigint_col); deviating function: " +
-        "count(DISTINCT string_col, float_col)");
+    SelectStmt noRewriteStmt = (SelectStmt) AnalyzesOk(
+        "select count(distinct int_col, bigint_col), " +
+        "count(distinct string_col, float_col) from functional.alltypes");
+    assertNoNdvAggExprs(noRewriteStmt, 2);
+
     // The rewrite only applies to the count() function.
-    AnalysisError(
+    noRewriteStmt = (SelectStmt) AnalyzesOk(
         "select avg(distinct int_col), sum(distinct float_col) from 
functional.alltypes",
-        createAnalysisCtx(queryOptions),
-        "all DISTINCT aggregate functions need to have the same set of 
parameters as " +
-        "avg(DISTINCT int_col); deviating function: sum(DISTINCT");
+        createAnalysisCtx(queryOptions));
+    assertNoNdvAggExprs(noRewriteStmt, 2);
+  }
+
+  // Checks that 'stmt' contains exactly one DISTINCT 'sum' and one non-DISTINCT 'ndv' agg expr.
+  private void validateSingleColAppxCountDistinctRewrite(
+      SelectStmt stmt, String rewrittenColName) {
+    MultiAggregateInfo multiAggInfo = stmt.getMultiAggInfo();
+    List<FunctionCallExpr> aggExprs = multiAggInfo.getAggExprs();
+    assertEquals(4, aggExprs.size());
+    int numDistinctExprs = 0;
+    int numNdvExprs = 0;
+    for (FunctionCallExpr aggExpr : aggExprs) {
+      if (aggExpr.isDistinct()) {
+        assertEquals("sum", aggExpr.getFnName().toString());
+        ++numDistinctExprs;
+      }
+      if (aggExpr.getFnName().toString().equals("ndv")) {
+        assertEquals(rewrittenColName, aggExpr.getChild(0).toSql());
+        ++numNdvExprs;
+      }
+    }
+    assertEquals(1, numDistinctExprs);
+    assertEquals(1, numNdvExprs);
+  }
+
+  // Checks that all aggregate exprs in 'stmt' are non-DISTINCT 'ndv'.
+  private void assertAllNdvAggExprs(SelectStmt stmt, int expectedNumAggExprs) {
+    MultiAggregateInfo multiAggInfo = stmt.getMultiAggInfo();
+    List<FunctionCallExpr> aggExprs = multiAggInfo.getAggExprs();
+    assertEquals(expectedNumAggExprs, aggExprs.size());
+    for (FunctionCallExpr aggExpr : aggExprs) {
+      assertFalse(aggExpr.isDistinct());
+      assertEquals("ndv", aggExpr.getFnName().toString());
+    }
+  }
+
+  // Checks that all aggregate exprs in 'stmt' are DISTINCT and not 'ndv'.
+  private void assertNoNdvAggExprs(SelectStmt stmt, int expectedNumAggExprs) {
+    MultiAggregateInfo multiAggInfo = stmt.getMultiAggInfo();
+    List<FunctionCallExpr> aggExprs = multiAggInfo.getAggExprs();
+    assertEquals(expectedNumAggExprs, aggExprs.size());
+    for (FunctionCallExpr aggExpr : aggExprs) {
+      assertTrue(aggExpr.isDistinct());
+      assertNotEquals("ndv", aggExpr.getFnName().toString());
+    }
   }
 
   @Test

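The reworked TestAppxCountDistinctOption above reflects that queries mixing several distinct aggregations now analyze successfully, so the test asserts which exprs the APPX_COUNT_DISTINCT rewrite touched rather than expecting analysis errors. A rough sketch of the rewrite condition the assertions encode (hypothetical helper, not Impala's implementation):

import java.util.List;

// Illustrative sketch: with APPX_COUNT_DISTINCT=true, a single-argument
// count(distinct c) becomes a non-distinct ndv(c); multi-argument counts and
// other distinct functions are left unchanged.
public class AppxCountDistinctSketch {
  record AggExpr(String fn, boolean distinct, List<String> args) {}

  static AggExpr rewrite(AggExpr e) {
    boolean rewritable = e.distinct() && e.fn().equals("count") && e.args().size() == 1;
    return rewritable ? new AggExpr("ndv", false, e.args()) : e;
  }

  public static void main(String[] args) {
    System.out.println(rewrite(new AggExpr("count", true, List.of("int_col")))); // ndv
    System.out.println(rewrite(new AggExpr("count", true, List.of("a", "b")))); // kept
    System.out.println(rewrite(new AggExpr("avg", true, List.of("int_col"))));  // kept
  }
}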
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index fe7e352..c508409 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.fail;
 
 import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Set;
 
-import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
@@ -41,6 +41,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class AnalyzeStmtsTest extends AnalyzerTest {
 
@@ -2114,9 +2115,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalysisError("select distinct id from functional.testtbl having max(id) > 
0",
         "cannot combine SELECT DISTINCT with aggregate functions or GROUP BY");
     AnalyzesOk("select count(distinct id, zip) from functional.testtbl");
-    AnalysisError("select count(distinct id, zip), count(distinct zip) " +
-        "from functional.testtbl",
-        "all DISTINCT aggregate functions need to have the same set of 
parameters");
     AnalyzesOk("select tinyint_col, count(distinct int_col, bigint_col) "
         + "from functional.alltypesagg group by 1");
     AnalyzesOk("select tinyint_col, count(distinct int_col),"
@@ -2127,14 +2125,48 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalyzesOk("select sum(distinct t1.bigint_col), avg(distinct 
t1.bigint_col) " +
         "from functional.alltypes t1 group by t1.int_col, t1.int_col");
 
-    AnalysisError("select tinyint_col, count(distinct int_col),"
-        + "sum(distinct bigint_col) from functional.alltypesagg group by 1",
-        "all DISTINCT aggregate functions need to have the same set of 
parameters");
     // min and max are ignored in terms of DISTINCT
     AnalyzesOk("select tinyint_col, count(distinct int_col),"
         + "min(distinct smallint_col), max(distinct string_col) "
         + "from functional.alltypesagg group by 1");
 
+    // Test multiple distinct aggregations.
+    Table alltypesTbl = catalog_.getOrLoadTable("functional", "alltypes");
+    List<String> distinctFns = Lists.newArrayList();
+    for (Column col : alltypesTbl.getColumns()) {
+      distinctFns.add(String.format("count(distinct %s)", col.getName()));
+    }
+    // Test a single query with a count(distinct) on all columns of alltypesTbl.
+    AnalyzesOk(String.format(
+        "select %s from functional.alltypes", 
Joiner.on(",").join(distinctFns)));
+
+    // Test various mixes of distinct and non-distinct, including multiple distinct.
+    Set<String> testAggExprs = Sets.newHashSet(
+        "count(tinyint_col)",
+        "count(string_col)",
+        "avg(float_col)",
+        "max(string_col)",
+        "count(distinct tinyint_col)",
+        "count(distinct tinyint_col, smallint_col, int_col)",
+        "avg(distinct double_col)",
+        "count(distinct string_col)",
+        "sum(distinct smallint_col)"
+        );
+    List<String> testGroupByExprs = Lists.newArrayList(
+        "int_col", "bigint_col", "string_col",
+        "string_col, date_string_col",
+        "id, double_col, date_string_col"
+        );
+    for (Set<String> aggExprs : Sets.powerSet(testAggExprs)) {
+      if (aggExprs.isEmpty()) continue;
+      String selectList = Joiner.on(",").join(aggExprs);
+      AnalyzesOk(String.format("select %s from functional.alltypes", selectList));
+      for (String groupByExprs : testGroupByExprs) {
+        AnalyzesOk(String.format(
+            "select %s from functional.alltypes group by %s", selectList, 
groupByExprs));
+      }
+    }
+
     // IMPALA-6114: Test that numeric literals having the same value but different types
     // are considered distinct.
     AnalyzesOk("select distinct cast(0 as decimal(14)), 0 from 
functional.alltypes");
@@ -2155,7 +2187,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         String stmtSql = String.format("select %s from %s", aggFnCall, tblName);
         SelectStmt stmt = (SelectStmt) AnalyzesOk(stmtSql);
         // Verify that the resolved function signature matches as expected.
-        Type[] args = stmt.getAggInfo().getAggregateExprs().get(0).getFn().getArgs();
+        AggregateInfo aggInfo = stmt.getMultiAggInfo().getAggClass(0);
+        Type[] args = aggInfo.getAggregateExprs().get(0).getFn().getArgs();
         assertEquals(args.length, 2);
         assertTrue(col.getType().matchesType(args[0]) ||
             col.getType().isStringType() && args[0].equals(Type.STRING));
@@ -2256,6 +2289,23 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalyzesOk("select tinyint_col, count(distinct int_col),"
         + "sum(distinct int_col) from " +
         "(select * from functional.alltypesagg) x group by 1");
+    AnalyzesOk("select * from " +
+        "(select count(distinct id, zip), count(distinct zip) " +
+        "from functional.testtbl) x");
+    AnalyzesOk("select * from " + "(select tinyint_col, count(distinct 
int_col)," +
+        "sum(distinct bigint_col) from functional.alltypesagg group by 1) x");
+    AnalyzesOk("select count(distinct id, zip) " +
+        "from (select * from functional.testtbl) x");
+    AnalyzesOk("select tinyint_col, count(distinct int_col, bigint_col) " +
+        "from (select * from functional.alltypesagg) x group by 1");
+    AnalyzesOk("select tinyint_col, count(distinct int_col)," +
+        "sum(distinct int_col) from " +
+        "(select * from functional.alltypesagg) x group by 1");
+    AnalyzesOk("select count(distinct id, zip), count(distinct zip) " +
+        " from (select * from functional.testtbl) x");
+    AnalyzesOk("select tinyint_col, count(distinct int_col)," +
+        "sum(distinct bigint_col) from " +
+        "(select * from functional.alltypesagg) x group by 1");
 
     // Error case when distinct is inside an inline view
     AnalysisError("select * from " +
@@ -2267,13 +2317,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalysisError("select * from " +
         "(select distinct id, zip, count(*) from functional.testtbl group by 
1, 2) x",
         "cannot combine SELECT DISTINCT with aggregate functions or GROUP BY");
-    AnalysisError("select * from " +
-        "(select count(distinct id, zip), count(distinct zip) " +
-        "from functional.testtbl) x",
-        "all DISTINCT aggregate functions need to have the same set of 
parameters");
-    AnalysisError("select * from " + "(select tinyint_col, count(distinct 
int_col),"
-        + "sum(distinct bigint_col) from functional.alltypesagg group by 1) x",
-        "all DISTINCT aggregate functions need to have the same set of 
parameters");
 
     // Error case when inline view is in the from clause
     AnalysisError("select distinct count(*) from (select * from 
functional.testtbl) x",
@@ -2284,20 +2327,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalysisError("select distinct id, zip, count(*) from " +
         "(select * from functional.testtbl) x group by 1, 2",
         "cannot combine SELECT DISTINCT with aggregate functions or GROUP BY");
-    AnalyzesOk("select count(distinct id, zip) " +
-        "from (select * from functional.testtbl) x");
-    AnalysisError("select count(distinct id, zip), count(distinct zip) " +
-        " from (select * from functional.testtbl) x",
-        "all DISTINCT aggregate functions need to have the same set of 
parameters");
-    AnalyzesOk("select tinyint_col, count(distinct int_col, bigint_col) "
-        + "from (select * from functional.alltypesagg) x group by 1");
-    AnalyzesOk("select tinyint_col, count(distinct int_col),"
-        + "sum(distinct int_col) from " +
-        "(select * from functional.alltypesagg) x group by 1");
-    AnalysisError("select tinyint_col, count(distinct int_col),"
-        + "sum(distinct bigint_col) from " +
-        "(select * from functional.alltypesagg) x group by 1",
-        "all DISTINCT aggregate functions need to have the same set of 
parameters");
   }
 
   @Test

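The subset-based coverage above leans on Guava's Sets.powerSet(), which lazily enumerates all 2^n subsets of the candidate aggregate exprs; each non-empty subset becomes one select list, optionally crossed with each group-by list. A self-contained sketch of the same pattern (class name hypothetical; query strings follow the test):

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import java.util.Set;

// Hypothetical sketch of the power-set query generation used in the test above.
public class PowerSetQueriesSketch {
  public static void main(String[] args) {
    Set<String> aggExprs = Set.of(
        "count(distinct tinyint_col)", "sum(distinct smallint_col)", "avg(float_col)");
    // Sets.powerSet() yields every subset of 'aggExprs' without materializing them all.
    for (Set<String> subset : Sets.powerSet(aggExprs)) {
      if (subset.isEmpty()) continue;
      System.out.printf("select %s from functional.alltypes%n", Joiner.on(",").join(subset));
    }
  }
}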
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 7fc8768..643f85e 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -19,7 +19,6 @@ package org.apache.impala.analysis;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.impala.catalog.Function;
@@ -27,7 +26,6 @@ import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
-import org.apache.impala.thrift.TExpr;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -58,47 +56,6 @@ public class AnalyzerTest extends FrontendTestBase {
   }
 
   /**
-   * Check whether SelectStmt components can be converted to thrift.
-   */
-  protected void checkSelectToThrift(SelectStmt node) {
-    // convert select list exprs and where clause to thrift
-    List<Expr> selectListExprs = node.getResultExprs();
-    List<TExpr> thriftExprs = Expr.treesToThrift(selectListExprs);
-    LOG.info("select list:\n");
-    for (TExpr expr: thriftExprs) {
-      LOG.info(expr.toString() + "\n");
-    }
-    for (Expr expr: selectListExprs) {
-      checkBinaryExprs(expr);
-    }
-    if (node.getWhereClause() != null) {
-      TExpr thriftWhere = node.getWhereClause().treeToThrift();
-      LOG.info("WHERE pred: " + thriftWhere.toString() + "\n");
-      checkBinaryExprs(node.getWhereClause());
-    }
-    AggregateInfo aggInfo = node.getAggInfo();
-    if (aggInfo != null) {
-      if (aggInfo.getGroupingExprs() != null) {
-        LOG.info("grouping exprs:\n");
-        for (Expr expr: aggInfo.getGroupingExprs()) {
-          LOG.info(expr.treeToThrift().toString() + "\n");
-          checkBinaryExprs(expr);
-        }
-      }
-      LOG.info("aggregate exprs:\n");
-      for (Expr expr: aggInfo.getAggregateExprs()) {
-        LOG.info(expr.treeToThrift().toString() + "\n");
-        checkBinaryExprs(expr);
-      }
-      if (node.getHavingPred() != null) {
-        TExpr thriftHaving = node.getHavingPred().treeToThrift();
-        LOG.info("HAVING pred: " + thriftHaving.toString() + "\n");
-        checkBinaryExprs(node.getHavingPred());
-      }
-    }
-  }
-
-  /**
    * Generates and analyzes two variants of the given query by replacing all occurrences
    * of "$TBL" in the query string with the unqualified and fully-qualified version of
    * the given table name. The unqualified variant is analyzed using an analyzer that has
@@ -133,23 +90,6 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalysisError(fqQuery, expectedError);
   }
 
-  /**
-   * Makes sure that operands to binary exprs have the same type.
-   */
-  private void checkBinaryExprs(Expr expr) {
-    if (expr instanceof BinaryPredicate
-        || (expr instanceof ArithmeticExpr
-        && ((ArithmeticExpr) expr).getOp() != ArithmeticExpr.Operator.BITNOT)) 
{
-      Assert.assertEquals(expr.getChildren().size(), 2);
-      // The types must be equal or one of them is NULL_TYPE.
-      Assert.assertTrue(expr.getChild(0).getType() == expr.getChild(1).getType()
-          || expr.getChild(0).getType().isNull() || expr.getChild(1).getType().isNull());
-    }
-    for (Expr child: expr.getChildren()) {
-      checkBinaryExprs(child);
-    }
-  }
-
   @Test
   public void TestCompressedText() throws AnalysisException {
     AnalyzesOk("SELECT count(*) FROM functional_text_lzo.tinyinttable");

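For reference, the deleted checkBinaryExprs helper enforced a simple invariant: both operands of a binary expr must have the same type unless one of them is NULL_TYPE. A standalone restatement of that rule (hypothetical enum and names, not Impala's type system):

// Hypothetical restatement of the invariant the deleted checkBinaryExprs enforced:
// binary operators see operands of one type, except that NULL pairs with anything.
public class BinaryTypeCheckSketch {
  enum Kind { NULL, INT, STRING }

  static boolean operandsCompatible(Kind left, Kind right) {
    return left == right || left == Kind.NULL || right == Kind.NULL;
  }

  public static void main(String[] args) {
    System.out.println(operandsCompatible(Kind.INT, Kind.INT));     // true
    System.out.println(operandsCompatible(Kind.NULL, Kind.STRING)); // true
    System.out.println(operandsCompatible(Kind.INT, Kind.STRING));  // false
  }
}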
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 87cd518..0cc2e48 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -75,6 +75,31 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testMultipleDistinct() {
+    // TODO: Multiple distinct with count(distinct a,b,c) variants.
+    // TODO: Multiple distinct in subplan.
+    // TODO: Multiple distinct in subqueries.
+    // TODO: Multiple distinct lineage tests.
+    // TODO: Multiple distinct and SHUFFLE_DISTINCT_EXPRS tests.
+    runPlannerTestFile("multiple-distinct");
+  }
+
+  @Test
+  public void testMultipleDistinctMaterialization() {
+    runPlannerTestFile("multiple-distinct-materialization");
+  }
+
+  @Test
+  public void testMultipleDistinctPredicates() {
+    runPlannerTestFile("multiple-distinct-predicates");
+  }
+
+  @Test
+  public void testMultipleDistinctLimit() {
+    runPlannerTestFile("multiple-distinct-limit");
+  }
+
+  @Test
   public void testShuffleByDistinctExprs() {
     runPlannerTestFile("shuffle-by-distinct-exprs");
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
index 1e6b3ba..6e66e30 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
@@ -579,3 +579,48 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
+# Query block with a single distinct and multiple non-distinct aggs simplifies to a
+# non-grouping aggregation plan.
+select a, c from
+  (select min(string_col) a, count(distinct smallint_col) b,
+          max(string_col) c
+   from functional.alltypes
+   having min(string_col) < '9' and min(string_col) < max(string_col)) v
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: min:merge(string_col), max:merge(string_col)
+|  having: min(string_col) < '9', min(string_col) < max(string_col)
+|
+01:AGGREGATE
+|  output: min(string_col), max(string_col)
+|  group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: min:merge(string_col), max:merge(string_col)
+|  having: min(string_col) < '9', min(string_col) < max(string_col)
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: min:merge(string_col), max:merge(string_col)
+|
+04:AGGREGATE
+|  output: min:merge(string_col), max:merge(string_col)
+|  group by: smallint_col
+|
+03:EXCHANGE [HASH(smallint_col)]
+|
+01:AGGREGATE [STREAMING]
+|  output: min(string_col), max(string_col)
+|  group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====

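The plan above shows the standard two-phase distinct strategy: the first aggregation groups by the DISTINCT column and pre-aggregates the non-distinct min/max, and the second merges those per-group partials (the count itself is pruned here because the outer block never reads b). A small self-contained simulation of the idea with made-up rows (illustrative Java, not Impala code):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical simulation of the two-phase distinct plan above.
public class TwoPhaseDistinctSketch {
  record Row(int smallintCol, String stringCol) {}

  public static void main(String[] args) {
    List<Row> rows = List.of(new Row(1, "a"), new Row(1, "c"), new Row(2, "b"));

    // Phase 1 (01:AGGREGATE): group by the DISTINCT column; min/max become partials.
    Map<Integer, List<Row>> groups =
        rows.stream().collect(Collectors.groupingBy(Row::smallintCol));

    // Phase 2 (02:AGGREGATE FINALIZE): merge the per-group min/max partials.
    String min = groups.values().stream()
        .map(g -> g.stream().map(Row::stringCol).min(String::compareTo).orElseThrow())
        .min(String::compareTo).orElseThrow();
    String max = groups.values().stream()
        .map(g -> g.stream().map(Row::stringCol).max(String::compareTo).orElseThrow())
        .max(String::compareTo).orElseThrow();
    // Had 'b' been selected, count(distinct smallint_col) would simply be groups.size().
    System.out.printf("min=%s max=%s distinct=%d%n", min, max, groups.size());
  }
}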
http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test
new file mode 100644
index 0000000..d3c5553
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-limit.test
@@ -0,0 +1,181 @@
+# Test correct placement of limit.
+select count(distinct tinyint_col) a, min(timestamp_col) b,
+       count(distinct smallint_col) c, max(timestamp_col) d
+from functional.alltypes
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, count(smallint_col)), aggif(valid_tid() = 5, min(timestamp_col)), aggif(valid_tid() = 5, max(timestamp_col))
+|  limit: 10
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: min:merge(timestamp_col), max:merge(timestamp_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: min(timestamp_col), max(timestamp_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, count(smallint_col)), aggif(valid_tid() = 5, min(timestamp_col)), aggif(valid_tid() = 5, max(timestamp_col))
+|  limit: 10
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|  Class 1
+|    output: count:merge(smallint_col)
+|  Class 2
+|    output: min:merge(timestamp_col), max:merge(timestamp_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: min:merge(timestamp_col), max:merge(timestamp_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: min:merge(timestamp_col), max:merge(timestamp_col)
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 THEN murmur_hash(smallint_col) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: min(timestamp_col), max(timestamp_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Test correct placement of limit. Simplifies to a single class with one distinct agg.
+select b from (
+  select count(distinct tinyint_col) a, min(timestamp_col) b,
+         count(distinct smallint_col) c, max(timestamp_col) d
+  from functional.alltypes limit 20) v
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: min(timestamp_col)
+|  limit: 10
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:AGGREGATE [FINALIZE]
+|  output: min:merge(timestamp_col)
+|  limit: 10
+|
+03:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: min(timestamp_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Test correct placement of limit. Simplifies to a single class with a non-distinct agg.
+select d from (
+  select count(distinct tinyint_col) a, min(timestamp_col) b,
+         count(distinct smallint_col) c, max(timestamp_col) d
+  from functional.alltypes limit 20) v
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: max(timestamp_col)
+|  limit: 10
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:AGGREGATE [FINALIZE]
+|  output: max:merge(timestamp_col)
+|  limit: 10
+|
+03:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: max(timestamp_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Test correct placement of limit. Simplifies to a single class with distinct
+# and non-distinct aggs.
+select d, c, d from (
+  select count(distinct tinyint_col) a, min(timestamp_col) b,
+         count(distinct smallint_col) c, max(timestamp_col) d
+  from functional.alltypes limit 20) v
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(smallint_col), max:merge(timestamp_col)
+|  limit: 10
+|
+01:AGGREGATE
+|  output: max(timestamp_col)
+|  group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(smallint_col), max:merge(timestamp_col)
+|  limit: 10
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(smallint_col), max:merge(timestamp_col)
+|
+05:AGGREGATE
+|  output: max:merge(timestamp_col)
+|  group by: smallint_col
+|
+04:EXCHANGE [HASH(smallint_col)]
+|
+01:AGGREGATE [STREAMING]
+|  output: max(timestamp_col)
+|  group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====

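The new plan shape in this file ends with a "transposition" aggregation: each aggregation class is computed independently, one merged row per class survives, and aggif(valid_tid() = N, expr) evaluates expr only on the row whose tuple id is N. A hypothetical, self-contained illustration of that last step (tuple ids match the first plan above; the values are made up):

import java.util.List;

// Hypothetical illustration (not Impala code) of the final transposition
// aggregation: after the merge phase there is one row per aggregation class,
// tagged with a tuple id, and aggif picks each output from its class's row.
public class AggifSketch {
  // One merged row; only the slots of the class identified by 'tid' are set.
  record ClassRow(int tid, Long cntTinyint, Long cntSmallint, String minTs, String maxTs) {}

  public static void main(String[] args) {
    List<ClassRow> merged = List.of(
        new ClassRow(2, 9L, null, null, null),    // class 0: count(distinct tinyint_col)
        new ClassRow(4, null, 97L, null, null),   // class 1: count(distinct smallint_col)
        new ClassRow(5, null, null, "2009-01-01", "2010-12-31")); // class 2: min/max

    Long a = null, c = null;
    String b = null, d = null;
    for (ClassRow row : merged) {
      // Mirrors aggif(valid_tid() = 2, count(tinyint_col)), and so on per class.
      if (row.tid() == 2) a = row.cntTinyint();
      if (row.tid() == 4) c = row.cntSmallint();
      if (row.tid() == 5) { b = row.minTs(); d = row.maxTs(); }
    }
    System.out.printf("a=%d b=%s c=%d d=%s%n", a, b, c, d);
  }
}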