HIVE-15905 : Inefficient plan for correlated subqueries (Vineet Garg via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bddf5a7a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bddf5a7a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bddf5a7a Branch: refs/heads/master Commit: bddf5a7a974fcfc4f350123f561da5f4ddcf43e0 Parents: b14ef6d Author: Vineet Garg <[email protected]> Authored: Mon Feb 13 20:23:00 2017 -0800 Committer: Ashutosh Chauhan <[email protected]> Committed: Tue Feb 14 17:25:12 2017 -0800 ---------------------------------------------------------------------- .../calcite/rules/HiveRelDecorrelator.java | 718 +-- .../queries/clientpositive/subquery_multi.q | 8 +- .../clientpositive/constprog_partitioner.q.out | 117 +- .../clientpositive/llap/explainuser_1.q.out | 684 +-- .../clientpositive/llap/subquery_exists.q.out | 169 +- .../clientpositive/llap/subquery_in.q.out | 2591 +++-------- .../clientpositive/llap/subquery_multi.q.out | 2788 +++--------- .../clientpositive/llap/subquery_notin.q.out | 4222 +++++------------- .../clientpositive/llap/subquery_scalar.q.out | 2679 +++-------- .../clientpositive/llap/subquery_views.q.out | 556 +-- .../llap/vector_mapjoin_reduce.q.out | 196 +- .../results/clientpositive/perf/query1.q.out | 190 +- .../results/clientpositive/perf/query16.q.out | 222 +- .../results/clientpositive/perf/query30.q.out | 375 +- .../results/clientpositive/perf/query6.q.out | 403 +- .../results/clientpositive/perf/query69.q.out | 615 ++- .../results/clientpositive/perf/query81.q.out | 375 +- .../test/results/clientpositive/semijoin5.q.out | 138 +- .../spark/constprog_partitioner.q.out | 87 +- .../clientpositive/spark/subquery_exists.q.out | 167 +- .../clientpositive/spark/subquery_in.q.out | 2406 +++------- .../spark/vector_mapjoin_reduce.q.out | 216 +- .../clientpositive/subquery_exists.q.out | 221 +- .../clientpositive/subquery_exists_having.q.out | 232 +- .../clientpositive/subquery_in_having.q.out | 641 +-- .../clientpositive/subquery_notexists.q.out | 209 +- .../subquery_notexists_having.q.out | 223 +- .../clientpositive/subquery_notin_having.q.out | 644 +-- .../subquery_unqualcolumnrefs.q.out | 687 +-- .../clientpositive/vector_mapjoin_reduce.q.out | 322 +- 30 files changed, 6219 insertions(+), 16882 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bddf5a7a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelDecorrelator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelDecorrelator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelDecorrelator.java index 9c26801..5f37fc1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelDecorrelator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelDecorrelator.java @@ -35,6 +35,7 @@ import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.RelFactories; @@ -115,10 +116,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import javax.annotation.Nonnull; /** * NOTE: this whole logic is replicated from Calcite's RelDecorrelator @@ -211,7 +214,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { RelNode newRootRel = decorrelator.removeCorrelationViaRule(rootRel); - if (!decorrelator.cm.mapCorVarToCorRel.isEmpty()) { + if (!decorrelator.cm.mapCorToCorRel.isEmpty()) { newRootRel = decorrelator.decorrelate(newRootRel); } @@ -266,16 +269,16 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { private Function2<RelNode, RelNode, Void> createCopyHook() { return new Function2<RelNode, RelNode, Void>() { public Void apply(RelNode oldNode, RelNode newNode) { - if (cm.mapRefRelToCorVar.containsKey(oldNode)) { - cm.mapRefRelToCorVar.putAll(newNode, - cm.mapRefRelToCorVar.get(oldNode)); + if (cm.mapRefRelToCorRef.containsKey(oldNode)) { + cm.mapRefRelToCorRef.putAll(newNode, + cm.mapRefRelToCorRef.get(oldNode)); } if (oldNode instanceof LogicalCorrelate && newNode instanceof LogicalCorrelate) { LogicalCorrelate oldCor = (LogicalCorrelate) oldNode; CorrelationId c = oldCor.getCorrelationId(); - if (cm.mapCorVarToCorRel.get(c) == oldNode) { - cm.mapCorVarToCorRel.put(c, newNode); + if (cm.mapCorToCorRel.get(c) == oldNode) { + cm.mapCorToCorRel.put(c, newNode); } if (generatedCorRels.contains(oldNode)) { @@ -355,7 +358,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { List<RelNode> newInputs = Lists.newArrayList(); for (int i = 0; i < oldInputs.size(); ++i) { final Frame frame = getInvoke(oldInputs.get(i), rel); - if (frame == null || !frame.corVarOutputPos.isEmpty()) { + if (frame == null || !frame.corDefOutputs.isEmpty()) { // if input is not rewritten, or if it produces correlated // variables, terminate rewrite return null; @@ -372,7 +375,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // the output position should not change since there are no corVars // coming from below. return register(rel, newRel, identityMap(rel.getRowType().getFieldCount()), - ImmutableSortedMap.<Correlation, Integer>of()); + ImmutableSortedMap.<CorDef, Integer>of()); } /** @@ -388,7 +391,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // // Sort itself should not reference cor vars. - assert !cm.mapRefRelToCorVar.containsKey(rel); + assert !cm.mapRefRelToCorRef.containsKey(rel); // Sort only references field positions in collations field. // The collations field in the newRel now need to refer to the @@ -406,7 +409,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { Mappings.TargetMapping mapping = Mappings.target( - frame.oldToNewOutputPos, + frame.oldToNewOutputs, oldInput.getRowType().getFieldCount(), newInput.getRowType().getFieldCount()); @@ -416,8 +419,8 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final RelNode newSort = HiveSortLimit.create(newInput, newCollation, rel.offset, rel.fetch); // Sort does not change input ordering - return register(rel, newSort, frame.oldToNewOutputPos, - frame.corVarOutputPos); + return register(rel, newSort, frame.oldToNewOutputs, + frame.corDefOutputs); } /** * Rewrite Sort. @@ -432,7 +435,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // // Sort itself should not reference cor vars. - assert !cm.mapRefRelToCorVar.containsKey(rel); + assert !cm.mapRefRelToCorRef.containsKey(rel); // Sort only references field positions in collations field. // The collations field in the newRel now need to refer to the @@ -450,7 +453,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { Mappings.TargetMapping mapping = Mappings.target( - frame.oldToNewOutputPos, + frame.oldToNewOutputs, oldInput.getRowType().getFieldCount(), newInput.getRowType().getFieldCount()); @@ -460,8 +463,8 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final RelNode newSort = HiveSortLimit.create(newInput, newCollation, rel.offset, rel.fetch); // Sort does not change input ordering - return register(rel, newSort, frame.oldToNewOutputPos, - frame.corVarOutputPos); + return register(rel, newSort, frame.oldToNewOutputs, + frame.corDefOutputs); } /** @@ -493,7 +496,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // // Aggregate itself should not reference cor vars. - assert !cm.mapRefRelToCorVar.containsKey(rel); + assert !cm.mapRefRelToCorRef.containsKey(rel); final RelNode oldInput = rel.getInput(); final Frame frame = getInvoke(oldInput, rel); @@ -502,16 +505,10 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { return null; } - //I think this is a bug in Calcite where Aggregate seems to always expect - // correlated variable in nodes underneath it which is not true for queries such as - // select p.empno, li.mgr from (select distinct empno as empno from emp) p join emp li on p.empno= li.empno where li.sal = 1 - // and li.deptno in (select deptno from emp where JOB = 'AIR' AND li.mgr=mgr) - - //assert !frame.corVarOutputPos.isEmpty(); final RelNode newInput = frame.r; // map from newInput - Map<Integer, Integer> mapNewInputToProjOutputPos = Maps.newHashMap(); + Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>(); final int oldGroupKeyCount = rel.getGroupSet().cardinality(); // Project projects the original expressions, @@ -533,25 +530,25 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { omittedConstants.put(i, constant); continue; } - int newInputPos = frame.oldToNewOutputPos.get(i); + int newInputPos = frame.oldToNewOutputs.get(i); projects.add(RexInputRef.of2(newInputPos, newInputOutput)); - mapNewInputToProjOutputPos.put(newInputPos, newPos); + mapNewInputToProjOutputs.put(newInputPos, newPos); newPos++; } - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = new TreeMap<>(); - if (!frame.corVarOutputPos.isEmpty()) { + final SortedMap<CorDef, Integer> corDefOutputs = new TreeMap<>(); + if (!frame.corDefOutputs.isEmpty()) { // If input produces correlated variables, move them to the front, // right after any existing GROUP BY fields. // Now add the corVars from the input, starting from // position oldGroupKeyCount. - for (Map.Entry<Correlation, Integer> entry - : frame.corVarOutputPos.entrySet()) { + for (Map.Entry<CorDef, Integer> entry + : frame.corDefOutputs.entrySet()) { projects.add(RexInputRef.of2(entry.getValue(), newInputOutput)); - mapCorVarToOutputPos.put(entry.getKey(), newPos); - mapNewInputToProjOutputPos.put(entry.getValue(), newPos); + corDefOutputs.put(entry.getKey(), newPos); + mapNewInputToProjOutputs.put(entry.getValue(), newPos); newPos++; } } @@ -559,9 +556,9 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // add the remaining fields final int newGroupKeyCount = newPos; for (int i = 0; i < newInputOutput.size(); i++) { - if (!mapNewInputToProjOutputPos.containsKey(i)) { + if (!mapNewInputToProjOutputs.containsKey(i)) { projects.add(RexInputRef.of2(i, newInputOutput)); - mapNewInputToProjOutputPos.put(i, newPos); + mapNewInputToProjOutputs.put(i, newPos); newPos++; } } @@ -587,13 +584,13 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // newInput Map<Integer, Integer> combinedMap = Maps.newHashMap(); - for (Integer oldInputPos : frame.oldToNewOutputPos.keySet()) { + for (Integer oldInputPos : frame.oldToNewOutputs.keySet()) { combinedMap.put(oldInputPos, - mapNewInputToProjOutputPos.get( - frame.oldToNewOutputPos.get(oldInputPos))); + mapNewInputToProjOutputs.get( + frame.oldToNewOutputs.get(oldInputPos))); } - register(oldInput, newProject, combinedMap, mapCorVarToOutputPos); + register(oldInput, newProject, combinedMap, corDefOutputs); // now it's time to rewrite the Aggregate final ImmutableBitSet newGroupSet = ImmutableBitSet.range(newGroupKeyCount); @@ -642,7 +639,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final List<RexNode> postProjects = new ArrayList<>(relBuilder.fields()); for (Map.Entry<Integer, RexLiteral> entry : omittedConstants.descendingMap().entrySet()) { - postProjects.add(entry.getKey() + frame.corVarOutputPos.size(), + postProjects.add(entry.getKey() + frame.corDefOutputs.size(), entry.getValue()); } relBuilder.project(postProjects); @@ -650,7 +647,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // Aggregate does not change input ordering so corVars will be // located at the same position as the input newProject. - return register(rel, relBuilder.build(), combinedMap, mapCorVarToOutputPos); + return register(rel, relBuilder.build(), combinedMap, corDefOutputs); } public Frame getInvoke(RelNode r, RelNode parent) { @@ -689,7 +686,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // // Aggregate itself should not reference cor vars. - assert !cm.mapRefRelToCorVar.containsKey(rel); + assert !cm.mapRefRelToCorRef.containsKey(rel); final RelNode oldInput = rel.getInput(); final Frame frame = getInvoke(oldInput, rel); @@ -701,7 +698,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final RelNode newInput = frame.r; // map from newInput - Map<Integer, Integer> mapNewInputToProjOutputPos = Maps.newHashMap(); + Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>(); final int oldGroupKeyCount = rel.getGroupSet().cardinality(); // Project projects the original expressions, @@ -723,25 +720,25 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { omittedConstants.put(i, constant); continue; } - int newInputPos = frame.oldToNewOutputPos.get(i); + int newInputPos = frame.oldToNewOutputs.get(i); projects.add(RexInputRef.of2(newInputPos, newInputOutput)); - mapNewInputToProjOutputPos.put(newInputPos, newPos); + mapNewInputToProjOutputs.put(newInputPos, newPos); newPos++; } - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = new TreeMap<>(); - if (!frame.corVarOutputPos.isEmpty()) { + final SortedMap<CorDef, Integer> corDefOutputs = new TreeMap<>(); + if (!frame.corDefOutputs.isEmpty()) { // If input produces correlated variables, move them to the front, // right after any existing GROUP BY fields. // Now add the corVars from the input, starting from // position oldGroupKeyCount. - for (Map.Entry<Correlation, Integer> entry - : frame.corVarOutputPos.entrySet()) { + for (Map.Entry<CorDef, Integer> entry + : frame.corDefOutputs.entrySet()) { projects.add(RexInputRef.of2(entry.getValue(), newInputOutput)); - mapCorVarToOutputPos.put(entry.getKey(), newPos); - mapNewInputToProjOutputPos.put(entry.getValue(), newPos); + corDefOutputs.put(entry.getKey(), newPos); + mapNewInputToProjOutputs.put(entry.getValue(), newPos); newPos++; } } @@ -749,9 +746,9 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // add the remaining fields final int newGroupKeyCount = newPos; for (int i = 0; i < newInputOutput.size(); i++) { - if (!mapNewInputToProjOutputPos.containsKey(i)) { + if (!mapNewInputToProjOutputs.containsKey(i)) { projects.add(RexInputRef.of2(i, newInputOutput)); - mapNewInputToProjOutputPos.put(i, newPos); + mapNewInputToProjOutputs.put(i, newPos); newPos++; } } @@ -776,13 +773,13 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // newInput Map<Integer, Integer> combinedMap = Maps.newHashMap(); - for (Integer oldInputPos : frame.oldToNewOutputPos.keySet()) { + for (Integer oldInputPos : frame.oldToNewOutputs.keySet()) { combinedMap.put(oldInputPos, - mapNewInputToProjOutputPos.get( - frame.oldToNewOutputPos.get(oldInputPos))); + mapNewInputToProjOutputs.get( + frame.oldToNewOutputs.get(oldInputPos))); } - register(oldInput, newProject, combinedMap, mapCorVarToOutputPos); + register(oldInput, newProject, combinedMap, corDefOutputs); // now it's time to rewrite the Aggregate final ImmutableBitSet newGroupSet = ImmutableBitSet.range(newGroupKeyCount); @@ -827,7 +824,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final List<RexNode> postProjects = new ArrayList<>(relBuilder.fields()); for (Map.Entry<Integer, RexLiteral> entry : omittedConstants.descendingMap().entrySet()) { - postProjects.add(entry.getKey() + frame.corVarOutputPos.size(), + postProjects.add(entry.getKey() + frame.corDefOutputs.size(), entry.getValue()); } relBuilder.project(postProjects); @@ -835,7 +832,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // Aggregate does not change input ordering so corVars will be // located at the same position as the input newProject. - return register(rel, relBuilder.build(), combinedMap, mapCorVarToOutputPos); + return register(rel, relBuilder.build(), combinedMap, corDefOutputs); } } @@ -862,16 +859,12 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // If this LogicalProject has correlated reference, create value generator // and produce the correlated variables in the new output. - if (cm.mapRefRelToCorVar.containsKey(rel)) { - decorrelateInputWithValueGenerator(rel); - - // The old input should be mapped to the LogicalJoin created by - // rewriteInputWithValueGenerator(). - frame = map.get(oldInput); + if (cm.mapRefRelToCorRef.containsKey(rel)) { + frame = decorrelateInputWithValueGenerator(rel); } // LogicalProject projects the original expressions - final Map<Integer, Integer> mapOldToNewOutputPos = Maps.newHashMap(); + final Map<Integer, Integer> mapOldToNewOutputs = new HashMap<>(); int newPos; for (newPos = 0; newPos < oldProjects.size(); newPos++) { projects.add( @@ -879,39 +872,24 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { Pair.of( decorrelateExpr(oldProjects.get(newPos)), relOutput.get(newPos).getName())); - mapOldToNewOutputPos.put(newPos, newPos); + mapOldToNewOutputs.put(newPos, newPos); } // Project any correlated variables the input wants to pass along. - // There could be situation e.g. multiple correlated variables refering to - // same outer variable, in which case Project will be created with multiple - // fields with same name. Hive doesn't allow HiveProject with multiple fields - // having same name. So to avoid that we keep a set of all fieldnames and - // on encountering an existing one a new field/column name is generated - final Set<String> corrFieldName = Sets.newHashSet(); - int pos = 0; - - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = new TreeMap<>(); - for (Map.Entry<Correlation, Integer> entry : frame.corVarOutputPos.entrySet()) { - final RelDataTypeField field = frame.r.getRowType().getFieldList().get(entry.getValue()); - RexNode projectChild = (RexNode) new RexInputRef(entry.getValue(), field.getType()); - String fieldName = field.getName(); - if(corrFieldName.contains(fieldName)) - { - fieldName = SemanticAnalyzer.getColumnInternalName(pos++); - } - - projects.add(Pair.of(projectChild ,fieldName)); - corrFieldName.add(fieldName); - mapCorVarToOutputPos.put(entry.getKey(), newPos); + final SortedMap<CorDef, Integer> corDefOutputs = new TreeMap<>(); + for (Map.Entry<CorDef, Integer> entry : frame.corDefOutputs.entrySet()) { + projects.add( + RexInputRef.of2(entry.getValue(), + frame.r.getRowType().getFieldList())); + corDefOutputs.put(entry.getKey(), newPos); newPos++; } RelNode newProject = HiveProject.create(frame.r, Pair.left(projects), SqlValidatorUtil.uniquify(Pair.right(projects))); - return register(rel, newProject, mapOldToNewOutputPos, - mapCorVarToOutputPos); + return register(rel, newProject, mapOldToNewOutputs, + corDefOutputs); } } /** @@ -941,16 +919,12 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // If this LogicalProject has correlated reference, create value generator // and produce the correlated variables in the new output. - if (cm.mapRefRelToCorVar.containsKey(rel)) { - decorrelateInputWithValueGenerator(rel); - - // The old input should be mapped to the LogicalJoin created by - // rewriteInputWithValueGenerator(). - frame = map.get(oldInput); + if (cm.mapRefRelToCorRef.containsKey(rel)) { + frame = decorrelateInputWithValueGenerator(rel); } // LogicalProject projects the original expressions - final Map<Integer, Integer> mapOldToNewOutputPos = Maps.newHashMap(); + final Map<Integer, Integer> mapOldToNewOutputs = new HashMap<>(); int newPos; for (newPos = 0; newPos < oldProjects.size(); newPos++) { projects.add( @@ -958,23 +932,23 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { Pair.of( decorrelateExpr(oldProjects.get(newPos)), relOutput.get(newPos).getName())); - mapOldToNewOutputPos.put(newPos, newPos); + mapOldToNewOutputs.put(newPos, newPos); } // Project any correlated variables the input wants to pass along. - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = new TreeMap<>(); - for (Map.Entry<Correlation, Integer> entry : frame.corVarOutputPos.entrySet()) { + final SortedMap<CorDef, Integer> corDefOutputs = new TreeMap<>(); + for (Map.Entry<CorDef, Integer> entry : frame.corDefOutputs.entrySet()) { projects.add( RexInputRef.of2(entry.getValue(), frame.r.getRowType().getFieldList())); - mapCorVarToOutputPos.put(entry.getKey(), newPos); + corDefOutputs.put(entry.getKey(), newPos); newPos++; } RelNode newProject = HiveProject.create(frame.r, Pair.left(projects), Pair.right(projects)); - return register(rel, newProject, mapOldToNewOutputPos, - mapCorVarToOutputPos); + return register(rel, newProject, mapOldToNewOutputs, + corDefOutputs); } /** @@ -988,17 +962,17 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { * @return RelNode the root of the resultant RelNode tree */ private RelNode createValueGenerator( - Iterable<Correlation> correlations, + Iterable<CorRef> correlations, int valueGenFieldOffset, - SortedMap<Correlation, Integer> mapCorVarToOutputPos) { - final Map<RelNode, List<Integer>> mapNewInputToOutputPos = + SortedMap<CorDef, Integer> corDefOutputs) { + final Map<RelNode, List<Integer>> mapNewInputToOutputs = new HashMap<>(); final Map<RelNode, Integer> mapNewInputToNewOffset = new HashMap<>(); // Input provides the definition of a correlated variable. // Add to map all the referenced positions (relative to each input rel). - for (Correlation corVar : correlations) { + for (CorRef corVar : correlations) { final int oldCorVarOffset = corVar.field; final RelNode oldInput = getCorRel(corVar); @@ -1007,21 +981,21 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { assert frame != null; final RelNode newInput = frame.r; - final List<Integer> newLocalOutputPosList; - if (!mapNewInputToOutputPos.containsKey(newInput)) { - newLocalOutputPosList = Lists.newArrayList(); + final List<Integer> newLocalOutputs; + if (!mapNewInputToOutputs.containsKey(newInput)) { + newLocalOutputs = new ArrayList<>(); } else { - newLocalOutputPosList = - mapNewInputToOutputPos.get(newInput); + newLocalOutputs = + mapNewInputToOutputs.get(newInput); } - final int newCorVarOffset = frame.oldToNewOutputPos.get(oldCorVarOffset); + final int newCorVarOffset = frame.oldToNewOutputs.get(oldCorVarOffset); // Add all unique positions referenced. - if (!newLocalOutputPosList.contains(newCorVarOffset)) { - newLocalOutputPosList.add(newCorVarOffset); + if (!newLocalOutputs.contains(newCorVarOffset)) { + newLocalOutputs.add(newCorVarOffset); } - mapNewInputToOutputPos.put(newInput, newLocalOutputPosList); + mapNewInputToOutputs.put(newInput, newLocalOutputs); } int offset = 0; @@ -1031,24 +1005,24 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // To make sure the plan does not change in terms of join order, // join these rels based on their occurrence in cor var list which // is sorted. - final Set<RelNode> joinedInputRelSet = Sets.newHashSet(); + final Set<RelNode> joinedInputs = new HashSet<>(); RelNode r = null; - for (Correlation corVar : correlations) { + for (CorRef corVar : correlations) { final RelNode oldInput = getCorRel(corVar); assert oldInput != null; final RelNode newInput = map.get(oldInput).r; assert newInput != null; - if (!joinedInputRelSet.contains(newInput)) { + if (!joinedInputs.contains(newInput)) { RelNode project = RelOptUtil.createProject( newInput, - mapNewInputToOutputPos.get(newInput)); + mapNewInputToOutputs.get(newInput)); RelNode distinct = RelOptUtil.createDistinctRel(project); RelOptCluster cluster = distinct.getCluster(); - joinedInputRelSet.add(newInput); + joinedInputs.add(newInput); mapNewInputToNewOffset.put(newInput, offset); offset += distinct.getRowType().getFieldCount(); @@ -1067,32 +1041,29 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // the join output, leaving room for valueGenFieldOffset because // valueGenerators are joined with the original left input of the rel // referencing correlated variables. - for (Correlation corVar : correlations) { + for (CorRef corRef : correlations) { // The first input of a Correlator is always the rel defining // the correlated variables. - final RelNode oldInput = getCorRel(corVar); + final RelNode oldInput = getCorRel(corRef); assert oldInput != null; final Frame frame = map.get(oldInput); final RelNode newInput = frame.r; assert newInput != null; - final List<Integer> newLocalOutputPosList = - mapNewInputToOutputPos.get(newInput); + final List<Integer> newLocalOutputs = + mapNewInputToOutputs.get(newInput); - final int newLocalOutputPos = frame.oldToNewOutputPos.get(corVar.field); + final int newLocalOutput = frame.oldToNewOutputs.get(corRef.field); - // newOutputPos is the index of the cor var in the referenced + // newOutput is the index of the cor var in the referenced // position list plus the offset of referenced position list of // each newInput. - final int newOutputPos = - newLocalOutputPosList.indexOf(newLocalOutputPos) + final int newOutput = + newLocalOutputs.indexOf(newLocalOutput) + mapNewInputToNewOffset.get(newInput) + valueGenFieldOffset; - if (mapCorVarToOutputPos.containsKey(corVar)) { - assert mapCorVarToOutputPos.get(corVar) == newOutputPos; - } - mapCorVarToOutputPos.put(corVar, newOutputPos); + corDefOutputs.put(corRef.def(), newOutput); } return r; @@ -1101,33 +1072,57 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { //this returns the source of corVar i.e. Rel which produces cor var // value. Therefore it is always LogicalCorrelate's left input which is outer query - private RelNode getCorRel(Correlation corVar) { - final RelNode r = cm.mapCorVarToCorRel.get(corVar.corr); - + private RelNode getCorRel(CorRef corVar) { + final RelNode r = cm.mapCorToCorRel.get(corVar.corr); RelNode ret = r.getInput(0); return ret; } - private void decorrelateInputWithValueGenerator(RelNode rel) { + private Frame decorrelateInputWithValueGenerator(RelNode rel) { // currently only handles one input input assert rel.getInputs().size() == 1; RelNode oldInput = rel.getInput(0); final Frame frame = map.get(oldInput); - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = - new TreeMap<>(frame.corVarOutputPos); + final SortedMap<CorDef, Integer> corDefOutputs = + new TreeMap<>(frame.corDefOutputs); + + final Collection<CorRef> corVarList = cm.mapRefRelToCorRef.get(rel); + + // Try to populate correlation variables using local fields. + // This means that we do not need a value generator. + if (rel instanceof Filter) { + SortedMap<CorDef, Integer> map = new TreeMap<>(); + for (CorRef correlation : corVarList) { + final CorDef def = correlation.def(); + if (corDefOutputs.containsKey(def) || map.containsKey(def)) { + continue; + } + try { + findCorrelationEquivalent(correlation, ((Filter) rel).getCondition()); + } catch (Util.FoundOne e) { + map.put(def, (Integer) e.getNode()); + } + } + // If all correlation variables are now satisfied, skip creating a value + // generator. + if (map.size() == corVarList.size()) { + map.putAll(frame.corDefOutputs); + return register(oldInput, frame.r, + frame.oldToNewOutputs, map); + } + } - final Collection<Correlation> corVarList = cm.mapRefRelToCorVar.get(rel); int leftInputOutputCount = frame.r.getRowType().getFieldCount(); - // can directly add positions into mapCorVarToOutputPos since join + // can directly add positions into corDefOutputs since join // does not change the output ordering from the inputs. RelNode valueGen = createValueGenerator( corVarList, leftInputOutputCount, - mapCorVarToOutputPos); + corDefOutputs); RelNode join = LogicalJoin.create(frame.r, valueGen, rexBuilder.makeLiteral(true), @@ -1136,7 +1131,66 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // LogicalJoin or LogicalFilter does not change the old input ordering. All // input fields from newLeftInput(i.e. the original input to the old // LogicalFilter) are in the output and in the same position. - register(oldInput, join, frame.oldToNewOutputPos, mapCorVarToOutputPos); + return register(oldInput, join, frame.oldToNewOutputs, corDefOutputs); + } + + + /** Finds a {@link RexInputRef} that is equivalent to a {@link CorRef}, + * and if found, throws a {@link Util.FoundOne}. */ + private void findCorrelationEquivalent(CorRef correlation, RexNode e) + throws Util.FoundOne { + switch (e.getKind()) { + case EQUALS: + final RexCall call = (RexCall) e; + final List<RexNode> operands = call.getOperands(); + if (references(operands.get(0), correlation) + && operands.get(1) instanceof RexInputRef) { + throw new Util.FoundOne(((RexInputRef) operands.get(1)).getIndex()); + } + if (references(operands.get(1), correlation) + && operands.get(0) instanceof RexInputRef) { + throw new Util.FoundOne(((RexInputRef) operands.get(0)).getIndex()); + } + break; + case AND: + for (RexNode operand : ((RexCall) e).getOperands()) { + findCorrelationEquivalent(correlation, operand); + } + } + } + + private boolean references(RexNode e, CorRef correlation) { + switch (e.getKind()) { + case CAST: + final RexNode operand = ((RexCall) e).getOperands().get(0); + if (isWidening(e.getType(), operand.getType())) { + return references(operand, correlation); + } + return false; + case FIELD_ACCESS: + final RexFieldAccess f = (RexFieldAccess) e; + if (f.getField().getIndex() == correlation.field + && f.getReferenceExpr() instanceof RexCorrelVariable) { + if (((RexCorrelVariable) f.getReferenceExpr()).id == correlation.corr) { + return true; + } + } + // fall through + default: + return false; + } + } + + /** Returns whether one type is just a widening of another. + * + * <p>For example:<ul> + * <li>{@code VARCHAR(10)} is a widening of {@code VARCHAR(5)}. + * <li>{@code VARCHAR(10)} is a widening of {@code VARCHAR(10) NOT NULL}. + * </ul> + */ + private boolean isWidening(RelDataType type, RelDataType type1) { + return type.getSqlTypeName() == type1.getSqlTypeName() + && type.getPrecision() >= type1.getPrecision(); } public Frame decorrelateRel(HiveFilter rel) throws SemanticException { @@ -1166,25 +1220,20 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // If this LogicalFilter has correlated reference, create value generator // and produce the correlated variables in the new output. - if (cm.mapRefRelToCorVar.containsKey(rel)) { - decorrelateInputWithValueGenerator(rel); - - // The old input should be mapped to the newly created LogicalJoin by - // rewriteInputWithValueGenerator(). - frame = map.get(oldInput); + if (cm.mapRefRelToCorRef.containsKey(rel)) { + frame = decorrelateInputWithValueGenerator(rel); } // Replace the filter expression to reference output of the join // Map filter to the new filter over join - RelNode newFilter = new HiveFilter(rel.getCluster(), rel.getTraitSet(), frame.r, - decorrelateExpr(rel.getCondition())); + relBuilder.push(frame.r).filter(decorrelateExpr(rel.getCondition())); // Filter does not change the input ordering. // Filter rel does not permute the input. // All corvars produced by filter will have the same output positions in the // input rel. - return register(rel, newFilter, frame.oldToNewOutputPos, - frame.corVarOutputPos); + return register(rel, relBuilder.build(), frame.oldToNewOutputs, + frame.corDefOutputs); } } @@ -1219,26 +1268,22 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // If this LogicalFilter has correlated reference, create value generator // and produce the correlated variables in the new output. - if (cm.mapRefRelToCorVar.containsKey(rel)) { - decorrelateInputWithValueGenerator(rel); + if (cm.mapRefRelToCorRef.containsKey(rel)) { + frame = decorrelateInputWithValueGenerator(rel); - // The old input should be mapped to the newly created LogicalJoin by - // rewriteInputWithValueGenerator(). - frame = map.get(oldInput); } // Replace the filter expression to reference output of the join // Map filter to the new filter over join - RelNode newFilter = new HiveFilter(rel.getCluster(), rel.getTraitSet(), frame.r, - decorrelateExpr(rel.getCondition())); + relBuilder.push(frame.r).filter(decorrelateExpr(rel.getCondition())); // Filter does not change the input ordering. // Filter rel does not permute the input. // All corvars produced by filter will have the same output positions in the // input rel. - return register(rel, newFilter, frame.oldToNewOutputPos, - frame.corVarOutputPos); + return register(rel, relBuilder.build(), frame.oldToNewOutputs, + frame.corDefOutputs); } /** @@ -1268,18 +1313,18 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { return null; } - if (rightFrame.corVarOutputPos.isEmpty()) { + if (rightFrame.corDefOutputs.isEmpty()) { return null; } assert rel.getRequiredColumns().cardinality() - <= rightFrame.corVarOutputPos.keySet().size(); + <= rightFrame.corDefOutputs.keySet().size(); // Change correlator rel into a join. // Join all the correlated variables produced by this correlator rel // with the values generated and propagated from the right input - final SortedMap<Correlation, Integer> corVarOutputPos = - new TreeMap<>(rightFrame.corVarOutputPos); + final SortedMap<CorDef, Integer> corDefOutputs = + new TreeMap<>(rightFrame.corDefOutputs); final List<RexNode> conditions = new ArrayList<>(); final List<RelDataTypeField> newLeftOutput = leftFrame.r.getRowType().getFieldList(); @@ -1288,14 +1333,14 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final List<RelDataTypeField> newRightOutput = rightFrame.r.getRowType().getFieldList(); - for (Map.Entry<Correlation, Integer> rightOutputPos - : Lists.newArrayList(corVarOutputPos.entrySet())) { - final Correlation corVar = rightOutputPos.getKey(); - if (!corVar.corr.equals(rel.getCorrelationId())) { + for (Map.Entry<CorDef, Integer> rightOutput + : new ArrayList<>(corDefOutputs.entrySet())) { + final CorDef corDef = rightOutput.getKey(); + if (!corDef.corr.equals(rel.getCorrelationId())) { continue; } - final int newLeftPos = leftFrame.oldToNewOutputPos.get(corVar.field); - final int newRightPos = rightOutputPos.getValue(); + final int newLeftPos = leftFrame.oldToNewOutputs.get(corDef.field); + final int newRightPos = rightOutput.getValue(); conditions.add( rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, RexInputRef.of(newLeftPos, newLeftOutput), @@ -1303,23 +1348,23 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { newRightOutput.get(newRightPos).getType()))); // remove this cor var from output position mapping - corVarOutputPos.remove(corVar); + corDefOutputs.remove(corDef); } // Update the output position for the cor vars: only pass on the cor // vars that are not used in the join key. - for (Correlation corVar : corVarOutputPos.keySet()) { - int newPos = corVarOutputPos.get(corVar) + newLeftFieldCount; - corVarOutputPos.put(corVar, newPos); + for (CorDef corDef : corDefOutputs.keySet()) { + int newPos = corDefOutputs.get(corDef) + newLeftFieldCount; + corDefOutputs.put(corDef, newPos); } // then add any cor var from the left input. Do not need to change // output positions. - corVarOutputPos.putAll(leftFrame.corVarOutputPos); + corDefOutputs.putAll(leftFrame.corDefOutputs); // Create the mapping between the output of the old correlation rel // and the new join rel - final Map<Integer, Integer> mapOldToNewOutputPos = Maps.newHashMap(); + final Map<Integer, Integer> mapOldToNewOutputs = new HashMap<>(); int oldLeftFieldCount = oldLeft.getRowType().getFieldCount(); @@ -1328,13 +1373,13 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { == oldLeftFieldCount + oldRightFieldCount; // Left input positions are not changed. - mapOldToNewOutputPos.putAll(leftFrame.oldToNewOutputPos); + mapOldToNewOutputs.putAll(leftFrame.oldToNewOutputs); // Right input positions are shifted by newLeftFieldCount. for (int i = 0; i < oldRightFieldCount; i++) { - mapOldToNewOutputPos.put( + mapOldToNewOutputs.put( i + oldLeftFieldCount, - rightFrame.oldToNewOutputPos.get(i) + newLeftFieldCount); + rightFrame.oldToNewOutputs.get(i) + newLeftFieldCount); } final RexNode condition = @@ -1343,7 +1388,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { LogicalJoin.create(leftFrame.r, rightFrame.r, condition, ImmutableSet.<CorrelationId>of(), rel.getJoinType().toJoinType()); - return register(rel, newJoin, mapOldToNewOutputPos, corVarOutputPos); + return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs); } public Frame decorrelateRel(HiveJoin rel) throws SemanticException{ @@ -1369,7 +1414,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // Create the mapping between the output of the old correlation rel // and the new join rel - Map<Integer, Integer> mapOldToNewOutputPos = Maps.newHashMap(); + Map<Integer, Integer> mapOldToNewOutputs = Maps.newHashMap(); int oldLeftFieldCount = oldLeft.getRowType().getFieldCount(); int newLeftFieldCount = leftFrame.r.getRowType().getFieldCount(); @@ -1379,24 +1424,24 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { == oldLeftFieldCount + oldRightFieldCount; // Left input positions are not changed. - mapOldToNewOutputPos.putAll(leftFrame.oldToNewOutputPos); + mapOldToNewOutputs.putAll(leftFrame.oldToNewOutputs); // Right input positions are shifted by newLeftFieldCount. for (int i = 0; i < oldRightFieldCount; i++) { - mapOldToNewOutputPos.put(i + oldLeftFieldCount, - rightFrame.oldToNewOutputPos.get(i) + newLeftFieldCount); + mapOldToNewOutputs.put(i + oldLeftFieldCount, + rightFrame.oldToNewOutputs.get(i) + newLeftFieldCount); } - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = - new TreeMap<>(leftFrame.corVarOutputPos); + final SortedMap<CorDef, Integer> corDefOutputs = + new TreeMap<>(leftFrame.corDefOutputs); // Right input positions are shifted by newLeftFieldCount. - for (Map.Entry<Correlation, Integer> entry - : rightFrame.corVarOutputPos.entrySet()) { - mapCorVarToOutputPos.put(entry.getKey(), + for (Map.Entry<CorDef, Integer> entry + : rightFrame.corDefOutputs.entrySet()) { + corDefOutputs.put(entry.getKey(), entry.getValue() + newLeftFieldCount); } - return register(rel, newJoin, mapOldToNewOutputPos, mapCorVarToOutputPos); + return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs); } /** * Rewrite LogicalJoin. @@ -1427,7 +1472,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // Create the mapping between the output of the old correlation rel // and the new join rel - Map<Integer, Integer> mapOldToNewOutputPos = Maps.newHashMap(); + Map<Integer, Integer> mapOldToNewOutputs = Maps.newHashMap(); int oldLeftFieldCount = oldLeft.getRowType().getFieldCount(); int newLeftFieldCount = leftFrame.r.getRowType().getFieldCount(); @@ -1437,24 +1482,24 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { == oldLeftFieldCount + oldRightFieldCount; // Left input positions are not changed. - mapOldToNewOutputPos.putAll(leftFrame.oldToNewOutputPos); + mapOldToNewOutputs.putAll(leftFrame.oldToNewOutputs); // Right input positions are shifted by newLeftFieldCount. for (int i = 0; i < oldRightFieldCount; i++) { - mapOldToNewOutputPos.put(i + oldLeftFieldCount, - rightFrame.oldToNewOutputPos.get(i) + newLeftFieldCount); + mapOldToNewOutputs.put(i + oldLeftFieldCount, + rightFrame.oldToNewOutputs.get(i) + newLeftFieldCount); } - final SortedMap<Correlation, Integer> mapCorVarToOutputPos = - new TreeMap<>(leftFrame.corVarOutputPos); + final SortedMap<CorDef, Integer> corDefOutputs = + new TreeMap<>(leftFrame.corDefOutputs); // Right input positions are shifted by newLeftFieldCount. - for (Map.Entry<Correlation, Integer> entry - : rightFrame.corVarOutputPos.entrySet()) { - mapCorVarToOutputPos.put(entry.getKey(), + for (Map.Entry<CorDef, Integer> entry + : rightFrame.corDefOutputs.entrySet()) { + corDefOutputs.put(entry.getKey(), entry.getValue() + newLeftFieldCount); } - return register(rel, newJoin, mapOldToNewOutputPos, mapCorVarToOutputPos); + return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs); } private RexInputRef getNewForOldInputRef(RexInputRef oldInputRef) { @@ -1490,8 +1535,8 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // figure out the newLocalOrdinal, relative to the newInput. int newLocalOrdinal = oldLocalOrdinal; - if (!frame.oldToNewOutputPos.isEmpty()) { - newLocalOrdinal = frame.oldToNewOutputPos.get(oldLocalOrdinal); + if (!frame.oldToNewOutputs.isEmpty()) { + newLocalOrdinal = frame.oldToNewOutputs.get(oldLocalOrdinal); } newOrdinal += newLocalOrdinal; @@ -1623,11 +1668,11 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // check that all correlated refs in the filter condition are // used in the join(as field access). - Set<Correlation> corVarInFilter = - Sets.newHashSet(cm.mapRefRelToCorVar.get(filter)); + Set<CorRef> corVarInFilter = + Sets.newHashSet(cm.mapRefRelToCorRef.get(filter)); for (RexFieldAccess correlatedJoinKey : correlatedJoinKeys) { - corVarInFilter.remove(cm.mapFieldAccessToCorVar.get(correlatedJoinKey)); + corVarInFilter.remove(cm.mapFieldAccessToCorRef.get(correlatedJoinKey)); } if (!corVarInFilter.isEmpty()) { @@ -1636,10 +1681,10 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // Check that the correlated variables referenced in these // comparisons do come from the correlatorRel. - corVarInFilter.addAll(cm.mapRefRelToCorVar.get(filter)); + corVarInFilter.addAll(cm.mapRefRelToCorRef.get(filter)); - for (Correlation corVar : corVarInFilter) { - if (cm.mapCorVarToCorRel.get(corVar.corr) != correlate) { + for (CorRef corVar : corVarInFilter) { + if (cm.mapCorToCorRel.get(corVar.corr) != correlate) { return false; } } @@ -1648,9 +1693,9 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // if project has any correlated reference, make sure they are also // provided by the current correlate. They will be projected out of the LHS // of the correlate. - if ((project != null) && cm.mapRefRelToCorVar.containsKey(project)) { - for (Correlation corVar : cm.mapRefRelToCorVar.get(project)) { - if (cm.mapCorVarToCorRel.get(corVar.corr) != correlate) { + if ((project != null) && cm.mapRefRelToCorRef.containsKey(project)) { + for (CorRef corVar : cm.mapRefRelToCorRef.get(project)) { + if (cm.mapCorToCorRel.get(corVar.corr) != correlate) { return false; } } @@ -1665,8 +1710,8 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { * @param correlate Correlator */ private void removeCorVarFromTree(LogicalCorrelate correlate) { - if (cm.mapCorVarToCorRel.get(correlate.getCorrelationId()) == correlate) { - cm.mapCorVarToCorRel.remove(correlate.getCorrelationId()); + if (cm.mapCorToCorRel.get(correlate.getCorrelationId()) == correlate) { + cm.mapCorToCorRel.remove(correlate.getCorrelationId()); } } @@ -1706,11 +1751,9 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { /** Registers a relational expression and the relational expression it became * after decorrelation. */ Frame register(RelNode rel, RelNode newRel, - Map<Integer, Integer> oldToNewOutputPos, - SortedMap<Correlation, Integer> corVarToOutputPos) { - assert allLessThan(oldToNewOutputPos.keySet(), - newRel.getRowType().getFieldCount(), Litmus.THROW); - final Frame frame = new Frame(newRel, corVarToOutputPos, oldToNewOutputPos); + Map<Integer, Integer> oldToNewOutputs, + SortedMap<CorDef, Integer> corDefOutputs) { + final Frame frame = new Frame(rel, newRel, corDefOutputs, oldToNewOutputs); map.put(rel, frame); return frame; } @@ -1744,15 +1787,16 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { if (frame != null) { // try to find in this input rel the position of cor var - final Correlation corVar = cm.mapFieldAccessToCorVar.get(fieldAccess); + final CorRef corRef = cm.mapFieldAccessToCorRef.get(fieldAccess); - if (corVar != null) { - Integer newInputPos = frame.corVarOutputPos.get(corVar); + if (corRef != null) { + Integer newInputPos = frame.corDefOutputs.get(corRef.def()); if (newInputPos != null) { // This input rel does produce the cor var referenced. // Assume fieldAccess has the correct type info. return new RexInputRef(newInputPos + newInputOutputOffset, - fieldAccess.getType()); + frame.r.getRowType().getFieldList().get(newInputPos) + .getType()); } } @@ -1767,7 +1811,12 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { } @Override public RexNode visitInputRef(RexInputRef inputRef) { - return getNewForOldInputRef(inputRef); + final RexInputRef ref = getNewForOldInputRef(inputRef); + if (ref.getIndex() == inputRef.getIndex() + && ref.getType() == inputRef.getType()) { + return inputRef; // re-use old object, to prevent needless expr cloning + } + return ref; } } @@ -1839,9 +1888,9 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { } @Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { - if (cm.mapFieldAccessToCorVar.containsKey(fieldAccess)) { + if (cm.mapFieldAccessToCorRef.containsKey(fieldAccess)) { // if it is a corVar, change it to be input ref. - Correlation corVar = cm.mapFieldAccessToCorVar.get(fieldAccess); + CorRef corVar = cm.mapFieldAccessToCorRef.get(fieldAccess); // corVar offset should point to the leftInput of currentRel, // which is the Correlator. @@ -2083,7 +2132,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { int nullIndicatorPos; if ((right instanceof LogicalFilter) - && cm.mapRefRelToCorVar.containsKey(right)) { + && cm.mapRefRelToCorRef.containsKey(right)) { // rightInputRel has this shape: // // LogicalFilter (references corvar) @@ -2169,7 +2218,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { nullIndicatorPos = left.getRowType().getFieldCount() + rightJoinKeys.get(0).getIndex(); - } else if (cm.mapRefRelToCorVar.containsKey(project)) { + } else if (cm.mapRefRelToCorRef.containsKey(project)) { // check filter input contains no correlation if (RelOptUtil.getVariablesUsed(right).size() > 0) { return; @@ -2295,7 +2344,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { } if ((right instanceof LogicalFilter) - && cm.mapRefRelToCorVar.containsKey(right)) { + && cm.mapRefRelToCorRef.containsKey(right)) { // rightInputRel has this shape: // // LogicalFilter (references corvar) @@ -2412,7 +2461,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // first change the filter condition into a join condition joinCond = removeCorrelationExpr(filter.getCondition(), false); - } else if (cm.mapRefRelToCorVar.containsKey(aggInputProject)) { + } else if (cm.mapRefRelToCorRef.containsKey(aggInputProject)) { // check rightInputRel contains no correlation if (RelOptUtil.getVariablesUsed(right).size() > 0) { return; @@ -2727,8 +2776,8 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { // need to update the mapCorVarToCorRel Update the output position // for the cor vars: only pass on the cor vars that are not used in // the join key. - if (cm.mapCorVarToCorRel.get(correlate.getCorrelationId()) == correlate) { - cm.mapCorVarToCorRel.put(correlate.getCorrelationId(), newCorrelate); + if (cm.mapCorToCorRel.get(correlate.getCorrelationId()) == correlate) { + cm.mapCorToCorRel.put(correlate.getCorrelationId(), newCorrelate); } RelNode newOutput = @@ -2739,25 +2788,40 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { } /** - * {@code Correlation} here represents a unique reference to a correlation - * field. - * For instance, if a RelNode references emp.name multiple times, it would - * result in multiple {@code Correlation} objects that differ just in - * {@link Correlation#uniqueKey}. + * A unique reference to a correlation field. + * + * <p>For instance, if a RelNode references emp.name multiple times, it would + * result in multiple {@code CorRef} objects that differ just in + * {@link CorRef#uniqueKey}. */ - static class Correlation - implements Comparable<Correlation> { + static class CorRef implements Comparable<CorRef> { public final int uniqueKey; public final CorrelationId corr; public final int field; - Correlation(CorrelationId corr, int field, int uniqueKey) { + CorRef(CorrelationId corr, int field, int uniqueKey) { this.corr = corr; this.field = field; this.uniqueKey = uniqueKey; } - public int compareTo(Correlation o) { + @Override public String toString() { + return corr.getName() + '.' + field; + } + + @Override public int hashCode() { + return Objects.hash(uniqueKey, corr, field); + } + + @Override public boolean equals(Object o) { + return this == o + || o instanceof CorRef + && uniqueKey == ((CorRef) o).uniqueKey + && corr == ((CorRef) o).corr + && field == ((CorRef) o).field; + } + + public int compareTo(@Nonnull CorRef o) { int c = corr.compareTo(o.corr); if (c != 0) { return c; @@ -2768,6 +2832,44 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { } return Integer.compare(uniqueKey, o.uniqueKey); } + + public CorDef def() { + return new CorDef(corr, field); + } + } + + /** A correlation and a field. */ + static class CorDef implements Comparable<CorDef> { + public final CorrelationId corr; + public final int field; + + CorDef(CorrelationId corr, int field) { + this.corr = corr; + this.field = field; + } + + @Override public String toString() { + return corr.getName() + '.' + field; + } + + @Override public int hashCode() { + return Objects.hash(corr, field); + } + + @Override public boolean equals(Object o) { + return this == o + || o instanceof CorDef + && corr == ((CorDef) o).corr + && field == ((CorDef) o).field; + } + + public int compareTo(@Nonnull CorDef o) { + int c = corr.compareTo(o.corr); + if (c != 0) { + return c; + } + return Integer.compare(field, o.field); + } } /** A map of the locations of @@ -2779,61 +2881,60 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { * * <p>There are three maps:<ol> * - * <li>mapRefRelToCorVars map a rel node to the correlated variables it - * references; + * <li>{@link #mapRefRelToCorRef} maps a {@link RelNode} to the correlated + * variables it references; * - * <li>mapCorVarToCorRel maps a correlated variable to the correlatorRel - * providing it; + * <li>{@link #mapCorToCorRel} maps a correlated variable to the + * {@link Correlate} providing it; * - * <li>mapFieldAccessToCorVar maps a rex field access to - * the cor var it represents. Because typeFlattener does not clone or + * <li>{@link #mapFieldAccessToCorRef} maps a rex field access to + * the corVar it represents. Because typeFlattener does not clone or * modify a correlated field access this map does not need to be * updated. * * </ol> */ private static class CorelMap { - private final Multimap<RelNode, Correlation> mapRefRelToCorVar; - private final SortedMap<CorrelationId, RelNode> mapCorVarToCorRel; - private final Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar; + private final Multimap<RelNode, CorRef> mapRefRelToCorRef; + private final SortedMap<CorrelationId, RelNode> mapCorToCorRel; + private final Map<RexFieldAccess, CorRef> mapFieldAccessToCorRef; // TODO: create immutable copies of all maps - private CorelMap(Multimap<RelNode, Correlation> mapRefRelToCorVar, - SortedMap<CorrelationId, RelNode> mapCorVarToCorRel, - Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar) { - this.mapRefRelToCorVar = mapRefRelToCorVar; - this.mapCorVarToCorRel = mapCorVarToCorRel; - this.mapFieldAccessToCorVar = ImmutableMap.copyOf(mapFieldAccessToCorVar); + private CorelMap(Multimap<RelNode, CorRef> mapRefRelToCorRef, + SortedMap<CorrelationId, RelNode> mapCorToCorRel, + Map<RexFieldAccess, CorRef> mapFieldAccessToCorRef) { + this.mapRefRelToCorRef = mapRefRelToCorRef; + this.mapCorToCorRel = mapCorToCorRel; + this.mapFieldAccessToCorRef = ImmutableMap.copyOf(mapFieldAccessToCorRef); } @Override public String toString() { - return "mapRefRelToCorVar=" + mapRefRelToCorVar - + "\nmapCorVarToCorRel=" + mapCorVarToCorRel - + "\nmapFieldAccessToCorVar=" + mapFieldAccessToCorVar - + "\n"; + return "mapRefRelToCorRef=" + mapRefRelToCorRef + + "\nmapCorToCorRel=" + mapCorToCorRel + + "\nmapFieldAccessToCorRef=" + mapFieldAccessToCorRef + + "\n"; } @Override public boolean equals(Object obj) { return obj == this - || obj instanceof CorelMap - && mapRefRelToCorVar.equals(((CorelMap) obj).mapRefRelToCorVar) - && mapCorVarToCorRel.equals(((CorelMap) obj).mapCorVarToCorRel) - && mapFieldAccessToCorVar.equals( - ((CorelMap) obj).mapFieldAccessToCorVar); + || obj instanceof CorelMap + && mapRefRelToCorRef.equals(((CorelMap) obj).mapRefRelToCorRef) + && mapCorToCorRel.equals(((CorelMap) obj).mapCorToCorRel) + && mapFieldAccessToCorRef.equals( + ((CorelMap) obj).mapFieldAccessToCorRef); } @Override public int hashCode() { - return com.google.common.base.Objects.hashCode(mapRefRelToCorVar, - mapCorVarToCorRel, - mapFieldAccessToCorVar); + return Objects.hash(mapRefRelToCorRef, mapCorToCorRel, + mapFieldAccessToCorRef); } /** Creates a CorelMap with given contents. */ public static CorelMap of( - SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar, - SortedMap<CorrelationId, RelNode> mapCorVarToCorRel, - Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar) { - return new CorelMap(mapRefRelToCorVar, mapCorVarToCorRel, - mapFieldAccessToCorVar); + SortedSetMultimap<RelNode, CorRef> mapRefRelToCorVar, + SortedMap<CorrelationId, RelNode> mapCorToCorRel, + Map<RexFieldAccess, CorRef> mapFieldAccessToCorVar) { + return new CorelMap(mapRefRelToCorVar, mapCorToCorRel, + mapFieldAccessToCorVar); } /** @@ -2842,27 +2943,26 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { * @return whether there are any correlating variables */ public boolean hasCorrelation() { - return !mapCorVarToCorRel.isEmpty(); + return !mapCorToCorRel.isEmpty(); } } /** Builds a {@link org.apache.calcite.sql2rel.RelDecorrelator.CorelMap}. */ private static class CorelMapBuilder extends HiveRelShuttleImpl { - final SortedMap<CorrelationId, RelNode> mapCorVarToCorRel = - new TreeMap<>(); - - final SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar = - Multimaps.newSortedSetMultimap( - Maps.<RelNode, Collection<Correlation>>newHashMap(), - new Supplier<TreeSet<Correlation>>() { - public TreeSet<Correlation> get() { - Bug.upgrade("use MultimapBuilder when we're on Guava-16"); - return Sets.newTreeSet(); - } - }); - - final Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar = - new HashMap<>(); + final SortedMap<CorrelationId, RelNode> mapCorToCorRel = + new TreeMap<>(); + + final SortedSetMultimap<RelNode, CorRef> mapRefRelToCorRef = + Multimaps.newSortedSetMultimap( + new HashMap<RelNode, Collection<CorRef>>(), + new Supplier<TreeSet<CorRef>>() { + public TreeSet<CorRef> get() { + Bug.upgrade("use MultimapBuilder when we're on Guava-16"); + return Sets.newTreeSet(); + } + }); + + final Map<RexFieldAccess, CorRef> mapFieldAccessToCorVar = new HashMap<>(); final Holder<Integer> offset = Holder.of(0); int corrIdGenerator = 0; @@ -2872,7 +2972,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { /** Creates a CorelMap by iterating over a {@link RelNode} tree. */ CorelMap build(RelNode rel) { stripHep(rel).accept(this); - return new CorelMap(mapRefRelToCorVar, mapCorVarToCorRel, + return new CorelMap(mapRefRelToCorRef, mapCorToCorRel, mapFieldAccessToCorVar); } @@ -2902,7 +3002,7 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { } @Override public RelNode visit(LogicalCorrelate correlate) { - mapCorVarToCorRel.put(correlate.getCorrelationId(), correlate); + mapCorToCorRel.put(correlate.getCorrelationId(), correlate); return visitJoin(correlate); } @@ -2963,21 +3063,19 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { final RexNode ref = fieldAccess.getReferenceExpr(); if (ref instanceof RexCorrelVariable) { final RexCorrelVariable var = (RexCorrelVariable) ref; - if(mapFieldAccessToCorVar.containsKey(fieldAccess)) - { + if (mapFieldAccessToCorVar.containsKey(fieldAccess)) { //for cases where different Rel nodes are referring to // same correlation var (e.g. in case of NOT IN) // avoid generating another correlation var // and record the 'rel' is using the same correlation - mapRefRelToCorVar.put(rel, mapFieldAccessToCorVar.get(fieldAccess)); - } - else { - final Correlation correlation = - new Correlation(var.id, - fieldAccess.getField().getIndex(), - corrIdGenerator++); + mapRefRelToCorRef.put(rel, + mapFieldAccessToCorVar.get(fieldAccess)); + } else { + final CorRef correlation = + new CorRef(var.id, fieldAccess.getField().getIndex(), + corrIdGenerator++); mapFieldAccessToCorVar.put(fieldAccess, correlation); - mapRefRelToCorVar.put(rel, correlation); + mapRefRelToCorRef.put(rel, correlation); } } return super.visitFieldAccess(fieldAccess); @@ -2996,14 +3094,20 @@ public class HiveRelDecorrelator implements ReflectiveVisitor { * among its output fields. */ static class Frame { final RelNode r; - final ImmutableSortedMap<Correlation, Integer> corVarOutputPos; - final ImmutableMap<Integer, Integer> oldToNewOutputPos; + final ImmutableSortedMap<CorDef, Integer> corDefOutputs; + final ImmutableSortedMap<Integer, Integer> oldToNewOutputs; - Frame(RelNode r, SortedMap<Correlation, Integer> corVarOutputPos, - Map<Integer, Integer> oldToNewOutputPos) { + Frame(RelNode oldRel, RelNode r, SortedMap<CorDef, Integer> corDefOutputs, + Map<Integer, Integer> oldToNewOutputs) { this.r = Preconditions.checkNotNull(r); - this.corVarOutputPos = ImmutableSortedMap.copyOf(corVarOutputPos); - this.oldToNewOutputPos = ImmutableSortedMap.copyOf(oldToNewOutputPos); + this.corDefOutputs = ImmutableSortedMap.copyOf(corDefOutputs); + this.oldToNewOutputs = ImmutableSortedMap.copyOf(oldToNewOutputs); + assert allLessThan(corDefOutputs.values(), + r.getRowType().getFieldCount(), Litmus.THROW); + assert allLessThan(oldToNewOutputs.keySet(), + oldRel.getRowType().getFieldCount(), Litmus.THROW); + assert allLessThan(oldToNewOutputs.values(), + r.getRowType().getFieldCount(), Litmus.THROW); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/bddf5a7a/ql/src/test/queries/clientpositive/subquery_multi.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/subquery_multi.q b/ql/src/test/queries/clientpositive/subquery_multi.q index aff7f20..0362cc3 100644 --- a/ql/src/test/queries/clientpositive/subquery_multi.q +++ b/ql/src/test/queries/clientpositive/subquery_multi.q @@ -58,8 +58,12 @@ explain select * from part_null where p_name IN (select p_name from part_null) A select * from part_null where p_name IN (select p_name from part_null) AND NOT EXISTS (select c from tempty); -- corr, mix of IN/NOT IN -explain select * from part_null where p_name IN ( select p_name from part where part.p_type = part_null.p_type) AND p_brand NOT IN (select p_container from part where part.p_type = part_null.p_type AND p_brand IN (select p_brand from part pp where part.p_type = pp.p_type)); -select * from part_null where p_name IN ( select p_name from part where part.p_type = part_null.p_type) AND p_brand NOT IN (select p_container from part where part.p_type = part_null.p_type AND p_brand IN (select p_brand from part pp where part.p_type = pp.p_type)); +explain select * from part_null where p_name IN ( select p_name from part where part.p_type = part_null.p_type) + AND p_brand NOT IN (select p_container from part where part.p_type = part_null.p_type + AND p_brand IN (select p_brand from part pp where part.p_type = pp.p_type)); +select * from part_null where p_name IN ( select p_name from part where part.p_type = part_null.p_type) + AND p_brand NOT IN (select p_container from part where part.p_type = part_null.p_type + AND p_brand IN (select p_brand from part pp where part.p_type = pp.p_type)); -- mix of corr and uncorr explain select * from part_null where p_name IN ( select p_name from part) AND p_brand IN (select p_brand from part where part.p_type = part_null.p_type); http://git-wip-us.apache.org/repos/asf/hive/blob/bddf5a7a/ql/src/test/results/clientpositive/constprog_partitioner.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/constprog_partitioner.q.out b/ql/src/test/results/clientpositive/constprog_partitioner.q.out index 2a44269..8c7f9d3 100644 --- a/ql/src/test/results/clientpositive/constprog_partitioner.q.out +++ b/ql/src/test/results/clientpositive/constprog_partitioner.q.out @@ -80,95 +80,10 @@ WHERE li.l_linenumber = 1 AND li.l_orderkey IN (SELECT l_orderkey FROM lineitem WHERE l_shipmode = 'AIR' AND l_linenumber = li.l_linenumber) POSTHOOK: type: QUERY STAGE DEPENDENCIES: - Stage-3 is a root stage - Stage-2 depends on stages: Stage-3 - Stage-1 depends on stages: Stage-2 + Stage-1 is a root stage Stage-0 depends on stages: Stage-1 STAGE PLANS: - Stage: Stage-3 - Map Reduce - Map Operator Tree: - TableScan - alias: li - Statistics: Num rows: 100 Data size: 11999 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: l_linenumber (type: int) - outputColumnNames: l_linenumber - Statistics: Num rows: 100 Data size: 11999 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: l_linenumber (type: int) - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 100 Data size: 11999 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 100 Data size: 11999 Basic stats: COMPLETE Column stats: NONE - Reduce Operator Tree: - Group By Operator - keys: KEY._col0 (type: int) - mode: mergepartial - outputColumnNames: _col0 - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe - - Stage: Stage-2 - Map Reduce - Map Operator Tree: - TableScan - alias: lineitem - Statistics: Num rows: 100 Data size: 11999 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: (l_shipmode = 'AIR') (type: boolean) - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: l_orderkey (type: int), l_linenumber (type: int) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col1 (type: int) - sort order: + - Map-reduce partition columns: _col1 (type: int) - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: int) - TableScan - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col1 (type: int) - 1 _col0 (type: int) - outputColumnNames: _col0, _col3 - Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: _col0 (type: int), _col3 (type: int) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: int), _col1 (type: int) - mode: hash - outputColumnNames: _col0, _col1 - Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe - Stage: Stage-1 Map Reduce Map Operator Tree: @@ -189,11 +104,25 @@ STAGE PLANS: Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) TableScan - Reduce Output Operator - key expressions: _col0 (type: int), _col1 (type: int) - sort order: ++ - Map-reduce partition columns: _col0 (type: int), _col1 (type: int) - Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE + alias: lineitem + Statistics: Num rows: 100 Data size: 11999 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((l_shipmode = 'AIR') and (l_linenumber = l_linenumber)) (type: boolean) + Statistics: Num rows: 25 Data size: 2999 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: l_orderkey (type: int), l_linenumber (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 25 Data size: 2999 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: int), _col1 (type: int) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 25 Data size: 2999 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: int) + sort order: ++ + Map-reduce partition columns: _col0 (type: int), _col1 (type: int) + Statistics: Num rows: 25 Data size: 2999 Basic stats: COMPLETE Column stats: NONE Reduce Operator Tree: Join Operator condition map: @@ -202,14 +131,14 @@ STAGE PLANS: 0 _col0 (type: int), 1 (type: int) 1 _col0 (type: int), _col1 (type: int) outputColumnNames: _col1, _col2 - Statistics: Num rows: 60 Data size: 7257 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col1 (type: int), _col2 (type: int) outputColumnNames: _col0, _col1 - Statistics: Num rows: 60 Data size: 7257 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 60 Data size: 7257 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 55 Data size: 6598 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
