HIVE-10882 : CBO: Calcite Operator To Hive Operator (Calcite Return Path) empty filtersMap of join operator causes wrong results (Jesus Camacho Rodriguez via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5363af9a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5363af9a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5363af9a Branch: refs/heads/beeline-cli Commit: 5363af9aa1d98f82bdef4861b533314dc19a77d0 Parents: 2620ebb Author: Jesus Camacho Rodriguez <[email protected]> Authored: Mon Jul 13 05:26:49 2015 -0700 Committer: Ashutosh Chauhan <[email protected]> Committed: Mon Jul 13 15:31:37 2015 -0700 ---------------------------------------------------------------------- .../ql/optimizer/calcite/HiveRelOptUtil.java | 37 - .../calcite/reloperators/HiveJoin.java | 14 + .../calcite/reloperators/HiveMultiJoin.java | 28 +- .../calcite/reloperators/HiveSemiJoin.java | 57 +- .../calcite/rules/HiveJoinToMultiJoinRule.java | 50 +- .../calcite/rules/HiveRelFieldTrimmer.java | 3 +- .../calcite/translator/HiveOpConverter.java | 122 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 4 +- .../queries/clientpositive/fouter_join_ppr.q | 73 + .../clientpositive/fouter_join_ppr.q.out | 1694 ++++++++++++++++++ 10 files changed, 1974 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java index ab793f1..5a5954d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java @@ -226,43 +226,6 @@ public class HiveRelOptUtil extends RelOptUtil { } } -// if ((rangeOp == null) -// && ((leftKey == null) || (rightKey == null))) { -// // no equality join keys found yet: -// // try transforming the condition to -// // equality "join" conditions, e.g. -// // f(LHS) > 0 ===> ( f(LHS) > 0 ) = TRUE, -// // and make the RHS produce TRUE, but only if we're strictly -// // looking for equi-joins -// final ImmutableBitSet projRefs = InputFinder.bits(condition); -// leftKey = null; -// rightKey = null; -// -// boolean foundInput = false; -// for (int i = 0; i < inputs.size() && !foundInput; i++) { -// final int lowerLimit = inputsRange[i].nextSetBit(0); -// final int upperLimit = inputsRange[i].length(); -// if (projRefs.nextSetBit(lowerLimit) < upperLimit) { -// leftInput = i; -// leftFields = inputs.get(leftInput).getRowType().getFieldList(); -// -// leftKey = condition.accept( -// new RelOptUtil.RexInputConverter( -// rexBuilder, -// leftFields, -// leftFields, -// adjustments)); -// -// rightKey = rexBuilder.makeLiteral(true); -// -// // effectively performing an equality comparison -// kind = SqlKind.EQUALS; -// -// foundInput = true; -// } -// } -// } - if ((leftKey != null) && (rightKey != null)) { // found suitable join keys // add them to key list, ensuring that if there is a http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java index 6814df6..ffd3196 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java @@ -43,6 +43,7 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOptUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm; @@ -60,6 +61,7 @@ public class HiveJoin extends Join implements HiveRelNode { } private final boolean leftSemiJoin; + private final RexNode joinFilter; private final JoinPredicateInfo joinPredInfo; private JoinAlgorithm joinAlgorithm; private RelOptCost joinCost; @@ -82,6 +84,14 @@ public class HiveJoin extends Join implements HiveRelNode { JoinAlgorithm joinAlgo, boolean leftSemiJoin) throws InvalidRelException, CalciteSemanticException { super(cluster, TraitsUtil.getDefaultTraitSet(cluster), left, right, condition, joinType, variablesStopped); + final List<RelDataTypeField> systemFieldList = ImmutableList.of(); + List<List<RexNode>> joinKeyExprs = new ArrayList<List<RexNode>>(); + List<Integer> filterNulls = new ArrayList<Integer>(); + for (int i=0; i<this.getInputs().size(); i++) { + joinKeyExprs.add(new ArrayList<RexNode>()); + } + this.joinFilter = HiveRelOptUtil.splitHiveJoinCondition(systemFieldList, this.getInputs(), + this.getCondition(), joinKeyExprs, filterNulls, null); this.joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(this); this.joinAlgorithm = joinAlgo; this.leftSemiJoin = leftSemiJoin; @@ -105,6 +115,10 @@ public class HiveJoin extends Join implements HiveRelNode { } } + public RexNode getJoinFilter() { + return joinFilter; + } + public JoinPredicateInfo getJoinPredicateInfo() { return joinPredInfo; } http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveMultiJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveMultiJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveMultiJoin.java index 7a43f29..660f01d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveMultiJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveMultiJoin.java @@ -49,6 +49,7 @@ public final class HiveMultiJoin extends AbstractRelNode { private final RelDataType rowType; private final ImmutableList<Pair<Integer,Integer>> joinInputs; private final ImmutableList<JoinRelType> joinTypes; + private final ImmutableList<RexNode> filters; private final boolean outerJoin; private final JoinPredicateInfo joinPredInfo; @@ -59,30 +60,34 @@ public final class HiveMultiJoin extends AbstractRelNode { * * @param cluster cluster that join belongs to * @param inputs inputs into this multi-join - * @param condition join filter applicable to this join node + * @param condition join filter applicable to this join node * @param rowType row type of the join result of this node - * @param joinInputs + * @param joinInputs * @param joinTypes the join type corresponding to each input; if * an input is null-generating in a left or right * outer join, the entry indicates the type of * outer join; otherwise, the entry is set to * INNER + * @param filters filters associated with each join + * input */ public HiveMultiJoin( RelOptCluster cluster, List<RelNode> inputs, - RexNode joinFilter, + RexNode condition, RelDataType rowType, List<Pair<Integer,Integer>> joinInputs, - List<JoinRelType> joinTypes) { + List<JoinRelType> joinTypes, + List<RexNode> filters) { super(cluster, TraitsUtil.getDefaultTraitSet(cluster)); this.inputs = Lists.newArrayList(inputs); - this.condition = joinFilter; + this.condition = condition; this.rowType = rowType; assert joinInputs.size() == joinTypes.size(); this.joinInputs = ImmutableList.copyOf(joinInputs); this.joinTypes = ImmutableList.copyOf(joinTypes); + this.filters = ImmutableList.copyOf(filters); this.outerJoin = containsOuter(); try { @@ -107,7 +112,8 @@ public final class HiveMultiJoin extends AbstractRelNode { condition, rowType, joinInputs, - joinTypes); + joinTypes, + filters); } @Override @@ -156,7 +162,8 @@ public final class HiveMultiJoin extends AbstractRelNode { joinFilter, rowType, joinInputs, - joinTypes); + joinTypes, + filters); } /** @@ -188,6 +195,13 @@ public final class HiveMultiJoin extends AbstractRelNode { } /** + * @return join conditions filters + */ + public List<RexNode> getJoinFilters() { + return filters; + } + + /** * @return the join predicate information */ public JoinPredicateInfo getJoinPredicateInfo() { http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSemiJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSemiJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSemiJoin.java index dd1691c..af82822 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSemiJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSemiJoin.java @@ -17,39 +17,86 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators; +import java.util.ArrayList; +import java.util.List; + import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.RelFactories.SemiJoinFactory; import org.apache.calcite.rel.core.SemiJoin; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableIntList; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOptUtil; + +import com.google.common.collect.ImmutableList; public class HiveSemiJoin extends SemiJoin implements HiveRelNode { public static final SemiJoinFactory HIVE_SEMIJOIN_FACTORY = new HiveSemiJoinFactoryImpl(); - public HiveSemiJoin(RelOptCluster cluster, + private final RexNode joinFilter; + + + public static HiveSemiJoin getSemiJoin( + RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, ImmutableIntList leftKeys, ImmutableIntList rightKeys) { + try { + HiveSemiJoin semiJoin = new HiveSemiJoin(cluster, traitSet, left, right, + condition, leftKeys, rightKeys); + return semiJoin; + } catch (InvalidRelException | CalciteSemanticException e) { + throw new RuntimeException(e); + } + } + + protected HiveSemiJoin(RelOptCluster cluster, + RelTraitSet traitSet, + RelNode left, + RelNode right, + RexNode condition, + ImmutableIntList leftKeys, + ImmutableIntList rightKeys) throws InvalidRelException, CalciteSemanticException { super(cluster, traitSet, left, right, condition, leftKeys, rightKeys); + final List<RelDataTypeField> systemFieldList = ImmutableList.of(); + List<List<RexNode>> joinKeyExprs = new ArrayList<List<RexNode>>(); + List<Integer> filterNulls = new ArrayList<Integer>(); + for (int i=0; i<this.getInputs().size(); i++) { + joinKeyExprs.add(new ArrayList<RexNode>()); + } + this.joinFilter = HiveRelOptUtil.splitHiveJoinCondition(systemFieldList, this.getInputs(), + this.getCondition(), joinKeyExprs, filterNulls, null); + } + + public RexNode getJoinFilter() { + return joinFilter; } @Override public SemiJoin copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { - final JoinInfo joinInfo = JoinInfo.of(left, right, condition); - return new HiveSemiJoin(getCluster(), traitSet, left, right, condition, - joinInfo.leftKeys, joinInfo.rightKeys); + try { + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + return new HiveSemiJoin(getCluster(), traitSet, left, right, condition, + joinInfo.leftKeys, joinInfo.rightKeys); + } catch (InvalidRelException | CalciteSemanticException e) { + // Semantic error not possible. Must be a bug. Convert to + // internal error. + throw new AssertionError(e); + } } @Override @@ -72,7 +119,7 @@ public class HiveSemiJoin extends SemiJoin implements HiveRelNode { RexNode condition) { final JoinInfo joinInfo = JoinInfo.of(left, right, condition); final RelOptCluster cluster = left.getCluster(); - return new HiveSemiJoin(cluster, left.getTraitSet(), left, right, condition, + return getSemiJoin(cluster, left.getTraitSet(), left, right, condition, joinInfo.leftKeys, joinInfo.rightKeys); } } http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java index a0144f3..d0a29a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java @@ -124,25 +124,29 @@ public class HiveJoinToMultiJoinRule extends RelOptRule { // We check whether the join can be combined with any of its children final List<RelNode> newInputs = Lists.newArrayList(); - final List<RexNode> newJoinFilters = Lists.newArrayList(); - newJoinFilters.add(join.getCondition()); - final List<Pair<Pair<Integer,Integer>, JoinRelType>> joinSpecs = Lists.newArrayList(); + final List<RexNode> newJoinCondition = Lists.newArrayList(); + final List<Pair<Integer,Integer>> joinInputs = Lists.newArrayList(); + final List<JoinRelType> joinTypes = Lists.newArrayList(); + final List<RexNode> joinFilters = Lists.newArrayList(); // Left child - if (left instanceof Join || left instanceof HiveMultiJoin) { + if (left instanceof HiveJoin || left instanceof HiveMultiJoin) { final RexNode leftCondition; final List<Pair<Integer,Integer>> leftJoinInputs; final List<JoinRelType> leftJoinTypes; - if (left instanceof Join) { - Join hj = (Join) left; + final List<RexNode> leftJoinFilters; + if (left instanceof HiveJoin) { + HiveJoin hj = (HiveJoin) left; leftCondition = hj.getCondition(); leftJoinInputs = ImmutableList.of(Pair.of(0, 1)); leftJoinTypes = ImmutableList.of(hj.getJoinType()); + leftJoinFilters = ImmutableList.of(hj.getJoinFilter()); } else { HiveMultiJoin hmj = (HiveMultiJoin) left; leftCondition = hmj.getCondition(); leftJoinInputs = hmj.getJoinInputs(); leftJoinTypes = hmj.getJoinTypes(); + leftJoinFilters = hmj.getJoinFilters(); } boolean combinable; @@ -154,9 +158,11 @@ public class HiveJoinToMultiJoinRule extends RelOptRule { combinable = false; } if (combinable) { - newJoinFilters.add(leftCondition); + newJoinCondition.add(leftCondition); for (int i = 0; i < leftJoinInputs.size(); i++) { - joinSpecs.add(Pair.of(leftJoinInputs.get(i), leftJoinTypes.get(i))); + joinInputs.add(leftJoinInputs.get(i)); + joinTypes.add(leftJoinTypes.get(i)); + joinFilters.add(leftJoinFilters.get(i)); } newInputs.addAll(left.getInputs()); } else { // The join operation in the child is not on the same keys @@ -171,7 +177,8 @@ public class HiveJoinToMultiJoinRule extends RelOptRule { newInputs.add(right); // If we cannot combine any of the children, we bail out - if (newJoinFilters.size() == 1) { + newJoinCondition.add(join.getCondition()); + if (newJoinCondition.size() == 1) { return null; } @@ -181,18 +188,14 @@ public class HiveJoinToMultiJoinRule extends RelOptRule { for (int i=0; i<newInputs.size(); i++) { joinKeyExprs.add(new ArrayList<RexNode>()); } - RexNode otherCondition; + RexNode filters; try { - otherCondition = HiveRelOptUtil.splitHiveJoinCondition(systemFieldList, newInputs, join.getCondition(), - joinKeyExprs, filterNulls, null); + filters = HiveRelOptUtil.splitHiveJoinCondition(systemFieldList, newInputs, + join.getCondition(), joinKeyExprs, filterNulls, null); } catch (CalciteSemanticException e) { LOG.trace("Failed to merge joins", e); return null; } - // If there are remaining parts in the condition, we bail out - if (!otherCondition.isAlwaysTrue()) { - return null; - } ImmutableBitSet.Builder keysInInputsBuilder = ImmutableBitSet.builder(); for (int i=0; i<newInputs.size(); i++) { List<RexNode> partialCondition = joinKeyExprs.get(i); @@ -214,25 +217,30 @@ public class HiveJoinToMultiJoinRule extends RelOptRule { if (join.getJoinType() != JoinRelType.INNER) { int leftInput = keysInInputs.nextSetBit(0); int rightInput = keysInInputs.nextSetBit(numberLeftInputs); - joinSpecs.add(Pair.of(Pair.of(leftInput, rightInput), join.getJoinType())); + joinInputs.add(Pair.of(leftInput, rightInput)); + joinTypes.add(join.getJoinType()); + joinFilters.add(filters); } else { for (int i : leftReferencedInputs) { for (int j : rightReferencedInputs) { - joinSpecs.add(Pair.of(Pair.of(i, j), join.getJoinType())); + joinInputs.add(Pair.of(i, j)); + joinTypes.add(join.getJoinType()); + joinFilters.add(filters); } } } // We can now create a multijoin operator RexNode newCondition = RexUtil.flatten(rexBuilder, - RexUtil.composeConjunction(rexBuilder, newJoinFilters, false)); + RexUtil.composeConjunction(rexBuilder, newJoinCondition, false)); return new HiveMultiJoin( join.getCluster(), newInputs, newCondition, join.getRowType(), - Pair.left(joinSpecs), - Pair.right(joinSpecs)); + joinInputs, + joinTypes, + joinFilters); } private static boolean isCombinablePredicate(Join join, http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java index f72f67f..4144674 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java @@ -151,7 +151,8 @@ public class HiveRelFieldTrimmer extends RelFieldTrimmer { newConditionExpr, newRowType, join.getJoinInputs(), - join.getJoinTypes()); + join.getJoinTypes(), + join.getJoinFilters()); return new TrimResult(newJoin, mapping); } http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index 86ac4d1..c711406 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistribution.Type; @@ -55,8 +56,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; @@ -98,6 +97,7 @@ import org.apache.hadoop.hive.ql.plan.UnionDesc; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; public class HiveOpConverter { @@ -352,13 +352,6 @@ public class HiveOpConverter { + " with row type: [" + joinRel.getRowType() + "]"); } - JoinPredicateInfo joinPredInfo; - if (joinRel instanceof HiveJoin) { - joinPredInfo = ((HiveJoin)joinRel).getJoinPredicateInfo(); - } else { - joinPredInfo = ((HiveMultiJoin)joinRel).getJoinPredicateInfo(); - } - // 4. Extract join key expressions from HiveSortExchange ExprNodeDesc[][] joinExpressions = new ExprNodeDesc[inputs.length][]; for (int i = 0; i < inputs.length; i++) { @@ -368,23 +361,22 @@ public class HiveOpConverter { // 5. Extract rest of join predicate info. We infer the rest of join condition // that will be added to the filters (join conditions that are not part of // the join key) - ExprNodeDesc[][] filterExpressions = new ExprNodeDesc[inputs.length][]; - for (int i = 0; i< inputs.length; i++) { + List<RexNode> joinFilters; + if (joinRel instanceof HiveJoin) { + joinFilters = ImmutableList.of(((HiveJoin)joinRel).getJoinFilter()); + } else { + joinFilters = ((HiveMultiJoin)joinRel).getJoinFilters(); + } + List<List<ExprNodeDesc>> filterExpressions = Lists.newArrayList(); + for (int i = 0; i< joinFilters.size(); i++) { List<ExprNodeDesc> filterExpressionsForInput = new ArrayList<ExprNodeDesc>(); - Set<String> keySet = new HashSet<String>(); - for (int j = 0; j < joinPredInfo.getNonEquiJoinPredicateElements().size(); j++) { - JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo. - getNonEquiJoinPredicateElements().get(j); - for (RexNode joinExprNode : joinLeafPredInfo.getJoinExprs(i)) { - if (keySet.add(joinExprNode.toString())) { - ExprNodeDesc expr = convertToExprNode(joinExprNode, joinRel, - null, newVcolsInCalcite); - filterExpressionsForInput.add(expr); - } + if (joinFilters.get(i) != null) { + for (RexNode conj : RelOptUtil.conjunctions(joinFilters.get(i))) { + ExprNodeDesc expr = convertToExprNode(conj, joinRel, null, newVcolsInCalcite); + filterExpressionsForInput.add(expr); } } - filterExpressions[i] = filterExpressionsForInput.toArray( - new ExprNodeDesc[filterExpressionsForInput.size()]); + filterExpressions.add(filterExpressionsForInput); } // 6. Generate Join operator @@ -822,7 +814,7 @@ public class HiveOpConverter { } private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressions, - ExprNodeDesc[][] filterExpressions, List<Operator<?>> children, + List<List<ExprNodeDesc>> filterExpressions, List<Operator<?>> children, String[] baseSrc, String tabAlias) throws SemanticException { // 1. Extract join type @@ -849,6 +841,7 @@ public class HiveOpConverter { && joinType != JoinType.RIGHTOUTER; } + // 2. We create the join aux structures ArrayList<ColumnInfo> outputColumns = new ArrayList<ColumnInfo>(); ArrayList<String> outputColumnNames = new ArrayList<String>(join.getRowType() .getFieldNames()); @@ -862,7 +855,7 @@ public class HiveOpConverter { int outputPos = 0; for (int pos = 0; pos < children.size(); pos++) { - // 2. Backtracking from RS + // 2.1. Backtracking from RS ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos); if (inputRS.getNumParent() != 1) { throw new SemanticException("RS should have single parent"); @@ -874,7 +867,7 @@ public class HiveOpConverter { Byte tag = (byte) rsDesc.getTag(); - // 2.1. If semijoin... + // 2.1.1. If semijoin... if (semiJoin && pos != 0) { exprMap.put(tag, new ArrayList<ExprNodeDesc>()); childOps[pos] = inputRS; @@ -902,22 +895,52 @@ public class HiveOpConverter { exprMap.put(tag, new ArrayList<ExprNodeDesc>(descriptors.values())); colExprMap.putAll(descriptors); childOps[pos] = inputRS; + } + + // 3. We populate the filters and filterMap structure needed in the join descriptor + List<List<ExprNodeDesc>> filtersPerInput = Lists.newArrayList(); + int[][] filterMap = new int[children.size()][]; + for (int i=0; i<children.size(); i++) { + filtersPerInput.add(new ArrayList<ExprNodeDesc>()); + } + // 3. We populate the filters structure + for (int i=0; i<filterExpressions.size(); i++) { + int leftPos = joinCondns[i].getLeft(); + int rightPos = joinCondns[i].getRight(); - // 3. We populate the filters structure - List<ExprNodeDesc> filtersForInput = new ArrayList<ExprNodeDesc>(); - for (ExprNodeDesc expr : filterExpressions[pos]) { + for (ExprNodeDesc expr : filterExpressions.get(i)) { // We need to update the exprNode, as currently // they refer to columns in the output of the join; // they should refer to the columns output by the RS - updateExprNode(expr, colExprMap); - filtersForInput.add(expr); + int inputPos = updateExprNode(expr, reversedExprs, colExprMap); + if (inputPos == -1) { + inputPos = leftPos; + } + filtersPerInput.get(inputPos).add(expr); + + if (joinCondns[i].getType() == JoinDesc.FULL_OUTER_JOIN || + joinCondns[i].getType() == JoinDesc.LEFT_OUTER_JOIN || + joinCondns[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) { + if (inputPos == leftPos) { + updateFilterMap(filterMap, leftPos, rightPos); + } else { + updateFilterMap(filterMap, rightPos, leftPos); + } + } } - filters.put(tag, filtersForInput); + } + for (int pos = 0; pos < children.size(); pos++) { + ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos); + ReduceSinkDesc rsDesc = inputRS.getConf(); + Byte tag = (byte) rsDesc.getTag(); + filters.put(tag, filtersPerInput.get(pos)); } + // 4. We create the join operator with its descriptor JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, filters, joinExpressions); desc.setReversedExprs(reversedExprs); + desc.setFilterMap(filterMap); JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc, new RowSchema( outputColumns), childOps); @@ -940,20 +963,49 @@ public class HiveOpConverter { * the execution engine expects filters in the Join operators * to be expressed that way. */ - private static void updateExprNode(ExprNodeDesc expr, Map<String, ExprNodeDesc> colExprMap) { + private static int updateExprNode(ExprNodeDesc expr, final Map<String, Byte> reversedExprs, + final Map<String, ExprNodeDesc> colExprMap) { + int inputPos = -1; if (expr instanceof ExprNodeGenericFuncDesc) { ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) expr; List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>(); for (ExprNodeDesc functionChild : func.getChildren()) { if (functionChild instanceof ExprNodeColumnDesc) { - newChildren.add(colExprMap.get(functionChild.getExprString())); + String colRef = functionChild.getExprString(); + inputPos = reversedExprs.get(colRef); + newChildren.add(colExprMap.get(colRef)); } else { - updateExprNode(functionChild, colExprMap); + inputPos = updateExprNode(functionChild, reversedExprs, colExprMap); newChildren.add(functionChild); } } func.setChildren(newChildren); } + return inputPos; + } + + private static void updateFilterMap(int[][] filterMap, int inputPos, int joinPos) { + int[] map = filterMap[inputPos]; + if (map == null) { + filterMap[inputPos] = new int[2]; + filterMap[inputPos][0] = joinPos; + filterMap[inputPos][1]++; + } else { + boolean inserted = false; + for (int j=0; j<map.length/2 && !inserted; j++) { + if (map[j*2] == joinPos) { + map[j*2+1]++; + inserted = true; + } + } + if (!inserted) { + int[] newMap = new int[map.length + 2]; + System.arraycopy(map, 0, newMap, 0, map.length); + newMap[map.length] = joinPos; + newMap[map.length+1]++; + filterMap[inputPos] = newMap; + } + } } private static JoinType extractJoinType(HiveJoin join) { http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 3b5dbe2..84bb951 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -1237,8 +1237,8 @@ public class CalcitePlanner extends SemanticAnalyzer { HiveProject.DEFAULT_PROJECT_FACTORY, inputRels, leftJoinKeys, rightJoinKeys, 0, leftKeys, rightKeys); - joinRel = new HiveSemiJoin(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), inputRels[0], - inputRels[1], calciteJoinCond, ImmutableIntList.copyOf(leftKeys), + joinRel = HiveSemiJoin.getSemiJoin(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), + inputRels[0], inputRels[1], calciteJoinCond, ImmutableIntList.copyOf(leftKeys), ImmutableIntList.copyOf(rightKeys)); } else { joinRel = HiveJoin.getJoin(cluster, leftRel, rightRel, calciteJoinCond, calciteJoinType, http://git-wip-us.apache.org/repos/asf/hive/blob/5363af9a/ql/src/test/queries/clientpositive/fouter_join_ppr.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/fouter_join_ppr.q b/ql/src/test/queries/clientpositive/fouter_join_ppr.q new file mode 100644 index 0000000..4bf3705 --- /dev/null +++ b/ql/src/test/queries/clientpositive/fouter_join_ppr.q @@ -0,0 +1,73 @@ +set hive.optimize.ppd=true; + +-- SORT_QUERY_RESULTS + +EXPLAIN EXTENDED + FROM + src a + FULL OUTER JOIN + srcpart b + ON (a.key = b.key AND b.ds = '2008-04-08') + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25; + + FROM + src a + FULL OUTER JOIN + srcpart b + ON (a.key = b.key AND b.ds = '2008-04-08') + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25; + +EXPLAIN EXTENDED + FROM + srcpart a + FULL OUTER JOIN + src b + ON (a.key = b.key AND a.ds = '2008-04-08') + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25; + + FROM + srcpart a + FULL OUTER JOIN + src b + ON (a.key = b.key AND a.ds = '2008-04-08') + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25; + + +EXPLAIN EXTENDED + FROM + src a + FULL OUTER JOIN + srcpart b + ON (a.key = b.key) + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25 AND b.ds = '2008-04-08'; + + FROM + src a + FULL OUTER JOIN + srcpart b + ON (a.key = b.key) + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25 AND b.ds = '2008-04-08'; + +EXPLAIN EXTENDED + FROM + srcpart a + FULL OUTER JOIN + src b + ON (a.key = b.key) + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25 AND a.ds = '2008-04-08'; + + FROM + srcpart a + FULL OUTER JOIN + src b + ON (a.key = b.key) + SELECT a.key, a.value, b.key, b.value + WHERE a.key > 10 AND a.key < 20 AND b.key > 15 AND b.key < 25 AND a.ds = '2008-04-08'; +
