Repository: incubator-impala
Updated Branches:
  refs/heads/master f982c3f76 -> 1f80396b2


IMPALA-4263: Fix wrong omission of agg/analytic hash exchanges.

The bug: Our detection of partition compatibility for
grouping aggregations and analytic functions did not take into
account the effect of outer joins within the same fragment.
As a result, we incorrectly omitted a required hash exchange.
For example, a hash exchange + merge phase is required if the
grouping expressions of an aggregation reference tuples
that are made nullable within the same fragment. The exchange is
needed to bring together NULLs produced by outer-join non-matches.

The fix: Check that the grouping/partition exprs do not reference
tuples that are made nullable within the same fragment.

Testing: Planner tests pass locally.

Change-Id: I121222179378e56836422a69451d840a012c9e54
Reviewed-on: http://gerrit.cloudera.org:8080/5774
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cd153d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cd153d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cd153d66

Branch: refs/heads/master
Commit: cd153d66dc20ebea7a9de7a2ef1c627fee45253c
Parents: f982c3f
Author: Alex Behm <[email protected]>
Authored: Wed Jan 4 22:19:38 2017 -0800
Committer: Alex Behm <[email protected]>
Committed: Mon Feb 13 23:00:01 2017 +0000

----------------------------------------------------------------------
 .../impala/planner/DistributedPlanner.java      |  38 ++++---
 .../org/apache/impala/planner/PlanFragment.java |  35 ++++--
 .../queries/PlannerTest/aggregation.test        | 106 +++++++++++++++++++
 .../queries/PlannerTest/analytic-fns.test       |  35 ++++++
 4 files changed, 190 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd153d66/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index dd03ac8..71ddd5a 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -19,9 +19,7 @@ package org.apache.impala.planner;
 
 import java.util.ArrayList;
 import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
 
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalysisContext;
@@ -30,12 +28,17 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.QueryStmt;
+import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 
 /**
@@ -785,20 +788,23 @@ public class DistributedPlanner {
 
     DataPartition parentPartition = null;
     if (hasGrouping) {
-      // the parent fragment is partitioned on the grouping exprs;
-      // substitute grouping exprs to reference the *output* of the agg, not 
the input
       List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs();
       if (partitionExprs == null) partitionExprs = groupingExprs;
-      partitionExprs = Expr.substituteList(partitionExprs,
-          node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), 
false);
       boolean childHasCompatPartition = 
ctx_.getRootAnalyzer().equivSets(partitionExprs,
             childFragment.getDataPartition().getPartitionExprs());
-      if (childHasCompatPartition) {
-        // The data is already partitioned on the required expressions, we can 
just do
-        // the aggregation in the child fragment without an extra merge step.
+      if (childHasCompatPartition && 
!childFragment.refsNullableTupleId(partitionExprs)) {
+        // The data is already partitioned on the required expressions. We can 
do the
+        // aggregation in the child fragment without an extra merge step.
+        // An exchange+merge step is required if the grouping exprs reference 
a tuple
+        // that is made nullable in 'childFragment' to bring NULLs from 
outer-join
+        // non-matches together.
         childFragment.addPlanRoot(node);
         return childFragment;
       }
+      // the parent fragment is partitioned on the grouping exprs;
+      // substitute grouping exprs to reference the *output* of the agg, not 
the input
+      partitionExprs = Expr.substituteList(partitionExprs,
+          node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), 
false);
       parentPartition = DataPartition.hashPartitioned(partitionExprs);
     } else {
       // the parent fragment is unpartitioned
@@ -973,12 +979,16 @@ public class DistributedPlanner {
     Preconditions.checkState(sortNode.isAnalyticSort());
     PlanFragment analyticFragment = childFragment;
     if (sortNode.getInputPartition() != null) {
-      // make sure the childFragment's output is partitioned as required by 
the sortNode
       sortNode.getInputPartition().substitute(
           childFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer());
-      if 
(!childFragment.getDataPartition().equals(sortNode.getInputPartition())) {
-        analyticFragment =
-            createParentFragment(childFragment, sortNode.getInputPartition());
+      // Make sure the childFragment's output is partitioned as required by 
the sortNode.
+      // Even if the fragment and the sort partition exprs are equal, an 
exchange is
+      // required if the sort partition exprs reference a tuple that is made 
nullable in
+      // 'childFragment' to bring NULLs from outer-join non-matches together.
+      DataPartition sortPartition = sortNode.getInputPartition();
+      if (!childFragment.getDataPartition().equals(sortPartition)
+          || 
childFragment.refsNullableTupleId(sortPartition.getPartitionExprs())) {
+        analyticFragment = createParentFragment(childFragment, sortPartition);
       }
     }
     analyticFragment.addPlanRoot(sortNode);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd153d66/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 0e5134f..168e485 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -18,30 +18,26 @@
 package org.apache.impala.planner;
 
 import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
 
 import org.apache.impala.analysis.Analyzer;
-import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.JoinOperator;
-import org.apache.impala.analysis.SlotRef;
-import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.TreeNode;
-import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPartitionType;
-import org.apache.impala.thrift.TPlan;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPlanFragmentTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * PlanFragments form a tree structure via their ExchangeNodes. A tree of 
fragments
@@ -385,4 +381,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
     for (PlanFragment child: getChildren()) child.verifyTree();
   }
+
+  /**
+   * Returns true if 'exprs' reference a tuple that is made nullable in this 
fragment,
+   * but not in any of its input fragments.
+   */
+  public boolean refsNullableTupleId(List<Expr> exprs) {
+    Preconditions.checkNotNull(planRoot_);
+    List<TupleId> tids = Lists.newArrayList();
+    for (Expr e: exprs) e.getIds(tids, null);
+    Set<TupleId> nullableTids = 
Sets.newHashSet(planRoot_.getNullableTupleIds());
+    // Remove all tuple ids that were made nullable in an input fragment.
+    List<ExchangeNode> exchNodes = Lists.newArrayList();
+    planRoot_.collect(ExchangeNode.class, exchNodes);
+    for (ExchangeNode exchNode: exchNodes) {
+      nullableTids.removeAll(exchNode.getNullableTupleIds());
+    }
+    for (TupleId tid: tids) if (nullableTids.contains(tid)) return true;
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd153d66/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index b4fe75b..dab490e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -1167,3 +1167,109 @@ PLAN-ROOT SINK
    partitions=1/1 files=3 size=193.61MB
    runtime filters: RF002 -> l_orderkey, RF003 -> l_returnflag
 ====
+# IMPALA-4263: Grouping agg needs a merge step because the grouping exprs 
reference a
+# tuple that is made nullable in the join fragment.
+select /* +straight_join */ t2.id, count(*)
+from functional.alltypes t1
+left outer join /* +shuffle */ functional.alltypessmall t2
+  on t1.id = t2.id
+group by t2.id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: t2.id
+|
+06:EXCHANGE [HASH(t2.id)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: t2.id
+|
+02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  hash predicates: t1.id = t2.id
+|
+|--05:EXCHANGE [HASH(t2.id)]
+|  |
+|  01:SCAN HDFS [functional.alltypessmall t2]
+|     partitions=4/4 files=4 size=6.32KB
+|
+04:EXCHANGE [HASH(t1.id)]
+|
+00:SCAN HDFS [functional.alltypes t1]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4263: Grouping agg is placed in the join fragment and has no merge 
step.
+select /* +straight_join */ t1.id, count(*)
+from functional.alltypes t1
+left outer join /* +shuffle */ functional.alltypessmall t2
+  on t1.id = t2.id
+group by t1.id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: t1.id
+|
+02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  hash predicates: t1.id = t2.id
+|
+|--05:EXCHANGE [HASH(t2.id)]
+|  |
+|  01:SCAN HDFS [functional.alltypessmall t2]
+|     partitions=4/4 files=4 size=6.32KB
+|
+04:EXCHANGE [HASH(t1.id)]
+|
+00:SCAN HDFS [functional.alltypes t1]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4263: Grouping agg is placed in the second join fragment and has no 
merge step.
+# The grouping exprs reference a nullable tuple (t2), but that tuple is made 
nullable in
+# the first join fragment, so it's correct to place the aggregation in the 
second
+# join fragment without a merge step.
+select /* +straight_join */ t2.id, count(*)
+from functional.alltypes t1
+left outer join /* +shuffle */ functional.alltypessmall t2
+  on t1.int_col = t2.int_col
+left outer join /* +shuffle */ functional.alltypestiny t3
+  on t2.id = t3.id
+group by t2.id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: t2.id
+|
+04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  hash predicates: t2.id = t3.id
+|
+|--09:EXCHANGE [HASH(t3.id)]
+|  |
+|  02:SCAN HDFS [functional.alltypestiny t3]
+|     partitions=4/4 files=4 size=460B
+|
+08:EXCHANGE [HASH(t2.id)]
+|
+03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  hash predicates: t1.int_col = t2.int_col
+|
+|--07:EXCHANGE [HASH(t2.int_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypessmall t2]
+|     partitions=4/4 files=4 size=6.32KB
+|
+06:EXCHANGE [HASH(t1.int_col)]
+|
+00:SCAN HDFS [functional.alltypes t1]
+   partitions=24/24 files=24 size=478.45KB
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd153d66/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index 8c25730..de5e4cb 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -2327,3 +2327,38 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
+# IMPALA-4263: Analytic function needs a hash exchange because the partition 
exprs
+# reference a tuple that is made nullable in the join fragment.
+select /* +straight_join */ count(*) over (partition by t1.id)
+from functional.alltypes t1
+right outer join /* +shuffle */ functional.alltypessmall t2
+  on t1.id = t2.id
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+04:ANALYTIC
+|  functions: count(*)
+|  partition by: t1.id
+|
+03:SORT
+|  order by: id ASC NULLS FIRST
+|
+07:EXCHANGE [HASH(t1.id)]
+|
+02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  hash predicates: t1.id = t2.id
+|  runtime filters: RF000 <- t2.id
+|
+|--06:EXCHANGE [HASH(t2.id)]
+|  |
+|  01:SCAN HDFS [functional.alltypessmall t2]
+|     partitions=4/4 files=4 size=6.32KB
+|
+05:EXCHANGE [HASH(t1.id)]
+|
+00:SCAN HDFS [functional.alltypes t1]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> t1.id
+====

Reply via email to