Repository: incubator-impala Updated Branches: refs/heads/master 158fd330b -> c7db60aa4
IMPALA-4794: Grouping distinct agg plan robust to data skew This patch changes the query plan for grouping distinct aggregations to be more robust to data skew in the grouping expressions. The existing plan partitions data between phase-1 and phase-2 by the grouping exprs only. Under this strategy, skew in the grouping exprs directly impacts performance. The new plan partitions data by both the grouping exprs and the distinct agg exprs. The phase-2 aggregation then runs as a streaming pre-aggregation, followed by an extra exchange hash-partitioned on the grouping exprs and a final merge aggregation. The new plan is more robust to data skew but does more work than the old plan. Testing: Modified existing planner tests, which already provide sufficient coverage. The pattern is that the distinct agg exprs are added to the partition exprs of the first exchange node, followed by an additional merge agg and exchange node. Change-Id: I7bdada0e328b555900c7b7ff8aabc8eb15ae8fa9 Reviewed-on: http://gerrit.cloudera.org:8080/7643 Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b660bd65 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b660bd65 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b660bd65 Branch: refs/heads/master Commit: b660bd652f69913ac558d5e6ab5c4bdee3d97601 Parents: 158fd33 Author: Tianyi Wang <[email protected]> Authored: Wed Aug 16 11:05:07 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 16 23:20:22 2017 +0000 ---------------------------------------------------------------------- .../impala/planner/DistributedPlanner.java | 135 +++++++++---------- .../queries/PlannerTest/aggregation.test | 98 +++++++++----- .../queries/PlannerTest/distinct.test | 60 ++++++--- .../queries/PlannerTest/insert.test | 10 +- .../queries/PlannerTest/kudu.test | 14 +- .../queries/PlannerTest/tpch-all.test | 24 +++- .../queries/PlannerTest/tpch-nested.test | 12 +- 7 files changed, 219
insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 2266625..6571036 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -855,98 +855,87 @@ public class DistributedPlanner { /** * Returns a fragment that materialises the final result of a distinct aggregation - * where 'childFragment' is a partitioned fragment with the first phase aggregation - * as its root and 'node' is the second phase of the distinct aggregation. + * where 'childFragment' is a partitioned fragment with the phase-1 aggregation + * as its root. */ - private PlanFragment createPhase2DistinctAggregationFragment(AggregationNode node, - PlanFragment childFragment, ArrayList<PlanFragment> fragments) - throws ImpalaException { - ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs(); - boolean hasGrouping = !groupingExprs.isEmpty(); - - // The first-phase aggregation node is already in the child fragment. - Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot()); - - AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo(); - List<Expr> partitionExprs = null; - if (hasGrouping) { - // We need to do - // - child fragment: - // * phase-1 aggregation - // - merge fragment, hash-partitioned on grouping exprs: - // * merge agg of phase 1 - // * phase 2 agg - // The output partition exprs of the child are the (input) grouping exprs of the - // parent. 
The grouping exprs reference the output tuple of the 1st phase, but the - // partitioning happens on the intermediate tuple of the 1st phase. - partitionExprs = Expr.substituteList( - groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(), - ctx_.getRootAnalyzer(), false); - } else { - // We need to do - // - child fragment: - // * phase-1 aggregation - // - merge fragment 1, hash-partitioned on distinct exprs: - // * merge agg of phase 1 - // * phase 2 agg - // - merge fragment 2, unpartitioned: - // * merge agg of phase 2 - partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(), - firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(), false); - } - - PlanFragment mergeFragment = null; + private PlanFragment createPhase2DistinctAggregationFragment( + AggregationNode phase2AggNode, PlanFragment childFragment, + ArrayList<PlanFragment> fragments) throws ImpalaException { + // The phase-1 aggregation node is already in the child fragment. + Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot()); + + AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0)) + .getAggInfo(); + // We need to do + // - child fragment: + // * phase-1 aggregation + // - first merge fragment, hash-partitioned on grouping and distinct exprs: + // * merge agg of phase-1 + // * phase-2 agg + // - second merge fragment, partitioned on grouping exprs or unpartitioned + // without grouping exprs + // * merge agg of phase-2 + // With grouping, the output partition exprs of the child are the (input) grouping + // exprs of the parent. The grouping exprs reference the output tuple of phase-1 + // but the partitioning happens on the intermediate tuple of the phase-1. 
+ ArrayList<Expr> partitionExprs = Expr.substituteList( + phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(), + ctx_.getRootAnalyzer(), false); + + PlanFragment firstMergeFragment; boolean childHasCompatPartition = ctx_.getRootAnalyzer().equivSets(partitionExprs, childFragment.getDataPartition().getPartitionExprs()); if (childHasCompatPartition) { // The data is already partitioned on the required expressions, we can skip the - // phase 1 merge step. - childFragment.addPlanRoot(node); - mergeFragment = childFragment; + // phase-1 merge step. + childFragment.addPlanRoot(phase2AggNode); + firstMergeFragment = childFragment; } else { DataPartition mergePartition = DataPartition.hashPartitioned(partitionExprs); // Convert the existing node to a preaggregation. - AggregationNode preaggNode = (AggregationNode)node.getChild(0); + AggregationNode preaggNode = (AggregationNode)phase2AggNode.getChild(0); preaggNode.setIsPreagg(ctx_); - // place a merge aggregation step for the 1st phase in a new fragment - mergeFragment = createParentFragment(childFragment, mergePartition); - AggregateInfo phase1MergeAggInfo = firstPhaseAggInfo.getMergeAggInfo(); + // place phase-1 merge aggregation step in a new fragment + firstMergeFragment = createParentFragment(childFragment, mergePartition); + AggregateInfo phase1MergeAggInfo = phase1AggInfo.getMergeAggInfo(); AggregationNode phase1MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), preaggNode, phase1MergeAggInfo); phase1MergeAggNode.init(ctx_.getRootAnalyzer()); phase1MergeAggNode.unsetNeedsFinalize(); phase1MergeAggNode.setIntermediateTuple(); - mergeFragment.addPlanRoot(phase1MergeAggNode); + firstMergeFragment.addPlanRoot(phase1MergeAggNode); - // the 2nd-phase aggregation consumes the output of the merge agg; - // if there is a limit, it had already been placed with the 2nd aggregation + // the phase-2 aggregation consumes the output of the phase-1 merge agg; + // if there is a limit, it had already been 
placed with the phase-2 aggregation // step (which is where it should be) - mergeFragment.addPlanRoot(node); + firstMergeFragment.addPlanRoot(phase2AggNode); + fragments.add(firstMergeFragment); } - - if (!hasGrouping) { - // place the merge aggregation of the 2nd phase in an unpartitioned fragment; - // add preceding merge fragment at end - if (mergeFragment != childFragment) fragments.add(mergeFragment); - - node.unsetNeedsFinalize(); - node.setIntermediateTuple(); - // Any limit should be placed in the final merge aggregation node - long limit = node.getLimit(); - node.unsetLimit(); - mergeFragment = createParentFragment(mergeFragment, DataPartition.UNPARTITIONED); - AggregateInfo phase2MergeAggInfo = node.getAggInfo().getMergeAggInfo(); - AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), node, - phase2MergeAggInfo); - phase2MergeAggNode.init(ctx_.getRootAnalyzer()); - // Transfer having predicates. If hasGrouping == true, the predicates should - // instead be evaluated by the 2nd phase agg (the predicates are already there). 
- node.transferConjuncts(phase2MergeAggNode); - phase2MergeAggNode.setLimit(limit); - mergeFragment.addPlanRoot(phase2MergeAggNode); + phase2AggNode.unsetNeedsFinalize(); + phase2AggNode.setIntermediateTuple(); + // Limit should be applied at the final merge aggregation node + long limit = phase2AggNode.getLimit(); + phase2AggNode.unsetLimit(); + + DataPartition mergePartition; + if (phase2AggNode.getAggInfo().getGroupingExprs().isEmpty()) { + mergePartition = DataPartition.UNPARTITIONED; + } else { + phase2AggNode.setIsPreagg(ctx_); + mergePartition = DataPartition.hashPartitioned( + phase2AggNode.getAggInfo().getMergeAggInfo().getGroupingExprs()); } - return mergeFragment; + PlanFragment secondMergeFragment = + createParentFragment(firstMergeFragment, mergePartition); + AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), + phase2AggNode, phase2AggNode.getAggInfo().getMergeAggInfo()); + phase2MergeAggNode.init(ctx_.getRootAnalyzer()); + phase2MergeAggNode.setLimit(limit); + // Transfer having predicates to final merge agg node + phase2AggNode.transferConjuncts(phase2MergeAggNode); + secondMergeFragment.addPlanRoot(phase2MergeAggNode); + return secondMergeFragment; } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test index b5c3970..15db74d 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test @@ -345,18 +345,24 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +09:EXCHANGE [UNPARTITIONED] | limit: 10 | -04:AGGREGATE [FINALIZE] -| output: count(int_col) 
+08:AGGREGATE [FINALIZE] +| output: count:merge(int_col) | group by: t.bigint_col | limit: 10 | +07:EXCHANGE [HASH(t.bigint_col)] +| +04:AGGREGATE [STREAMING] +| output: count(int_col) +| group by: t.bigint_col +| 06:AGGREGATE | group by: t.bigint_col, int_col | -05:EXCHANGE [HASH(t.bigint_col)] +05:EXCHANGE [HASH(t.bigint_col,int_col)] | 03:AGGREGATE [STREAMING] | group by: bigint_col, int_col @@ -458,19 +464,25 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +09:EXCHANGE [UNPARTITIONED] | limit: 10 | -04:AGGREGATE [FINALIZE] -| output: count(int_col), count:merge(smallint_col) +08:AGGREGATE [FINALIZE] +| output: count:merge(int_col), count:merge(smallint_col) | group by: t.bigint_col | limit: 10 | +07:EXCHANGE [HASH(t.bigint_col)] +| +04:AGGREGATE [STREAMING] +| output: count(int_col), count:merge(smallint_col) +| group by: t.bigint_col +| 06:AGGREGATE | output: count:merge(smallint_col) | group by: t.bigint_col, int_col | -05:EXCHANGE [HASH(t.bigint_col)] +05:EXCHANGE [HASH(t.bigint_col,int_col)] | 03:AGGREGATE [STREAMING] | output: count(smallint_col) @@ -519,19 +531,25 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -10:EXCHANGE [UNPARTITIONED] +12:EXCHANGE [UNPARTITIONED] | limit: 10 | -05:AGGREGATE [FINALIZE] -| output: count(int_col), count:merge(smallint_col) +11:AGGREGATE [FINALIZE] +| output: count:merge(int_col), count:merge(smallint_col) | group by: t.bigint_col | limit: 10 | +10:EXCHANGE [HASH(t.bigint_col)] +| +05:AGGREGATE [STREAMING] +| output: count(int_col), count:merge(smallint_col) +| group by: t.bigint_col +| 09:AGGREGATE | output: count:merge(smallint_col) | group by: t.bigint_col, int_col | -08:EXCHANGE [HASH(t.bigint_col)] +08:EXCHANGE [HASH(t.bigint_col,int_col)] | 04:AGGREGATE [STREAMING] | output: count(smallint_col) @@ -569,7 +587,7 @@ PLAN-ROOT SINK | group by: l_partkey | 00:SCAN HDFS [tpch_parquet.lineitem] - partitions=1/1 files=3 size=193.74MB + partitions=1/1 files=3 size=193.92MB ---- 
DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -592,7 +610,7 @@ PLAN-ROOT SINK | group by: l_partkey | 00:SCAN HDFS [tpch_parquet.lineitem] - partitions=1/1 files=3 size=193.74MB + partitions=1/1 files=3 size=193.92MB ==== # test that aggregations are not placed below an unpartitioned exchange with a limit select count(*) from (select * from functional.alltypes limit 10) t @@ -883,16 +901,22 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(CAST(timestamp_col AS STRING)), group_concat:merge(CAST(timestamp_col AS STRING)) +| group by: year +| +05:EXCHANGE [HASH(year)] +| +02:AGGREGATE [STREAMING] | output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING)) | group by: year | 04:AGGREGATE | group by: year, CAST(timestamp_col AS STRING) | -03:EXCHANGE [HASH(year)] +03:EXCHANGE [HASH(year,CAST(timestamp_col AS STRING))] | 01:AGGREGATE [STREAMING] | group by: year, CAST(timestamp_col AS STRING) @@ -998,9 +1022,15 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(date_string_col), group_concat:merge(date_string_col, '-'), count:merge(*) +| group by: month, year +| +05:EXCHANGE [HASH(month,year)] +| +02:AGGREGATE [STREAMING] | output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*) | group by: month, year | @@ -1008,7 +1038,7 @@ PLAN-ROOT SINK | output: count:merge(*) | group by: month, year, date_string_col | -03:EXCHANGE [HASH(month,year)] +03:EXCHANGE [HASH(month,year,date_string_col)] | 01:AGGREGATE [STREAMING] | output: count(*) @@ -1093,12 +1123,12 @@ PLAN-ROOT SINK |--05:EXCHANGE [HASH(o_custkey)] | | | 01:SCAN HDFS [tpch_parquet.orders] -| partitions=1/1 files=2 size=54.00MB +| partitions=1/1 files=2 size=54.20MB | 04:EXCHANGE [HASH(c_custkey)] 
| 00:SCAN HDFS [tpch_parquet.customer] - partitions=1/1 files=1 size=12.27MB + partitions=1/1 files=1 size=12.34MB predicates: c_nationkey = 16 runtime filters: RF000 -> c_custkey ==== @@ -1133,12 +1163,12 @@ PLAN-ROOT SINK |--06:EXCHANGE [HASH(c_custkey)] | | | 01:SCAN HDFS [tpch_parquet.customer] -| partitions=1/1 files=1 size=12.27MB +| partitions=1/1 files=1 size=12.34MB | 05:EXCHANGE [HASH(o_custkey)] | 00:SCAN HDFS [tpch_parquet.orders] - partitions=1/1 files=2 size=54.00MB + partitions=1/1 files=2 size=54.20MB runtime filters: RF000 -> o_custkey ==== # Distinct grouping aggregation where input is partitioned on distinct and grouping exprs. @@ -1150,9 +1180,15 @@ group by 1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +09:EXCHANGE [UNPARTITIONED] | -04:AGGREGATE [FINALIZE] +08:AGGREGATE [FINALIZE] +| output: count:merge(c_custkey) +| group by: c_custkey +| +07:EXCHANGE [HASH(c_custkey)] +| +04:AGGREGATE [STREAMING] | output: count(c_custkey) | group by: c_custkey | @@ -1166,12 +1202,12 @@ PLAN-ROOT SINK |--06:EXCHANGE [HASH(c_custkey)] | | | 01:SCAN HDFS [tpch_parquet.customer] -| partitions=1/1 files=1 size=12.27MB +| partitions=1/1 files=1 size=12.34MB | 05:EXCHANGE [HASH(o_custkey)] | 00:SCAN HDFS [tpch_parquet.orders] - partitions=1/1 files=2 size=54.00MB + partitions=1/1 files=2 size=54.20MB runtime filters: RF000 -> o_custkey ==== # Complex aggregation when two joins and an agg end up in same fragment. 
@@ -1205,7 +1241,7 @@ PLAN-ROOT SINK |--08:EXCHANGE [BROADCAST] | | | 02:SCAN HDFS [tpch_parquet.customer] -| partitions=1/1 files=1 size=12.27MB +| partitions=1/1 files=1 size=12.34MB | 03:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk @@ -1214,13 +1250,13 @@ PLAN-ROOT SINK |--07:EXCHANGE [HASH(o_orderkey,o_clerk)] | | | 01:SCAN HDFS [tpch_parquet.orders] -| partitions=1/1 files=2 size=54.00MB +| partitions=1/1 files=2 size=54.20MB | runtime filters: RF000 -> o_custkey, RF001 -> o_comment | 06:EXCHANGE [HASH(l_orderkey,l_returnflag)] | 00:SCAN HDFS [tpch_parquet.lineitem] - partitions=1/1 files=3 size=193.61MB + partitions=1/1 files=3 size=193.92MB runtime filters: RF002 -> l_orderkey, RF003 -> l_returnflag ==== # IMPALA-4263: Grouping agg needs a merge step because the grouping exprs reference a http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test index 0a41fb9..1e6b3ba 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test @@ -107,16 +107,22 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(if(int_col IS NULL, NULL, bigint_col)) +| group by: tinyint_col +| +05:EXCHANGE [HASH(tinyint_col)] +| +02:AGGREGATE [STREAMING] | output: count(if(int_col IS NULL, NULL, bigint_col)) | group by: tinyint_col | 04:AGGREGATE | group by: tinyint_col, int_col, bigint_col | -03:EXCHANGE [HASH(tinyint_col)] +03:EXCHANGE [HASH(tinyint_col,int_col,bigint_col)] | 
01:AGGREGATE [STREAMING] | group by: tinyint_col, int_col, bigint_col @@ -143,16 +149,22 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(int_col), sum:merge(int_col) +| group by: tinyint_col +| +05:EXCHANGE [HASH(tinyint_col)] +| +02:AGGREGATE [STREAMING] | output: count(int_col), sum(int_col) | group by: tinyint_col | 04:AGGREGATE | group by: tinyint_col, int_col | -03:EXCHANGE [HASH(tinyint_col)] +03:EXCHANGE [HASH(tinyint_col,int_col)] | 01:AGGREGATE [STREAMING] | group by: tinyint_col, int_col @@ -217,9 +229,15 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(int_col), min:merge(smallint_col), max:merge(string_col) +| group by: tinyint_col +| +05:EXCHANGE [HASH(tinyint_col)] +| +02:AGGREGATE [STREAMING] | output: count(int_col), min:merge(smallint_col), max:merge(string_col) | group by: tinyint_col | @@ -227,7 +245,7 @@ PLAN-ROOT SINK | output: min:merge(smallint_col), max:merge(string_col) | group by: tinyint_col, int_col | -03:EXCHANGE [HASH(tinyint_col)] +03:EXCHANGE [HASH(tinyint_col,int_col)] | 01:AGGREGATE [STREAMING] | output: min(smallint_col), max(string_col) @@ -256,9 +274,15 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(int_col), sum:merge(int_col), count:merge(*), sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col) +| group by: tinyint_col +| +05:EXCHANGE [HASH(tinyint_col)] +| +02:AGGREGATE [STREAMING] | output: count(int_col), sum(int_col), count:merge(*), sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col) | group by: tinyint_col | @@ -266,7 +290,7 @@ PLAN-ROOT SINK | output: count:merge(*), 
sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col) | group by: tinyint_col, int_col | -03:EXCHANGE [HASH(tinyint_col)] +03:EXCHANGE [HASH(tinyint_col,int_col)] | 01:AGGREGATE [STREAMING] | output: count(*), sum(int_col), min(smallint_col), max(bigint_col) @@ -508,16 +532,22 @@ select * from (select count(distinct int_col) cd from functional.alltypes group ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +07:EXCHANGE [UNPARTITIONED] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(int_col) +| group by: bool_col +| +05:EXCHANGE [HASH(bool_col)] +| +02:AGGREGATE [STREAMING] | output: count(int_col) | group by: bool_col | 04:AGGREGATE | group by: bool_col, int_col | -03:EXCHANGE [HASH(bool_col)] +03:EXCHANGE [HASH(bool_col,int_col)] | 01:AGGREGATE [STREAMING] | group by: bool_col, int_col http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/insert.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test index bdbc92f..afecff9 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test @@ -402,14 +402,20 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)] WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)] | partitions=1 | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(int_col) +| group by: string_col +| +05:EXCHANGE [HASH(string_col)] +| +02:AGGREGATE [STREAMING] | output: count(int_col) | group by: string_col | 04:AGGREGATE | group by: string_col, int_col | -03:EXCHANGE [HASH(string_col)] +03:EXCHANGE [HASH(string_col,int_col)] | 01:AGGREGATE [STREAMING] | group by: string_col, int_col 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index 436aa51..079d291 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -90,19 +90,25 @@ INSERT INTO KUDU [functional_kudu.testtbl] ---- DISTRIBUTEDPLAN INSERT INTO KUDU [functional_kudu.testtbl] | -06:PARTIAL SORT +08:PARTIAL SORT | order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST | -05:EXCHANGE [KUDU(KuduPartition(count(id)))] +07:EXCHANGE [KUDU(KuduPartition(count(id)))] | -02:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] +| output: count:merge(id) +| group by: name +| +05:EXCHANGE [HASH(name)] +| +02:AGGREGATE [STREAMING] | output: count(id) | group by: name | 04:AGGREGATE | group by: name, id | -03:EXCHANGE [HASH(name)] +03:EXCHANGE [HASH(name,id)] | 01:AGGREGATE [STREAMING] | group by: name, id http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test index 4713ff3..2c6db60 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test @@ -2917,20 +2917,26 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -12:MERGING-EXCHANGE [UNPARTITIONED] +14:MERGING-EXCHANGE [UNPARTITIONED] | order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC | 07:SORT | 
order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC | -06:AGGREGATE [FINALIZE] +13:AGGREGATE [FINALIZE] +| output: count:merge(ps_suppkey) +| group by: p_brand, p_type, p_size +| +12:EXCHANGE [HASH(p_brand,p_type,p_size)] +| +06:AGGREGATE [STREAMING] | output: count(ps_suppkey) | group by: p_brand, p_type, p_size | 11:AGGREGATE | group by: p_brand, p_type, p_size, ps_suppkey | -10:EXCHANGE [HASH(p_brand,p_type,p_size)] +10:EXCHANGE [HASH(p_brand,p_type,p_size,ps_suppkey)] | 05:AGGREGATE [STREAMING] | group by: p_brand, p_type, p_size, ps_suppkey @@ -2960,20 +2966,26 @@ PLAN-ROOT SINK ---- PARALLELPLANS PLAN-ROOT SINK | -12:MERGING-EXCHANGE [UNPARTITIONED] +14:MERGING-EXCHANGE [UNPARTITIONED] | order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC | 07:SORT | order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC | -06:AGGREGATE [FINALIZE] +13:AGGREGATE [FINALIZE] +| output: count:merge(ps_suppkey) +| group by: p_brand, p_type, p_size +| +12:EXCHANGE [HASH(p_brand,p_type,p_size)] +| +06:AGGREGATE [STREAMING] | output: count(ps_suppkey) | group by: p_brand, p_type, p_size | 11:AGGREGATE | group by: p_brand, p_type, p_size, ps_suppkey | -10:EXCHANGE [HASH(p_brand,p_type,p_size)] +10:EXCHANGE [HASH(p_brand,p_type,p_size,ps_suppkey)] | 05:AGGREGATE [STREAMING] | group by: p_brand, p_type, p_size, ps_suppkey http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test index dd1818a..aad75e0 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test @@ -1854,20 +1854,26 @@ PLAN-ROOT SINK ---- 
DISTRIBUTEDPLAN PLAN-ROOT SINK | -13:MERGING-EXCHANGE [UNPARTITIONED] +15:MERGING-EXCHANGE [UNPARTITIONED] | order by: count(s_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC | 09:SORT | order by: count(s_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC | -08:AGGREGATE [FINALIZE] +14:AGGREGATE [FINALIZE] +| output: count:merge(s_suppkey) +| group by: p_brand, p_type, p_size +| +13:EXCHANGE [HASH(p_brand,p_type,p_size)] +| +08:AGGREGATE [STREAMING] | output: count(s_suppkey) | group by: p_brand, p_type, p_size | 12:AGGREGATE | group by: p_brand, p_type, p_size, s_suppkey | -11:EXCHANGE [HASH(p_brand,p_type,p_size)] +11:EXCHANGE [HASH(p_brand,p_type,p_size,s_suppkey)] | 07:AGGREGATE [STREAMING] | group by: p_brand, p_type, p_size, s_suppkey
