Repository: incubator-impala
Updated Branches:
  refs/heads/master 158fd330b -> c7db60aa4


IMPALA-4794: Grouping distinct agg plan robust to data skew

This patch changes the query plan for grouping distinct aggregations to
be more robust to data skew in the grouping expressions. The existing
plan partitions data between phase-1 and phase-2 by the grouping exprs.
Under this strategy, data skew in the grouping exprs directly
impacts performance. The new plan partitions data by both the grouping
exprs and distinct agg exprs, then adds one more aggregation and
exchange node. The new plan is more robust to data skew but does more
work than the old plan.

Testing: Modified existing planner tests, which already provide
sufficient coverage. The pattern in the updated plans is that the
distinct agg exprs are added to the first exchange node, followed by an
additional merge agg and exchange node.

Change-Id: I7bdada0e328b555900c7b7ff8aabc8eb15ae8fa9
Reviewed-on: http://gerrit.cloudera.org:8080/7643
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b660bd65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b660bd65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b660bd65

Branch: refs/heads/master
Commit: b660bd652f69913ac558d5e6ab5c4bdee3d97601
Parents: 158fd33
Author: Tianyi Wang <[email protected]>
Authored: Wed Aug 16 11:05:07 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Aug 16 23:20:22 2017 +0000

----------------------------------------------------------------------
 .../impala/planner/DistributedPlanner.java      | 135 +++++++++----------
 .../queries/PlannerTest/aggregation.test        |  98 +++++++++-----
 .../queries/PlannerTest/distinct.test           |  60 ++++++---
 .../queries/PlannerTest/insert.test             |  10 +-
 .../queries/PlannerTest/kudu.test               |  14 +-
 .../queries/PlannerTest/tpch-all.test           |  24 +++-
 .../queries/PlannerTest/tpch-nested.test        |  12 +-
 7 files changed, 219 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 2266625..6571036 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -855,98 +855,87 @@ public class DistributedPlanner {
 
   /**
    * Returns a fragment that materialises the final result of a distinct 
aggregation
-   * where 'childFragment' is a partitioned fragment with the first phase 
aggregation
-   * as its root and 'node' is the second phase of the distinct aggregation.
+   * where 'childFragment' is a partitioned fragment with the phase-1 
aggregation
+   * as its root.
    */
-  private PlanFragment createPhase2DistinctAggregationFragment(AggregationNode 
node,
-      PlanFragment childFragment, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
-    boolean hasGrouping = !groupingExprs.isEmpty();
-
-    // The first-phase aggregation node is already in the child fragment.
-    Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot());
-
-    AggregateInfo firstPhaseAggInfo = ((AggregationNode) 
node.getChild(0)).getAggInfo();
-    List<Expr> partitionExprs = null;
-    if (hasGrouping) {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment, hash-partitioned on grouping exprs:
-      //   * merge agg of phase 1
-      //   * phase 2 agg
-      // The output partition exprs of the child are the (input) grouping 
exprs of the
-      // parent. The grouping exprs reference the output tuple of the 1st 
phase, but the
-      // partitioning happens on the intermediate tuple of the 1st phase.
-      partitionExprs = Expr.substituteList(
-          groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(),
-          ctx_.getRootAnalyzer(), false);
-    } else {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment 1, hash-partitioned on distinct exprs:
-      //   * merge agg of phase 1
-      //   * phase 2 agg
-      // - merge fragment 2, unpartitioned:
-      //   * merge agg of phase 2
-      partitionExprs = 
Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
-          firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(), 
false);
-    }
-
-    PlanFragment mergeFragment = null;
+  private PlanFragment createPhase2DistinctAggregationFragment(
+      AggregationNode phase2AggNode, PlanFragment childFragment,
+      ArrayList<PlanFragment> fragments) throws ImpalaException {
+    // The phase-1 aggregation node is already in the child fragment.
+    Preconditions.checkState(phase2AggNode.getChild(0) == 
childFragment.getPlanRoot());
+
+    AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0))
+        .getAggInfo();
+    // We need to do
+    // - child fragment:
+    //   * phase-1 aggregation
+    // - first merge fragment, hash-partitioned on grouping and distinct exprs:
+    //   * merge agg of phase-1
+    //   * phase-2 agg
+    // - second merge fragment, partitioned on grouping exprs or unpartitioned
+    //   without grouping exprs
+    //   * merge agg of phase-2
+    // With grouping, the output partition exprs of the child are the (input) 
grouping
+    // exprs of the parent. The grouping exprs reference the output tuple of 
phase-1
+    // but the partitioning happens on the intermediate tuple of the phase-1.
+    ArrayList<Expr> partitionExprs = Expr.substituteList(
+        phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
+        ctx_.getRootAnalyzer(), false);
+
+    PlanFragment firstMergeFragment;
     boolean childHasCompatPartition = 
ctx_.getRootAnalyzer().equivSets(partitionExprs,
         childFragment.getDataPartition().getPartitionExprs());
     if (childHasCompatPartition) {
       // The data is already partitioned on the required expressions, we can 
skip the
-      // phase 1 merge step.
-      childFragment.addPlanRoot(node);
-      mergeFragment = childFragment;
+      // phase-1 merge step.
+      childFragment.addPlanRoot(phase2AggNode);
+      firstMergeFragment = childFragment;
     } else {
       DataPartition mergePartition = 
DataPartition.hashPartitioned(partitionExprs);
       // Convert the existing node to a preaggregation.
-      AggregationNode preaggNode = (AggregationNode)node.getChild(0);
+      AggregationNode preaggNode = (AggregationNode)phase2AggNode.getChild(0);
       preaggNode.setIsPreagg(ctx_);
 
-      // place a merge aggregation step for the 1st phase in a new fragment
-      mergeFragment = createParentFragment(childFragment, mergePartition);
-      AggregateInfo phase1MergeAggInfo = firstPhaseAggInfo.getMergeAggInfo();
+      // place phase-1 merge aggregation step in a new fragment
+      firstMergeFragment = createParentFragment(childFragment, mergePartition);
+      AggregateInfo phase1MergeAggInfo = phase1AggInfo.getMergeAggInfo();
       AggregationNode phase1MergeAggNode =
           new AggregationNode(ctx_.getNextNodeId(), preaggNode, 
phase1MergeAggInfo);
       phase1MergeAggNode.init(ctx_.getRootAnalyzer());
       phase1MergeAggNode.unsetNeedsFinalize();
       phase1MergeAggNode.setIntermediateTuple();
-      mergeFragment.addPlanRoot(phase1MergeAggNode);
+      firstMergeFragment.addPlanRoot(phase1MergeAggNode);
 
-      // the 2nd-phase aggregation consumes the output of the merge agg;
-      // if there is a limit, it had already been placed with the 2nd 
aggregation
+      // the phase-2 aggregation consumes the output of the phase-1 merge agg;
+      // if there is a limit, it had already been placed with the phase-2 
aggregation
       // step (which is where it should be)
-      mergeFragment.addPlanRoot(node);
+      firstMergeFragment.addPlanRoot(phase2AggNode);
+      fragments.add(firstMergeFragment);
     }
-
-    if (!hasGrouping) {
-      // place the merge aggregation of the 2nd phase in an unpartitioned 
fragment;
-      // add preceding merge fragment at end
-      if (mergeFragment != childFragment) fragments.add(mergeFragment);
-
-      node.unsetNeedsFinalize();
-      node.setIntermediateTuple();
-      // Any limit should be placed in the final merge aggregation node
-      long limit = node.getLimit();
-      node.unsetLimit();
-      mergeFragment = createParentFragment(mergeFragment, 
DataPartition.UNPARTITIONED);
-      AggregateInfo phase2MergeAggInfo = node.getAggInfo().getMergeAggInfo();
-      AggregationNode phase2MergeAggNode = new 
AggregationNode(ctx_.getNextNodeId(), node,
-          phase2MergeAggInfo);
-      phase2MergeAggNode.init(ctx_.getRootAnalyzer());
-      // Transfer having predicates. If hasGrouping == true, the predicates 
should
-      // instead be evaluated by the 2nd phase agg (the predicates are already 
there).
-      node.transferConjuncts(phase2MergeAggNode);
-      phase2MergeAggNode.setLimit(limit);
-      mergeFragment.addPlanRoot(phase2MergeAggNode);
+    phase2AggNode.unsetNeedsFinalize();
+    phase2AggNode.setIntermediateTuple();
+    // Limit should be applied at the final merge aggregation node
+    long limit = phase2AggNode.getLimit();
+    phase2AggNode.unsetLimit();
+
+    DataPartition mergePartition;
+    if (phase2AggNode.getAggInfo().getGroupingExprs().isEmpty()) {
+      mergePartition = DataPartition.UNPARTITIONED;
+    } else {
+      phase2AggNode.setIsPreagg(ctx_);
+      mergePartition = DataPartition.hashPartitioned(
+          phase2AggNode.getAggInfo().getMergeAggInfo().getGroupingExprs());
     }
-    return mergeFragment;
+    PlanFragment secondMergeFragment =
+        createParentFragment(firstMergeFragment, mergePartition);
+    AggregationNode phase2MergeAggNode = new 
AggregationNode(ctx_.getNextNodeId(),
+        phase2AggNode, phase2AggNode.getAggInfo().getMergeAggInfo());
+    phase2MergeAggNode.init(ctx_.getRootAnalyzer());
+    phase2MergeAggNode.setLimit(limit);
+    // Transfer having predicates to final merge agg node
+    phase2AggNode.transferConjuncts(phase2MergeAggNode);
+    secondMergeFragment.addPlanRoot(phase2MergeAggNode);
+    return secondMergeFragment;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index b5c3970..15db74d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -345,18 +345,24 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-07:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
-04:AGGREGATE [FINALIZE]
-|  output: count(int_col)
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
 |  group by: t.bigint_col
 |  limit: 10
 |
+07:EXCHANGE [HASH(t.bigint_col)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(int_col)
+|  group by: t.bigint_col
+|
 06:AGGREGATE
 |  group by: t.bigint_col, int_col
 |
-05:EXCHANGE [HASH(t.bigint_col)]
+05:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: bigint_col, int_col
@@ -458,19 +464,25 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-07:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
-04:AGGREGATE [FINALIZE]
-|  output: count(int_col), count:merge(smallint_col)
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
 |
+07:EXCHANGE [HASH(t.bigint_col)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(int_col), count:merge(smallint_col)
+|  group by: t.bigint_col
+|
 06:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: t.bigint_col, int_col
 |
-05:EXCHANGE [HASH(t.bigint_col)]
+05:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
@@ -519,19 +531,25 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-10:EXCHANGE [UNPARTITIONED]
+12:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
-05:AGGREGATE [FINALIZE]
-|  output: count(int_col), count:merge(smallint_col)
+11:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
 |
+10:EXCHANGE [HASH(t.bigint_col)]
+|
+05:AGGREGATE [STREAMING]
+|  output: count(int_col), count:merge(smallint_col)
+|  group by: t.bigint_col
+|
 09:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: t.bigint_col, int_col
 |
-08:EXCHANGE [HASH(t.bigint_col)]
+08:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
@@ -569,7 +587,7 @@ PLAN-ROOT SINK
 |  group by: l_partkey
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.74MB
+   partitions=1/1 files=3 size=193.92MB
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -592,7 +610,7 @@ PLAN-ROOT SINK
 |  group by: l_partkey
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.74MB
+   partitions=1/1 files=3 size=193.92MB
 ====
 # test that aggregations are not placed below an unpartitioned exchange with a 
limit
 select count(*) from (select * from functional.alltypes limit 10) t
@@ -883,16 +901,22 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(CAST(timestamp_col AS STRING)), 
group_concat:merge(CAST(timestamp_col AS STRING))
+|  group by: year
+|
+05:EXCHANGE [HASH(year)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(CAST(timestamp_col AS STRING)), 
group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
 |
 04:AGGREGATE
 |  group by: year, CAST(timestamp_col AS STRING)
 |
-03:EXCHANGE [HASH(year)]
+03:EXCHANGE [HASH(year,CAST(timestamp_col AS STRING))]
 |
 01:AGGREGATE [STREAMING]
 |  group by: year, CAST(timestamp_col AS STRING)
@@ -998,9 +1022,15 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(date_string_col), group_concat:merge(date_string_col, 
'-'), count:merge(*)
+|  group by: month, year
+|
+05:EXCHANGE [HASH(month,year)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), 
count:merge(*)
 |  group by: month, year
 |
@@ -1008,7 +1038,7 @@ PLAN-ROOT SINK
 |  output: count:merge(*)
 |  group by: month, year, date_string_col
 |
-03:EXCHANGE [HASH(month,year)]
+03:EXCHANGE [HASH(month,year,date_string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*)
@@ -1093,12 +1123,12 @@ PLAN-ROOT SINK
 |--05:EXCHANGE [HASH(o_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.00MB
+|     partitions=1/1 files=2 size=54.20MB
 |
 04:EXCHANGE [HASH(c_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.customer]
-   partitions=1/1 files=1 size=12.27MB
+   partitions=1/1 files=1 size=12.34MB
    predicates: c_nationkey = 16
    runtime filters: RF000 -> c_custkey
 ====
@@ -1133,12 +1163,12 @@ PLAN-ROOT SINK
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.27MB
+|     partitions=1/1 files=1 size=12.34MB
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.00MB
+   partitions=1/1 files=2 size=54.20MB
    runtime filters: RF000 -> o_custkey
 ====
 # Distinct grouping aggregation where input is partitioned on distinct and 
grouping exprs.
@@ -1150,9 +1180,15 @@ group by 1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-07:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |
-04:AGGREGATE [FINALIZE]
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(c_custkey)
+|  group by: c_custkey
+|
+07:EXCHANGE [HASH(c_custkey)]
+|
+04:AGGREGATE [STREAMING]
 |  output: count(c_custkey)
 |  group by: c_custkey
 |
@@ -1166,12 +1202,12 @@ PLAN-ROOT SINK
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.27MB
+|     partitions=1/1 files=1 size=12.34MB
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.00MB
+   partitions=1/1 files=2 size=54.20MB
    runtime filters: RF000 -> o_custkey
 ====
 # Complex aggregation when two joins and an agg end up in same fragment.
@@ -1205,7 +1241,7 @@ PLAN-ROOT SINK
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.27MB
+|     partitions=1/1 files=1 size=12.34MB
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk
@@ -1214,13 +1250,13 @@ PLAN-ROOT SINK
 |--07:EXCHANGE [HASH(o_orderkey,o_clerk)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.00MB
+|     partitions=1/1 files=2 size=54.20MB
 |     runtime filters: RF000 -> o_custkey, RF001 -> o_comment
 |
 06:EXCHANGE [HASH(l_orderkey,l_returnflag)]
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.61MB
+   partitions=1/1 files=3 size=193.92MB
    runtime filters: RF002 -> l_orderkey, RF003 -> l_returnflag
 ====
 # IMPALA-4263: Grouping agg needs a merge step because the grouping exprs 
reference a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
index 0a41fb9..1e6b3ba 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
@@ -107,16 +107,22 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(if(int_col IS NULL, NULL, bigint_col))
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(if(int_col IS NULL, NULL, bigint_col))
 |  group by: tinyint_col
 |
 04:AGGREGATE
 |  group by: tinyint_col, int_col, bigint_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col,bigint_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: tinyint_col, int_col, bigint_col
@@ -143,16 +149,22 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), sum:merge(int_col)
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col), sum(int_col)
 |  group by: tinyint_col
 |
 04:AGGREGATE
 |  group by: tinyint_col, int_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: tinyint_col, int_col
@@ -217,9 +229,15 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), min:merge(smallint_col), max:merge(string_col)
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col), min:merge(smallint_col), max:merge(string_col)
 |  group by: tinyint_col
 |
@@ -227,7 +245,7 @@ PLAN-ROOT SINK
 |  output: min:merge(smallint_col), max:merge(string_col)
 |  group by: tinyint_col, int_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: min(smallint_col), max(string_col)
@@ -256,9 +274,15 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), sum:merge(int_col), count:merge(*), 
sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col)
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col), sum(int_col), count:merge(*), sum:merge(int_col), 
min:merge(smallint_col), max:merge(bigint_col)
 |  group by: tinyint_col
 |
@@ -266,7 +290,7 @@ PLAN-ROOT SINK
 |  output: count:merge(*), sum:merge(int_col), min:merge(smallint_col), 
max:merge(bigint_col)
 |  group by: tinyint_col, int_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*), sum(int_col), min(smallint_col), max(bigint_col)
@@ -508,16 +532,22 @@ select * from (select count(distinct int_col) cd from 
functional.alltypes group
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: bool_col
+|
+05:EXCHANGE [HASH(bool_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: bool_col
 |
 04:AGGREGATE
 |  group by: bool_col, int_col
 |
-03:EXCHANGE [HASH(bool_col)]
+03:EXCHANGE [HASH(bool_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: bool_col, int_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index bdbc92f..afecff9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -402,14 +402,20 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, 
PARTITION-KEYS=(2010,10)]
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 |  partitions=1
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: string_col
+|
+05:EXCHANGE [HASH(string_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: string_col
 |
 04:AGGREGATE
 |  group by: string_col, int_col
 |
-03:EXCHANGE [HASH(string_col)]
+03:EXCHANGE [HASH(string_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: string_col, int_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 436aa51..079d291 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -90,19 +90,25 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-06:PARTIAL SORT
+08:PARTIAL SORT
 |  order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST
 |
-05:EXCHANGE [KUDU(KuduPartition(count(id)))]
+07:EXCHANGE [KUDU(KuduPartition(count(id)))]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(id)
+|  group by: name
+|
+05:EXCHANGE [HASH(name)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(id)
 |  group by: name
 |
 04:AGGREGATE
 |  group by: name, id
 |
-03:EXCHANGE [HASH(name)]
+03:EXCHANGE [HASH(name,id)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: name, id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index 4713ff3..2c6db60 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -2917,20 +2917,26 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-12:MERGING-EXCHANGE [UNPARTITIONED]
+14:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
 07:SORT
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
-06:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(ps_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+12:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+06:AGGREGATE [STREAMING]
 |  output: count(ps_suppkey)
 |  group by: p_brand, p_type, p_size
 |
 11:AGGREGATE
 |  group by: p_brand, p_type, p_size, ps_suppkey
 |
-10:EXCHANGE [HASH(p_brand,p_type,p_size)]
+10:EXCHANGE [HASH(p_brand,p_type,p_size,ps_suppkey)]
 |
 05:AGGREGATE [STREAMING]
 |  group by: p_brand, p_type, p_size, ps_suppkey
@@ -2960,20 +2966,26 @@ PLAN-ROOT SINK
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-12:MERGING-EXCHANGE [UNPARTITIONED]
+14:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
 07:SORT
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
-06:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(ps_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+12:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+06:AGGREGATE [STREAMING]
 |  output: count(ps_suppkey)
 |  group by: p_brand, p_type, p_size
 |
 11:AGGREGATE
 |  group by: p_brand, p_type, p_size, ps_suppkey
 |
-10:EXCHANGE [HASH(p_brand,p_type,p_size)]
+10:EXCHANGE [HASH(p_brand,p_type,p_size,ps_suppkey)]
 |
 05:AGGREGATE [STREAMING]
 |  group by: p_brand, p_type, p_size, ps_suppkey

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
index dd1818a..aad75e0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
@@ -1854,20 +1854,26 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-13:MERGING-EXCHANGE [UNPARTITIONED]
+15:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(s_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
 09:SORT
 |  order by: count(s_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
-08:AGGREGATE [FINALIZE]
+14:AGGREGATE [FINALIZE]
+|  output: count:merge(s_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+13:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+08:AGGREGATE [STREAMING]
 |  output: count(s_suppkey)
 |  group by: p_brand, p_type, p_size
 |
 12:AGGREGATE
 |  group by: p_brand, p_type, p_size, s_suppkey
 |
-11:EXCHANGE [HASH(p_brand,p_type,p_size)]
+11:EXCHANGE [HASH(p_brand,p_type,p_size,s_suppkey)]
 |
 07:AGGREGATE [STREAMING]
 |  group by: p_brand, p_type, p_size, s_suppkey

Reply via email to