HIVE-12765: Support Intersect (distinct/all) Except (distinct/all) Minus (distinct/all) (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
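As a quick illustration (table and column names here are hypothetical, not taken from the patch), the newly supported syntax looks like:

select key from t1 intersect all select key from t2;
select key from t1 except distinct select key from t2;
select key from t1 minus all select key from t2;

MINUS is accepted as a synonym for EXCEPT; as in standard SQL, the unqualified operators behave as DISTINCT.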
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0049a21f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0049a21f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0049a21f Branch: refs/heads/hive-14535 Commit: 0049a21f5442581b463f1bdfe0b1c12983c62ab2 Parents: 394fc47 Author: Pengcheng Xiong <[email protected]> Authored: Sat Oct 22 13:48:26 2016 -0700 Committer: Pengcheng Xiong <[email protected]> Committed: Sat Oct 22 13:48:26 2016 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 4 + .../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 + .../ql/optimizer/calcite/HiveCalciteUtil.java | 73 + .../ql/optimizer/calcite/HiveRelFactories.java | 16 +- .../calcite/reloperators/HiveExcept.java | 43 + .../calcite/reloperators/HiveIntersect.java | 43 + .../calcite/rules/HiveExceptRewriteRule.java | 375 ++++ .../calcite/rules/HiveIntersectMergeRule.java | 88 + .../calcite/rules/HiveIntersectRewriteRule.java | 250 +++ .../HiveProjectOverIntersectRemoveRule.java | 67 + .../rules/HiveSortLimitPullUpConstantsRule.java | 7 +- .../calcite/translator/ASTConverter.java | 26 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 124 +- .../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 1 + .../apache/hadoop/hive/ql/parse/HiveParser.g | 22 +- .../hadoop/hive/ql/parse/IdentifiersParser.g | 2 +- .../org/apache/hadoop/hive/ql/parse/QBExpr.java | 2 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 38 +- .../TestSQL11ReservedKeyWordsNegative.java | 23 +- ql/src/test/queries/clientpositive/except_all.q | 58 + .../queries/clientpositive/except_distinct.q | 58 + .../test/queries/clientpositive/intersect_all.q | 42 + .../queries/clientpositive/intersect_distinct.q | 42 + .../queries/clientpositive/intersect_merge.q | 27 + .../results/clientpositive/except_all.q.out | 986 +++++++++ .../clientpositive/llap/except_distinct.q.out | 894 ++++++++ .../clientpositive/llap/intersect_all.q.out | 1697 +++++++++++++++ .../llap/intersect_distinct.q.out | 1292 ++++++++++++ .../clientpositive/llap/intersect_merge.q.out | 1956 ++++++++++++++++++ 29 files changed, 8177 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 09833ff..4e91452 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -403,8 +403,12 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ minillap.query.files=acid_bucket_pruning.q,\ bucket5.q,\ bucket6.q,\ + except_distinct.q,\ explainuser_2.q,\ empty_dir_in_table.q,\ + intersect_all.q,\ + intersect_distinct.q,\ + intersect_merge.q,\ llap_udf.q,\ llapdecider.q,\ reduce_deduplicate.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index f308832..7ed3907 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -27,6 
+27,7 @@ import java.util.regex.Pattern; import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin; +import org.apache.hadoop.hive.ql.parse.SemanticException; /** * List of all error messages. @@ -450,6 +451,7 @@ public enum ErrorMsg { ACID_NOT_ENOUGH_HISTORY(10327, "Not enough history available for ({0},{1}). " + "Oldest available base: {2}", true), INVALID_COLUMN_NAME(10328, "Invalid column name"), + UNSUPPORTED_SET_OPERATOR(10329, "Unsupported set operator"), REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with materialized view", true), REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true), UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"), http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java index c527e58..6ccd879 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java @@ -27,9 +27,11 @@ import java.util.Set; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.RelOptUtil.InputFinder; import org.apache.calcite.plan.RelOptUtil.InputReferencedVisitor; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.RelFactories.ProjectFactory; import org.apache.calcite.rel.core.Sort; @@ -51,21 +53,30 @@ import org.apache.calcite.rex.RexSubQuery; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.FunctionInfo; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveMultiJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -942,6 +953,68 @@ public class HiveCalciteUtil { return fieldNames; } + public static AggregateCall createSingleArgAggCall(String funcName, RelOptCluster cluster, + PrimitiveTypeInfo typeInfo, Integer pos, RelDataType aggFnRetType) { + ImmutableList.Builder<RelDataType> aggArgRelDTBldr = new ImmutableList.Builder<RelDataType>(); + aggArgRelDTBldr.add(TypeConverter.convert(typeInfo, cluster.getTypeFactory())); + SqlAggFunction aggFunction = SqlFunctionConverter.getCalciteAggFn(funcName, false, + aggArgRelDTBldr.build(), aggFnRetType); + List<Integer> argList = new ArrayList<Integer>(); + argList.add(pos); + return new AggregateCall(aggFunction, false, argList, aggFnRetType, null); + } + + public static HiveTableFunctionScan createUDTFForSetOp(RelOptCluster cluster, RelNode input) + throws SemanticException { + RelTraitSet traitSet = TraitsUtil.getDefaultTraitSet(cluster); + + List<RexNode> originalInputRefs = Lists.transform(input.getRowType().getFieldList(), + new Function<RelDataTypeField, RexNode>() { + @Override + public RexNode apply(RelDataTypeField input) { + return new RexInputRef(input.getIndex(), input.getType()); + } + }); + ImmutableList.Builder<RelDataType> argTypeBldr = ImmutableList.<RelDataType> builder(); + for (int i = 0; i < originalInputRefs.size(); i++) { + argTypeBldr.add(originalInputRefs.get(i).getType()); + } + + RelDataType retType = input.getRowType(); + + String funcName = "replicate_rows"; + FunctionInfo fi = FunctionRegistry.getFunctionInfo(funcName); + SqlOperator calciteOp = SqlFunctionConverter.getCalciteOperator(funcName, fi.getGenericUDTF(), + argTypeBldr.build(), retType); + + // Hive UDTF only has a single input + List<RelNode> list = new ArrayList<>(); + list.add(input); + + RexNode rexNode = cluster.getRexBuilder().makeCall(calciteOp, originalInputRefs); + + return HiveTableFunctionScan.create(cluster, traitSet, list, rexNode, null, retType, null); + } + + // creates a Project that projects out (i.e., removes) the columns at the given positions + public static HiveProject createProjectWithoutColumn(RelNode input, Set<Integer> positions) + throws CalciteSemanticException { + List<RexNode> originalInputRefs = Lists.transform(input.getRowType().getFieldList(), + new Function<RelDataTypeField, RexNode>() { + @Override + public RexNode apply(RelDataTypeField input) { + return new RexInputRef(input.getIndex(), input.getType()); + } + }); + List<RexNode> copyInputRefs = new ArrayList<>(); + for (int i = 0; i < originalInputRefs.size(); i++) { + if (!positions.contains(i)) { + copyInputRefs.add(originalInputRefs.get(i)); + } + } + return HiveProject.create(input, copyInputRefs, null); + } + /** * Walks over an expression and determines whether it is constant.
*/ http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelFactories.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelFactories.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelFactories.java index cf93ed8..a123f63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelFactories.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelFactories.java @@ -45,7 +45,9 @@ import org.apache.calcite.tools.RelBuilderFactory; import org.apache.calcite.util.ImmutableBitSet; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit; @@ -199,10 +201,18 @@ public class HiveRelFactories { private static class HiveSetOpFactoryImpl implements SetOpFactory { @Override public RelNode createSetOp(SqlKind kind, List<RelNode> inputs, boolean all) { - if (kind != SqlKind.UNION) { - throw new IllegalStateException("Expected to get Set operator of type Union. Found : " + kind); + if (kind == SqlKind.UNION) { + return new HiveUnion(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs); + } else if (kind == SqlKind.INTERSECT) { + return new HiveIntersect(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs, + all); + } else if (kind == SqlKind.EXCEPT) { + return new HiveExcept(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs, + all); + } else { + throw new IllegalStateException("Expected to get set operator of type Union, Intersect or Except(Minus). Found : " + + kind); } - return new HiveUnion(inputs.get(0).getCluster(), inputs.get(0).getTraitSet(), inputs); } } http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExcept.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExcept.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExcept.java new file mode 100644 index 0000000..2c8e148 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveExcept.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.SetOp; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode.Implementor; + +public class HiveExcept extends Minus { + + public HiveExcept(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new HiveExcept(this.getCluster(), traitSet, inputs, all); + } + + public void implement(Implementor implementor) { + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveIntersect.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveIntersect.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveIntersect.java new file mode 100644 index 0000000..19e1e02 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveIntersect.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode.Implementor; + +public class HiveIntersect extends Intersect { + + public HiveIntersect(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new HiveIntersect(this.getCluster(), traitSet, inputs, all); + } + + public void implement(Implementor implementor) { + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExceptRewriteRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExceptRewriteRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExceptRewriteRule.java new file mode 100644 index 0000000..b63ea02 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveExceptRewriteRule.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.Lists; + +/** + * Planner rule that rewrites + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept}. + * Note that there are only 2 branches because of EXCEPT's semantics. + * R1 Except(all) R2 + * R1 introduces VCol '2', R2 introduces VCol '1' + * R3 = GB(R1 on all keys + VCol + count(VCol) as c) union all GB(R2 on all keys + VCol + count(VCol) as c) + * R4 = GB(R3 on all keys + sum(c) as a + sum(VCol*c) as b); we + * have m+n=a, 2m+n=b, where m is the #rows in R1 and n is the #rows in R2; then + * m=b-a, n=2a-b, m-n=2b-3a. + * If it is except (distinct), + * then R5 = Fil (b-a>0 && 2a-b=0), R6 = select only keys from R5; + * else R5 = Fil (2b-3a>0), R6 = UDTF (R5) which will explode the tuples based on 2b-3a. + * Note that NULLs are handled the same as other values. Please refer to the test cases.
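+ * For example, if a tuple occurs m=3 times in R1 and n=1 time in R2, then a=m+n=4 and b=2m+n=7, + * so m=b-a=3, n=2a-b=1 and m-n=2b-3a=2: except all emits the tuple twice, while except distinct + * drops it because 2a-b=1, i.e., the tuple still occurs in R2.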
+ */ +public class HiveExceptRewriteRule extends RelOptRule { + + public static final HiveExceptRewriteRule INSTANCE = new HiveExceptRewriteRule(); + + protected static final Logger LOG = LoggerFactory.getLogger(HiveExceptRewriteRule.class); + + + // ~ Constructors ----------------------------------------------------------- + + private HiveExceptRewriteRule() { + super(operand(HiveExcept.class, any())); + } + + // ~ Methods ---------------------------------------------------------------- + + public void onMatch(RelOptRuleCall call) { + final HiveExcept hiveExcept = call.rel(0); + + final RelOptCluster cluster = hiveExcept.getCluster(); + final RexBuilder rexBuilder = cluster.getRexBuilder(); + Builder<RelNode> bldr = new ImmutableList.Builder<RelNode>(); + + // 1st level GB: create a GB(R1 on all keys + VCol + count(VCol) as c) for each + // branch + try { + bldr.add(createFirstGB(hiveExcept.getInputs().get(0), true, cluster, rexBuilder)); + bldr.add(createFirstGB(hiveExcept.getInputs().get(1), false, cluster, rexBuilder)); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + + // create a union above all the branches + // the schema of union looks like this + // all keys + VCol + c + HiveRelNode union = new HiveUnion(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build()); + + // 2nd level GB: create a GB (all keys + sum(c) as a + sum(VCol*c) as b) + // on top of the union + final List<RexNode> gbChildProjLst = Lists.newArrayList(); + final List<Integer> groupSetPositions = Lists.newArrayList(); + int unionColumnSize = union.getRowType().getFieldList().size(); + for (int cInd = 0; cInd < unionColumnSize; cInd++) { + gbChildProjLst.add(rexBuilder.makeInputRef(union, cInd)); + // the last 2 columns are VCol and c + if (cInd < unionColumnSize - 2) { + groupSetPositions.add(cInd); + } + } + + try { + gbChildProjLst.add(multiply(rexBuilder.makeInputRef(union, unionColumnSize - 2), + rexBuilder.makeInputRef(union, unionColumnSize - 1), cluster, rexBuilder)); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + + RelNode gbInputRel = null; + try { + // Here we create a project for the following reasons: + // (1) GBy only accepts arg as a position of the input, however, we need to sum on VCol*c + // (2) it lets us reuse createSingleArgAggCall.
+ gbInputRel = HiveProject.create(union, gbChildProjLst, null); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + + // gbInputRel's schema is like this + // all keys + VCol + c + VCol*c + List<AggregateCall> aggregateCalls = Lists.newArrayList(); + RelDataType aggFnRetType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory()); + + // sum(c) + AggregateCall aggregateCall = HiveCalciteUtil.createSingleArgAggCall("sum", cluster, + TypeInfoFactory.longTypeInfo, unionColumnSize - 1, aggFnRetType); + aggregateCalls.add(aggregateCall); + + // sum(VCol*c) + aggregateCall = HiveCalciteUtil.createSingleArgAggCall("sum", cluster, + TypeInfoFactory.longTypeInfo, unionColumnSize, aggFnRetType); + aggregateCalls.add(aggregateCall); + + final ImmutableBitSet groupSet = ImmutableBitSet.of(groupSetPositions); + HiveRelNode aggregateRel = new HiveAggregate(cluster, + cluster.traitSetOf(HiveRelNode.CONVENTION), gbInputRel, false, groupSet, null, + aggregateCalls); + + // the schema after GB is like this + // all keys + sum(c) as a + sum(VCol*c) as b + // the column size is the same as unionColumnSize; + // (1) for except distinct add a filter (b-a>0 && 2a-b=0) + // i.e., a > 0 && 2a = b + // then add the project + // (2) for except all add a project to change it to + // (2b-3a) + all keys + // then add the UDTF + + if (!hiveExcept.all) { + RelNode filterRel = null; + try { + filterRel = new HiveFilter(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), + aggregateRel, makeFilterExprForExceptDistinct(aggregateRel, unionColumnSize, cluster, + rexBuilder)); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + + // finally add a project to project out the last 2 columns + Set<Integer> projectOutColumnPositions = new HashSet<>(); + projectOutColumnPositions.add(filterRel.getRowType().getFieldList().size() - 2); + projectOutColumnPositions.add(filterRel.getRowType().getFieldList().size() - 1); + try { + call.transformTo(HiveCalciteUtil.createProjectWithoutColumn(filterRel, + projectOutColumnPositions)); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + } else { + List<RexNode> originalInputRefs = Lists.transform(aggregateRel.getRowType().getFieldList(), + new Function<RelDataTypeField, RexNode>() { + @Override + public RexNode apply(RelDataTypeField input) { + return new RexInputRef(input.getIndex(), input.getType()); + } + }); + + List<RexNode> copyInputRefs = new ArrayList<>(); + try { + copyInputRefs.add(makeExprForExceptAll(aggregateRel, unionColumnSize, cluster, rexBuilder)); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + for (int i = 0; i < originalInputRefs.size() - 2; i++) { + copyInputRefs.add(originalInputRefs.get(i)); + } + RelNode srcRel = null; + try { + srcRel = HiveProject.create(aggregateRel, copyInputRefs, null); + HiveTableFunctionScan udtf = HiveCalciteUtil.createUDTFForSetOp(cluster, srcRel); + // finally add a project to project out the 1st columns + Set<Integer> projectOutColumnPositions = new HashSet<>(); + projectOutColumnPositions.add(0); + call.transformTo(HiveCalciteUtil + .createProjectWithoutColumn(udtf, projectOutColumnPositions)); + } catch (SemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + } + } + + private RelNode createFirstGB(RelNode input, boolean left, RelOptCluster cluster, + RexBuilder 
rexBuilder) throws CalciteSemanticException { + final List<RexNode> gbChildProjLst = Lists.newArrayList(); + final List<Integer> groupSetPositions = Lists.newArrayList(); + for (int cInd = 0; cInd < input.getRowType().getFieldList().size(); cInd++) { + gbChildProjLst.add(rexBuilder.makeInputRef(input, cInd)); + groupSetPositions.add(cInd); + } + if (left) { + gbChildProjLst.add(rexBuilder.makeBigintLiteral(new BigDecimal(2))); + } else { + gbChildProjLst.add(rexBuilder.makeBigintLiteral(new BigDecimal(1))); + } + + // also add the last VCol + groupSetPositions.add(input.getRowType().getFieldList().size()); + + // create the project before GB + RelNode gbInputRel = HiveProject.create(input, gbChildProjLst, null); + + // groupSetPosition includes all the positions + final ImmutableBitSet groupSet = ImmutableBitSet.of(groupSetPositions); + + List<AggregateCall> aggregateCalls = Lists.newArrayList(); + RelDataType aggFnRetType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory()); + + AggregateCall aggregateCall = HiveCalciteUtil.createSingleArgAggCall("count", cluster, + TypeInfoFactory.longTypeInfo, input.getRowType().getFieldList().size(), aggFnRetType); + aggregateCalls.add(aggregateCall); + return new HiveAggregate(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), gbInputRel, + false, groupSet, null, aggregateCalls); + } + + private RexNode multiply(RexNode r1, RexNode r2, RelOptCluster cluster, RexBuilder rexBuilder) + throws CalciteSemanticException { + List<RexNode> childRexNodeLst = new ArrayList<RexNode>(); + childRexNodeLst.add(r1); + childRexNodeLst.add(r2); + ImmutableList.Builder<RelDataType> calciteArgTypesBldr = new ImmutableList.Builder<RelDataType>(); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + return rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("*", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), true), + childRexNodeLst); + } + + private RexNode makeFilterExprForExceptDistinct(HiveRelNode input, int columnSize, + RelOptCluster cluster, RexBuilder rexBuilder) throws CalciteSemanticException { + List<RexNode> childRexNodeLst = new ArrayList<RexNode>(); + RexInputRef a = rexBuilder.makeInputRef(input, columnSize - 2); + RexLiteral zero = rexBuilder.makeBigintLiteral(new BigDecimal(0)); + childRexNodeLst.add(a); + childRexNodeLst.add(zero); + ImmutableList.Builder<RelDataType> calciteArgTypesBldr = new ImmutableList.Builder<RelDataType>(); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + // a>0 + RexNode aMorethanZero = rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn(">", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + childRexNodeLst = new ArrayList<RexNode>(); + RexLiteral two = rexBuilder.makeBigintLiteral(new BigDecimal(2)); + childRexNodeLst.add(a); + childRexNodeLst.add(two); + // 2*a + RexNode twoa = rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("*", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + childRexNodeLst = new 
ArrayList<RexNode>(); + RexInputRef b = rexBuilder.makeInputRef(input, columnSize - 1); + childRexNodeLst.add(twoa); + childRexNodeLst.add(b); + // 2a=b + RexNode twoaEqualTob = rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("=", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + childRexNodeLst = new ArrayList<RexNode>(); + childRexNodeLst.add(aMorethanZero); + childRexNodeLst.add(twoaEqualTob); + // a>0 && 2a=b + return rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("and", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + } + + private RexNode makeExprForExceptAll(HiveRelNode input, int columnSize, RelOptCluster cluster, + RexBuilder rexBuilder) throws CalciteSemanticException { + List<RexNode> childRexNodeLst = new ArrayList<RexNode>(); + ImmutableList.Builder<RelDataType> calciteArgTypesBldr = new ImmutableList.Builder<RelDataType>(); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + RexInputRef a = rexBuilder.makeInputRef(input, columnSize - 2); + RexLiteral three = rexBuilder.makeBigintLiteral(new BigDecimal(3)); + childRexNodeLst.add(three); + childRexNodeLst.add(a); + RexNode threea = rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("*", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + + RexLiteral two = rexBuilder.makeBigintLiteral(new BigDecimal(2)); + RexInputRef b = rexBuilder.makeInputRef(input, columnSize - 1); + + // 2*b + childRexNodeLst = new ArrayList<RexNode>(); + childRexNodeLst.add(two); + childRexNodeLst.add(b); + RexNode twob = rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("*", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + + // 2b-3a + childRexNodeLst = new ArrayList<RexNode>(); + childRexNodeLst.add(twob); + childRexNodeLst.add(threea); + return rexBuilder.makeCall( + SqlFunctionConverter.getCalciteFn("-", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), false), + childRexNodeLst); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectMergeRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectMergeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectMergeRule.java new file mode 100644 index 0000000..ba422af --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectMergeRule.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; +import org.apache.calcite.util.Util; + +/** + * Planner rule that merges multiple HiveIntersect operators into one. + * Before the rule, it is + * intersect-branch1 + * |-----intersect-branch2 + * |-----branch3 + * After the rule, it becomes + * intersect-branch1 + * |-----branch2 + * |-----branch3 + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect} + */ +public class HiveIntersectMergeRule extends RelOptRule { + + public static final HiveIntersectMergeRule INSTANCE = new HiveIntersectMergeRule(); + + // ~ Constructors ----------------------------------------------------------- + + private HiveIntersectMergeRule() { + super( + operand(HiveIntersect.class, operand(RelNode.class, any()), operand(RelNode.class, any()))); + } + + // ~ Methods ---------------------------------------------------------------- + + public void onMatch(RelOptRuleCall call) { + final HiveIntersect topHiveIntersect = call.rel(0); + + final HiveIntersect bottomHiveIntersect; + if (call.rel(2) instanceof HiveIntersect) { + bottomHiveIntersect = call.rel(2); + } else if (call.rel(1) instanceof HiveIntersect) { + bottomHiveIntersect = call.rel(1); + } else { + return; + } + + boolean all = topHiveIntersect.all; + // if top is distinct, we can always merge, whether bottom is distinct or not; + // if top is all, we can only merge if bottom is also all; + // that is to say, we should bail out if top is all and bottom is distinct + if (all && !bottomHiveIntersect.all) { + return; + } + + List<RelNode> inputs = new ArrayList<>(); + if (call.rel(2) instanceof HiveIntersect) { + inputs.add(topHiveIntersect.getInput(0)); + inputs.addAll(bottomHiveIntersect.getInputs()); + } else { + inputs.addAll(bottomHiveIntersect.getInputs()); + inputs.addAll(Util.skip(topHiveIntersect.getInputs())); + } + + HiveIntersect newIntersect = (HiveIntersect) topHiveIntersect.copy( + topHiveIntersect.getTraitSet(), inputs, all); + call.transformTo(newIntersect); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectRewriteRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectRewriteRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectRewriteRule.java new file mode 100644 index 0000000..5b0a7d7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveIntersectRewriteRule.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.Lists; + +/** + * Planner rule that rewrites + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect} + * Rewrite: (GB-Union All-GB)-GB-UDTF (on all attributes) + + Example: R1 Intersect All R2 + R3 = GB(R1 on all attributes + count() as c) union all GB(R2 on all attributes + count() as c) + R4 = GB(R3 on all attributes + count(c) as cnt + min(c) as m) + R5 = Fil ( cnt == #branch ) + + If it is intersect all then + R6 = UDTF (R5) which will explode the tuples based on min(c). + R7 = Proj(R6 on all attributes) + Else + R6 = Proj(R5 on all attributes) + */ +public class HiveIntersectRewriteRule extends RelOptRule { + + public static final HiveIntersectRewriteRule INSTANCE = new HiveIntersectRewriteRule(); + + protected static final Logger LOG = LoggerFactory.getLogger(HiveIntersectRewriteRule.class); + + + // ~ Constructors ----------------------------------------------------------- + + private HiveIntersectRewriteRule() { + super(operand(HiveIntersect.class, any())); + } + + // ~ Methods ---------------------------------------------------------------- + + public void onMatch(RelOptRuleCall call) { + final HiveIntersect hiveIntersect = call.rel(0); + + final RelOptCluster cluster = hiveIntersect.getCluster(); + final RexBuilder rexBuilder = cluster.getRexBuilder(); + int numOfBranch = hiveIntersect.getInputs().size(); + Builder<RelNode> bldr = new ImmutableList.Builder<RelNode>(); + + // 1st level GB: create a GB (col0, col1, count(1) as c) for each branch + for (int index = 0; index < numOfBranch; index++) { + RelNode input = hiveIntersect.getInputs().get(index); + final List<RexNode> gbChildProjLst = Lists.newArrayList(); + final List<Integer> groupSetPositions = Lists.newArrayList(); + for (int cInd = 0; cInd < input.getRowType().getFieldList().size(); cInd++) { + gbChildProjLst.add(rexBuilder.makeInputRef(input, cInd)); + groupSetPositions.add(cInd); + } + gbChildProjLst.add(rexBuilder.makeBigintLiteral(new BigDecimal(1))); + + // create the project before GB because we need a new project with extra column '1'. + RelNode gbInputRel = null; + try { + gbInputRel = HiveProject.create(input, gbChildProjLst, null); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + + // groupSetPosition includes all the positions + final ImmutableBitSet groupSet = ImmutableBitSet.of(groupSetPositions); + + List<AggregateCall> aggregateCalls = Lists.newArrayList(); + RelDataType aggFnRetType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory()); + + // count(1), 1's position is input.getRowType().getFieldList().size() + AggregateCall aggregateCall = HiveCalciteUtil.createSingleArgAggCall("count", cluster, + TypeInfoFactory.longTypeInfo, input.getRowType().getFieldList().size(), aggFnRetType); + aggregateCalls.add(aggregateCall); + + HiveRelNode aggregateRel = new HiveAggregate(cluster, + cluster.traitSetOf(HiveRelNode.CONVENTION), gbInputRel, false, groupSet, null, + aggregateCalls); + bldr.add(aggregateRel); + } + + // create a union above all the branches + HiveRelNode union = new HiveUnion(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build()); + + // 2nd level GB: create a GB (col0, col1, count(c)) on top of the union + final List<Integer> groupSetPositions = Lists.newArrayList(); + // the index of c + int cInd = union.getRowType().getFieldList().size() - 1; + for (int index = 0; index < union.getRowType().getFieldList().size(); index++) { + if (index != cInd) { + groupSetPositions.add(index); + } + } + + List<AggregateCall> aggregateCalls = Lists.newArrayList(); + RelDataType aggFnRetType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory()); + + AggregateCall aggregateCall = HiveCalciteUtil.createSingleArgAggCall("count", cluster, + TypeInfoFactory.longTypeInfo, cInd, aggFnRetType); +
aggregateCalls.add(aggregateCall); + if (hiveIntersect.all) { + aggregateCall = HiveCalciteUtil.createSingleArgAggCall("min", cluster, + TypeInfoFactory.longTypeInfo, cInd, aggFnRetType); + aggregateCalls.add(aggregateCall); + } + + final ImmutableBitSet groupSet = ImmutableBitSet.of(groupSetPositions); + HiveRelNode aggregateRel = new HiveAggregate(cluster, + cluster.traitSetOf(HiveRelNode.CONVENTION), union, false, groupSet, null, aggregateCalls); + + // add a filter count(c) = #branches + int countInd = cInd; + List<RexNode> childRexNodeLst = new ArrayList<RexNode>(); + RexInputRef ref = rexBuilder.makeInputRef(aggregateRel, countInd); + RexLiteral literal = rexBuilder.makeBigintLiteral(new BigDecimal(numOfBranch)); + childRexNodeLst.add(ref); + childRexNodeLst.add(literal); + ImmutableList.Builder<RelDataType> calciteArgTypesBldr = new ImmutableList.Builder<RelDataType>(); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + calciteArgTypesBldr.add(TypeConverter.convert(TypeInfoFactory.longTypeInfo, + cluster.getTypeFactory())); + RexNode factoredFilterExpr = null; + try { + factoredFilterExpr = rexBuilder + .makeCall( + SqlFunctionConverter.getCalciteFn("=", calciteArgTypesBldr.build(), + TypeConverter.convert(TypeInfoFactory.longTypeInfo, cluster.getTypeFactory()), + true), childRexNodeLst); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + + RelNode filterRel = new HiveFilter(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), + aggregateRel, factoredFilterExpr); + + if (!hiveIntersect.all) { + // the schema for intersect distinct is like this + // R3 on all attributes + count(c) as cnt + // finally add a project to project out the last column + Set<Integer> projectOutColumnPositions = new HashSet<>(); + projectOutColumnPositions.add(filterRel.getRowType().getFieldList().size() - 1); + try { + call.transformTo(HiveCalciteUtil.createProjectWithoutColumn(filterRel,projectOutColumnPositions)); + } catch (CalciteSemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + } else { + // the schema for intersect all is like this + // R3 + count(c) as cnt + min(c) as m + // we create a input project for udtf whose schema is like this + // min(c) as m + R3 + List<RexNode> originalInputRefs = Lists.transform(filterRel.getRowType().getFieldList(), + new Function<RelDataTypeField, RexNode>() { + @Override + public RexNode apply(RelDataTypeField input) { + return new RexInputRef(input.getIndex(), input.getType()); + } + }); + + List<RexNode> copyInputRefs = new ArrayList<>(); + copyInputRefs.add(originalInputRefs.get(originalInputRefs.size() - 1)); + for (int i = 0; i < originalInputRefs.size() - 2; i++) { + copyInputRefs.add(originalInputRefs.get(i)); + } + RelNode srcRel = null; + try { + srcRel = HiveProject.create(filterRel, copyInputRefs, null); + HiveTableFunctionScan udtf = HiveCalciteUtil.createUDTFForSetOp(cluster, srcRel); + // finally add a project to project out the 1st column + Set<Integer> projectOutColumnPositions = new HashSet<>(); + projectOutColumnPositions.add(0); + call.transformTo(HiveCalciteUtil + .createProjectWithoutColumn(udtf, projectOutColumnPositions)); + } catch (SemanticException e) { + LOG.debug(e.toString()); + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectOverIntersectRemoveRule.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectOverIntersectRemoveRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectOverIntersectRemoveRule.java new file mode 100644 index 0000000..92fdb24 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectOverIntersectRemoveRule.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; + +import com.google.common.base.Predicate; + +/** + * HiveProjectOverIntersectRemoveRule removes a HiveProject on top of a + * HiveIntersect when the project is trivial, i.e., it merely projects the + * intersect's output columns in their original order. + */ +public class HiveProjectOverIntersectRemoveRule extends RelOptRule { + + public static final HiveProjectOverIntersectRemoveRule INSTANCE = new HiveProjectOverIntersectRemoveRule(); + + // ~ Constructors ----------------------------------------------------------- + + /** Creates a HiveProjectOverIntersectRemoveRule.
*/ + private HiveProjectOverIntersectRemoveRule() { + super(operand(HiveProject.class, operand(HiveIntersect.class, any()))); + } + + // ~ Methods ---------------------------------------------------------------- + + @Override + public boolean matches(RelOptRuleCall call) { + Project project = call.rel(0); + Intersect intersect = call.rel(1); + return isTrivial(project, intersect); + } + + public void onMatch(RelOptRuleCall call) { + call.transformTo(call.rel(1)); + } + + private static boolean isTrivial(Project project, Intersect intersect) { + return RexUtil.isIdentity(project.getProjects(), intersect.getRowType()); + } + +} + +// End HiveProjectOverIntersectRemoveRule.java http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortLimitPullUpConstantsRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortLimitPullUpConstantsRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortLimitPullUpConstantsRule.java index 3ec9dac..54874e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortLimitPullUpConstantsRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortLimitPullUpConstantsRule.java @@ -66,12 +66,9 @@ public class HiveSortLimitPullUpConstantsRule extends RelOptRule { new HiveSortLimitPullUpConstantsRule(HiveSortLimit.class, HiveRelFactories.HIVE_BUILDER); - private HiveSortLimitPullUpConstantsRule( - Class<? extends Sort> sortClass, + private HiveSortLimitPullUpConstantsRule(Class<? extends Sort> sortClass, RelBuilderFactory relBuilderFactory) { - super(operand(RelNode.class, - operand(sortClass, any())), - relBuilderFactory, null); + super(operand(RelNode.class, unordered(operand(sortClass, any()))), relBuilderFactory, null); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java index 63aa086..e78c8e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java @@ -342,8 +342,8 @@ public class ASTConverter { } private QueryBlockInfo convertSource(RelNode r) throws CalciteSemanticException { - Schema s; - ASTNode ast; + Schema s = null; + ASTNode ast = null; if (r instanceof TableScan) { TableScan f = (TableScan) r; @@ -379,19 +379,15 @@ public class ASTConverter { s = left.schema; } } else if (r instanceof Union) { - RelNode leftInput = ((Union) r).getInput(0); - RelNode rightInput = ((Union) r).getInput(1); - - ASTConverter leftConv = new ASTConverter(leftInput, this.derivedTableCount); - ASTConverter rightConv = new ASTConverter(rightInput, this.derivedTableCount); - ASTNode leftAST = leftConv.convert(); - ASTNode rightAST = rightConv.convert(); - - ASTNode unionAST = getUnionAllAST(leftAST, rightAST); - - String sqAlias = nextAlias(); - ast = ASTBuilder.subQuery(unionAST, sqAlias); - s = new Schema((Union) r, sqAlias); + Union u = ((Union) r); + ASTNode left = new ASTConverter(((Union) r).getInput(0), this.derivedTableCount).convert(); + for 
(int ind = 1; ind < u.getInputs().size(); ind++) { + left = getUnionAllAST(left, new ASTConverter(((Union) r).getInput(ind), + this.derivedTableCount).convert()); + String sqAlias = nextAlias(); + ast = ASTBuilder.subQuery(left, sqAlias); + s = new Schema((Union) r, sqAlias); + } } else { ASTConverter src = new ASTConverter(r, this.derivedTableCount); ASTNode srcAST = src.convert(); http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index d32a0a7..714138a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -67,6 +67,7 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.SetOp; import org.apache.calcite.rel.metadata.CachingRelMetadataProvider; import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; @@ -140,8 +141,10 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveVolcanoPlanner; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode; @@ -153,6 +156,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateJoinTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateProjectMergeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregatePullUpConstantsRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExceptRewriteRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExpandDistinctAggregatesRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterAggregateTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule; @@ -161,6 +165,8 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransp import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSortTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4JoinRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveIntersectMergeRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveIntersectRewriteRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule; import 
http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index d32a0a7..714138a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -67,6 +67,7 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
 import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
@@ -140,8 +141,10 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf;
 import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveVolcanoPlanner;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
@@ -153,6 +156,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateJoinTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateProjectMergeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregatePullUpConstantsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExceptRewriteRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExpandDistinctAggregatesRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterAggregateTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
@@ -161,6 +165,8 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransp
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSortTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4JoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveIntersectMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveIntersectRewriteRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinCommuteRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinProjectTransposeRule;
@@ -171,6 +177,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePointLookupOptimize
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePreFilteringRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectFilterPullUpConstantsRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectOverIntersectRemoveRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsWithStatsRule;
@@ -196,6 +203,7 @@ import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.QBExpr.Opcode;
 import org.apache.hadoop.hive.ql.parse.QBSubQuery.SubQueryType;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec;
@@ -1185,6 +1193,25 @@ public class CalcitePlanner extends SemanticAnalyzer {
     final int maxCNFNodeCount = conf.getIntVar(HiveConf.ConfVars.HIVE_CBO_CNF_NODES_LIMIT);
     final int minNumORClauses = conf.getIntVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZERMIN);
 
+    //0. SetOp rewrite
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, true, mdProvider, null, HepMatchOrder.BOTTOM_UP,
+        HiveProjectOverIntersectRemoveRule.INSTANCE, HiveIntersectMergeRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: HiveProjectOverIntersectRemoveRule and HiveIntersectMerge rules");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, null, HepMatchOrder.BOTTOM_UP,
+        HiveIntersectRewriteRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: HiveIntersectRewrite rule");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, null, HepMatchOrder.BOTTOM_UP,
+        HiveExceptRewriteRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: HiveExceptRewrite rule");
+
     //1. Distinct aggregate rewrite
     // Run this optimization early, since it is expanding the operator pipeline.
     if (!conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr") &&
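These hep passes run before every other CBO rewrite: identity Projects over Intersect are dropped and adjacent Intersects merged first, then HiveIntersectRewriteRule and HiveExceptRewriteRule expand the remaining Intersect/Except nodes into plans built from operators Hive already executes (roughly: per-branch aggregation, a union of the branches, and a filter on the per-branch counts, with the ALL variants re-expanding row multiplicities afterwards). A quick, illustrative way to see the rewritten plan:

explain select * from src intersect all select * from src;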
@@ -1375,18 +1402,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
   }
 
   @SuppressWarnings("nls")
-  private RelNode genUnionLogicalPlan(String unionalias, String leftalias, RelNode leftRel,
+  private RelNode genSetOpLogicalPlan(Opcode opcode, String alias, String leftalias, RelNode leftRel,
       String rightalias, RelNode rightRel) throws SemanticException {
-    HiveUnion unionRel = null;
-
     // 1. Get Row Resolvers, Column map for original left and right input of
-    // Union Rel
+    // SetOp Rel
     RowResolver leftRR = this.relToHiveRR.get(leftRel);
     RowResolver rightRR = this.relToHiveRR.get(rightRel);
     HashMap<String, ColumnInfo> leftmap = leftRR.getFieldMap(leftalias);
     HashMap<String, ColumnInfo> rightmap = rightRR.getFieldMap(rightalias);
 
-    // 2. Validate that Union is feasible according to Hive (by using type
+    // 2. Validate that SetOp is feasible according to Hive (by using type
     // info from RR)
     if (leftmap.size() != rightmap.size()) {
       throw new SemanticException("Schema of both sides of union should match.");
@@ -1395,8 +1420,8 @@ public class CalcitePlanner extends SemanticAnalyzer {
     ASTNode tabref = getQB().getAliases().isEmpty() ? null : getQB().getParseInfo()
         .getSrcForAlias(getQB().getAliases().get(0));
 
-    // 3. construct Union Output RR using original left & right Input
-    RowResolver unionoutRR = new RowResolver();
+    // 3. construct SetOp Output RR using original left & right Input
+    RowResolver setOpOutRR = new RowResolver();
 
     Iterator<Map.Entry<String, ColumnInfo>> lIter = leftmap.entrySet().iterator();
     Iterator<Map.Entry<String, ColumnInfo>> rIter = rightmap.entrySet().iterator();
@@ -1412,18 +1437,18 @@ public class CalcitePlanner extends SemanticAnalyzer {
           rInfo.getType());
       if (commonTypeInfo == null) {
         throw new SemanticException(generateErrorMessage(tabref,
-            "Schema of both sides of union should match: Column " + field
+            "Schema of both sides of setop should match: Column " + field
                 + " is of type " + lInfo.getType().getTypeName()
                 + " on first table and type " + rInfo.getType().getTypeName()
                 + " on second table"));
       }
-      ColumnInfo unionColInfo = new ColumnInfo(lInfo);
-      unionColInfo.setType(commonTypeInfo);
-      unionoutRR.put(unionalias, field, unionColInfo);
+      ColumnInfo setOpColInfo = new ColumnInfo(lInfo);
+      setOpColInfo.setType(commonTypeInfo);
+      setOpOutRR.put(alias, field, setOpColInfo);
     }
 
     // 4. Determine which columns requires cast on left/right input (Calcite
-    // requires exact types on both sides of union)
+    // requires exact types on both sides of SetOp)
     boolean leftNeedsTypeCast = false;
     boolean rightNeedsTypeCast = false;
     List<RexNode> leftProjs = new ArrayList<RexNode>();
@@ -1438,7 +1463,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
         leftFieldDT = leftRowDT.get(i).getType();
         rightFieldDT = rightRowDT.get(i).getType();
         if (!leftFieldDT.equals(rightFieldDT)) {
-          unionFieldDT = TypeConverter.convert(unionoutRR.getColumnInfos().get(i).getType(),
+          unionFieldDT = TypeConverter.convert(setOpOutRR.getColumnInfos().get(i).getType(),
               cluster.getTypeFactory());
           if (!unionFieldDT.equals(leftFieldDT)) {
             leftNeedsTypeCast = true;
@@ -1461,28 +1486,49 @@ public class CalcitePlanner extends SemanticAnalyzer {
 
     // 5. Introduce Project Rel above original left/right inputs if cast is
     // needed for type parity
-    RelNode unionLeftInput = leftRel;
-    RelNode unionRightInput = rightRel;
+    RelNode setOpLeftInput = leftRel;
+    RelNode setOpRightInput = rightRel;
     if (leftNeedsTypeCast) {
-      unionLeftInput = HiveProject.create(leftRel, leftProjs, leftRel.getRowType()
+      setOpLeftInput = HiveProject.create(leftRel, leftProjs, leftRel.getRowType()
          .getFieldNames());
    }
    if (rightNeedsTypeCast) {
-      unionRightInput = HiveProject.create(rightRel, rightProjs, rightRel.getRowType()
+      setOpRightInput = HiveProject.create(rightRel, rightProjs, rightRel.getRowType()
          .getFieldNames());
    }
 
-    // 6. Construct Union Rel
+    // 6. Construct SetOp Rel
     Builder<RelNode> bldr = new ImmutableList.Builder<RelNode>();
-    bldr.add(unionLeftInput);
-    bldr.add(unionRightInput);
-    unionRel = new HiveUnion(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build());
-
-    relToHiveRR.put(unionRel, unionoutRR);
-    relToHiveColNameCalcitePosMap.put(unionRel,
-        this.buildHiveToCalciteColumnMap(unionoutRR, unionRel));
-
-    return unionRel;
+    bldr.add(setOpLeftInput);
+    bldr.add(setOpRightInput);
+    SetOp setOpRel = null;
+    switch (opcode) {
+    case UNION:
+      setOpRel = new HiveUnion(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build());
+      break;
+    case INTERSECT:
+      setOpRel = new HiveIntersect(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build(),
+          false);
+      break;
+    case INTERSECTALL:
+      setOpRel = new HiveIntersect(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build(),
+          true);
+      break;
+    case EXCEPT:
+      setOpRel = new HiveExcept(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build(),
+          false);
+      break;
+    case EXCEPTALL:
+      setOpRel = new HiveExcept(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build(),
+          true);
+      break;
+    default:
+      throw new SemanticException(ErrorMsg.UNSUPPORTED_SET_OPERATOR.getMsg(opcode.toString()));
+    }
+    relToHiveRR.put(setOpRel, setOpOutRR);
+    relToHiveColNameCalcitePosMap.put(setOpRel,
+        this.buildHiveToCalciteColumnMap(setOpOutRR, setOpRel));
+    return setOpRel;
   }
 
   private RelNode genJoinRelNode(RelNode leftRel, RelNode rightRel, JoinType hiveJoinType,
@@ -2077,9 +2123,9 @@ public class CalcitePlanner extends SemanticAnalyzer {
    */
   private class AggInfo {
     private final List<ExprNodeDesc> m_aggParams;
-    private final TypeInfo m_returnType;
-    private final String m_udfName;
-    private final boolean m_distinct;
+    private final TypeInfo m_returnType;
+    private final String m_udfName;
+    private final boolean m_distinct;
 
     private AggInfo(List<ExprNodeDesc> aggParams, TypeInfo returnType, String udfName,
         boolean isDistinct) {
@@ -3349,17 +3395,21 @@ public class CalcitePlanner extends SemanticAnalyzer {
   }
 
   private RelNode genLogicalPlan(QBExpr qbexpr) throws SemanticException {
-    if (qbexpr.getOpcode() == QBExpr.Opcode.NULLOP) {
+    switch (qbexpr.getOpcode()) {
+    case NULLOP:
       return genLogicalPlan(qbexpr.getQB(), false);
-    }
-    if (qbexpr.getOpcode() == QBExpr.Opcode.UNION) {
+    case UNION:
+    case INTERSECT:
+    case INTERSECTALL:
+    case EXCEPT:
+    case EXCEPTALL:
       RelNode qbexpr1Ops = genLogicalPlan(qbexpr.getQBExpr1());
       RelNode qbexpr2Ops = genLogicalPlan(qbexpr.getQBExpr2());
-
-      return genUnionLogicalPlan(qbexpr.getAlias(), qbexpr.getQBExpr1().getAlias(), qbexpr1Ops,
-          qbexpr.getQBExpr2().getAlias(), qbexpr2Ops);
+      return genSetOpLogicalPlan(qbexpr.getOpcode(), qbexpr.getAlias(), qbexpr.getQBExpr1()
+          .getAlias(), qbexpr1Ops, qbexpr.getQBExpr2().getAlias(), qbexpr2Ops);
+    default:
+      return null;
     }
-    return null;
   }
 
   private RelNode genLogicalPlan(QB qb, boolean outerMostQB) throws SemanticException {
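Steps 2 through 5 above generalize the old union-only type handling: branch schemas are validated column by column, a common type is derived, and a casting Project is inserted under the new SetOp node for any branch whose column type differs from it. The new q-files below exercise this directly with an int column on one side and a bigint on the other, for example:

create table a(key int);
create table b(key bigint);
select * from a except all select * from b;
-- a.key is implicitly cast to bigint before the set operation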
 KW_LOAD: 'LOAD';
 KW_EXPORT: 'EXPORT';
 KW_IMPORT: 'IMPORT';

http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 5c16c55..7bf02bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -88,6 +88,10 @@ TOK_DISTRIBUTEBY;
 TOK_SORTBY;
 TOK_UNIONALL;
 TOK_UNIONDISTINCT;
+TOK_INTERSECTALL;
+TOK_INTERSECTDISTINCT;
+TOK_EXCEPTALL;
+TOK_EXCEPTDISTINCT;
 TOK_JOIN;
 TOK_LEFTOUTERJOIN;
 TOK_RIGHTOUTERJOIN;
@@ -449,6 +453,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
     xlateMap.put("KW_DISTRIBUTE", "DISTRIBUTE");
     xlateMap.put("KW_SORT", "SORT");
     xlateMap.put("KW_UNION", "UNION");
+    xlateMap.put("KW_INTERSECT", "INTERSECT");
+    xlateMap.put("KW_EXCEPT", "EXCEPT");
     xlateMap.put("KW_LOAD", "LOAD");
     xlateMap.put("KW_DATA", "DATA");
     xlateMap.put("KW_INPATH", "INPATH");
@@ -2302,6 +2308,12 @@ setOperator
 @after { popMsg(state); }
     : KW_UNION KW_ALL -> ^(TOK_UNIONALL)
     | KW_UNION KW_DISTINCT? -> ^(TOK_UNIONDISTINCT)
+    | KW_INTERSECT KW_ALL -> ^(TOK_INTERSECTALL)
+    | KW_INTERSECT KW_DISTINCT -> ^(TOK_INTERSECTDISTINCT)
+    | KW_EXCEPT KW_ALL -> ^(TOK_EXCEPTALL)
+    | KW_EXCEPT KW_DISTINCT -> ^(TOK_EXCEPTDISTINCT)
+    | KW_MINUS KW_ALL -> ^(TOK_EXCEPTALL)
+    | KW_MINUS KW_DISTINCT -> ^(TOK_EXCEPTDISTINCT)
     ;
 
 queryStatementExpression
@@ -2457,7 +2469,7 @@ setOpSelectStatement[CommonTree t]
          )
       )
    -> {$setOpSelectStatement.tree != null && u.tree.getType()!=HiveParser.TOK_UNIONDISTINCT}?
-      ^(TOK_UNIONALL {$setOpSelectStatement.tree} $b)
+      ^($u {$setOpSelectStatement.tree} $b)
    -> {$setOpSelectStatement.tree == null && u.tree.getType()==HiveParser.TOK_UNIONDISTINCT}?
       ^(TOK_QUERY
          ^(TOK_FROM
@@ -2471,9 +2483,13 @@ setOpSelectStatement[CommonTree t]
            ^(TOK_SELECTDI ^(TOK_SELEXPR TOK_ALLCOLREF))
          )
       )
-   -> ^(TOK_UNIONALL {$t} $b)
+   -> ^($u {$t} $b)
    )+
-   -> {$setOpSelectStatement.tree.getChild(0).getType()==HiveParser.TOK_UNIONALL}?
+   -> {$setOpSelectStatement.tree.getChild(0).getType()==HiveParser.TOK_UNIONALL
+   ||$setOpSelectStatement.tree.getChild(0).getType()==HiveParser.TOK_INTERSECTDISTINCT
+   ||$setOpSelectStatement.tree.getChild(0).getType()==HiveParser.TOK_INTERSECTALL
+   ||$setOpSelectStatement.tree.getChild(0).getType()==HiveParser.TOK_EXCEPTDISTINCT
+   ||$setOpSelectStatement.tree.getChild(0).getType()==HiveParser.TOK_EXCEPTALL}?
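As written, the setOperator rule makes the ALL/DISTINCT qualifier mandatory for INTERSECT, EXCEPT and MINUS (only UNION keeps an optional DISTINCT), and MINUS is a pure synonym for EXCEPT: both rewrite to the same TOK_EXCEPT* tokens. Illustrative queries against hypothetical tables t1 and t2:

select key from t1 intersect all select key from t2;
select key from t1 minus distinct select key from t2;
-- the second query parses to the same AST as EXCEPT DISTINCT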
      ^(TOK_QUERY
         ^(TOK_FROM
           ^(TOK_SUBQUERY

http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 50987c3..f79960a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -698,7 +698,7 @@ nonReserved
     | KW_FIRST | KW_FORMAT | KW_FORMATTED | KW_FUNCTIONS | KW_HOLD_DDLTIME | KW_HOUR | KW_IDXPROPERTIES | KW_IGNORE
     | KW_INDEX | KW_INDEXES | KW_INPATH | KW_INPUTDRIVER | KW_INPUTFORMAT | KW_ITEMS | KW_JAR | KW_KEYS | KW_KEY_TYPE
     | KW_LAST | KW_LIMIT | KW_OFFSET | KW_LINES | KW_LOAD | KW_LOCATION | KW_LOCK | KW_LOCKS | KW_LOGICAL | KW_LONG
-    | KW_MAPJOIN | KW_MATERIALIZED | KW_METADATA | KW_MINUS | KW_MINUTE | KW_MONTH | KW_MSCK | KW_NOSCAN | KW_NO_DROP | KW_NULLS | KW_OFFLINE
+    | KW_MAPJOIN | KW_MATERIALIZED | KW_METADATA | KW_MINUTE | KW_MONTH | KW_MSCK | KW_NOSCAN | KW_NO_DROP | KW_NULLS | KW_OFFLINE
     | KW_OPTION | KW_OUTPUTDRIVER | KW_OUTPUTFORMAT | KW_OVERWRITE | KW_OWNER | KW_PARTITIONED | KW_PARTITIONS | KW_PLUS | KW_PRETTY
     | KW_PRINCIPALS | KW_PROTECTION | KW_PURGE | KW_QUARTER | KW_READ | KW_READONLY | KW_REBUILD | KW_RECORDREADER | KW_RECORDWRITER
     | KW_RELOAD | KW_RENAME | KW_REPAIR | KW_REPLACE | KW_REPLICATION | KW_RESTRICT | KW_REWRITE

http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
index cccf0f6..7601509 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
@@ -35,7 +35,7 @@ public class QBExpr {
    *
    */
   public static enum Opcode {
-    NULLOP, UNION, INTERSECT, DIFF
+    NULLOP, UNION, INTERSECT, INTERSECTALL, EXCEPT, EXCEPTALL, DIFF
   };
 
   private Opcode opcode;
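Dropping KW_MINUS from nonReserved makes MINUS a reserved word, which is why the keyword test below grows from 81 to 82 cases and gains testSQL11ReservedKeyWords_MINUS. Pre-existing objects named minus stay reachable through quoted identifiers, e.g. (illustrative):

create table `minus` (col string);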
http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9db8a22..17dfd03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -484,8 +484,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       throws SemanticException {
 
     assert (ast.getToken() != null);
-    switch (ast.getToken().getType()) {
-    case HiveParser.TOK_QUERY: {
+    if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
       QB qb = new QB(id, alias, true);
       qb.setInsideView(insideView);
       Phase1Ctx ctx_1 = initPhase1Ctx();
@@ -494,25 +493,42 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
       qbexpr.setQB(qb);
     }
-      break;
-    case HiveParser.TOK_UNIONALL: {
-      qbexpr.setOpcode(QBExpr.Opcode.UNION);
+    // setop
+    else {
+      switch (ast.getToken().getType()) {
+      case HiveParser.TOK_UNIONALL:
+        qbexpr.setOpcode(QBExpr.Opcode.UNION);
+        break;
+      case HiveParser.TOK_INTERSECTALL:
+        qbexpr.setOpcode(QBExpr.Opcode.INTERSECTALL);
+        break;
+      case HiveParser.TOK_INTERSECTDISTINCT:
+        qbexpr.setOpcode(QBExpr.Opcode.INTERSECT);
+        break;
+      case HiveParser.TOK_EXCEPTALL:
+        qbexpr.setOpcode(QBExpr.Opcode.EXCEPTALL);
+        break;
+      case HiveParser.TOK_EXCEPTDISTINCT:
+        qbexpr.setOpcode(QBExpr.Opcode.EXCEPT);
+        break;
+      default:
+        throw new SemanticException(ErrorMsg.UNSUPPORTED_SET_OPERATOR.getMsg("Type "
+            + ast.getToken().getType()));
+      }
       // query 1
       assert (ast.getChild(0) != null);
       QBExpr qbexpr1 = new QBExpr(alias + SUBQUERY_TAG_1);
-      doPhase1QBExpr((ASTNode) ast.getChild(0), qbexpr1, id + SUBQUERY_TAG_1,
-          alias + SUBQUERY_TAG_1, insideView);
+      doPhase1QBExpr((ASTNode) ast.getChild(0), qbexpr1, id + SUBQUERY_TAG_1, alias
+          + SUBQUERY_TAG_1, insideView);
       qbexpr.setQBExpr1(qbexpr1);
 
       // query 2
       assert (ast.getChild(1) != null);
       QBExpr qbexpr2 = new QBExpr(alias + SUBQUERY_TAG_2);
-      doPhase1QBExpr((ASTNode) ast.getChild(1), qbexpr2, id + SUBQUERY_TAG_2,
-          alias + SUBQUERY_TAG_2, insideView);
+      doPhase1QBExpr((ASTNode) ast.getChild(1), qbexpr2, id + SUBQUERY_TAG_2, alias
+          + SUBQUERY_TAG_2, insideView);
       qbexpr.setQBExpr2(qbexpr2);
     }
-      break;
-    }
   }
 
   private LinkedHashMap<String, ASTNode> doPhase1GetAggregationsFromSelect(
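Since doPhase1QBExpr builds one binary QBExpr per set operator, chained operators associate left to right. The mixed chains in except_all.q below are therefore evaluated as, e.g., ((a except all b) except distinct a) except distinct b for:

select * from a except all select * from b except distinct select * from a except distinct select * from b;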
http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java
index a427803..0dc6b19 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 /**
  * Parser tests for SQL11 Reserved KeyWords. Please find more information in
- * HIVE-6617. Total number : 81
+ * HIVE-6617. Total number : 82
  * ALL,ALTER,ARRAY,AS,AUTHORIZATION,BETWEEN,BIGINT,BINARY
  * ,BOOLEAN,BOTH,BY,CONSTRAINT
  * ,CREATE,CUBE,CURRENT_DATE,CURRENT_TIMESTAMP,CURSOR,
@@ -38,7 +38,7 @@ import org.junit.Test;
  * ,DOUBLE,DROP,EXISTS,EXTERNAL,FALSE,FETCH,FLOAT,FOR
  * ,FOREIGN,FULL,GRANT,GROUP,GROUPING
  * ,IMPORT,IN,INNER,INSERT,INT,INTERSECT,INTO,IS
- * ,LATERAL,LEFT,LIKE,LOCAL,NONE,NULL
+ * ,LATERAL,LEFT,LIKE,LOCAL,MINUS,NONE,NULL
  * ,OF,ORDER,OUT,OUTER,PARTITION,PERCENT,PRECISION
  * ,PRIMARY,PROCEDURE,RANGE,READS,
  * REFERENCES,REGEXP,REVOKE,RIGHT,RLIKE,ROLLUP,ROW
@@ -670,19 +670,32 @@ public class TestSQL11ReservedKeyWordsNegative {
   }
 
   @Test
-  public void testSQL11ReservedKeyWords_NONE() {
+  public void testSQL11ReservedKeyWords_MINUS() {
     try {
-      parse("CREATE TABLE NONE (col STRING)");
+      parse("CREATE TABLE MINUS (col STRING)");
       Assert.assertFalse("Expected ParseException", true);
     } catch (ParseException ex) {
       Assert.assertEquals(
           "Failure didn't match.",
-          "line 1:13 cannot recognize input near 'NONE' '(' 'col' in table name",
+          "line 1:13 cannot recognize input near 'MINUS' '(' 'col' in table name",
           ex.getMessage());
     }
   }
 
   @Test
+  public void testSQL11ReservedKeyWords_NONE() {
+    try {
+      parse("CREATE TABLE NONE (col STRING)");
+      Assert.assertFalse("Expected ParseException", true);
+    } catch (ParseException ex) {
+      Assert.assertEquals(
+          "Failure didn't match.",
+          "line 1:13 cannot recognize input near 'NONE' '(' 'col' in table name",
+          ex.getMessage());
+    }
+  }
+
+  @Test
   public void testSQL11ReservedKeyWords_NULL() {
     try {
       parse("CREATE TABLE NULL (col STRING)");

http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/test/queries/clientpositive/except_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/except_all.q b/ql/src/test/queries/clientpositive/except_all.q
new file mode 100644
index 0000000..3b62459
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/except_all.q
@@ -0,0 +1,58 @@
+set hive.mapred.mode=nonstrict;
+set hive.cbo.enable=true;
+
+create table a(key int);
+
+insert into table a values (0),(1),(2),(2),(2),(2),(3),(NULL),(NULL);
+
+create table b(key bigint);
+
+insert into table b values (1),(2),(2),(3),(5),(5),(NULL),(NULL),(NULL);
+
+select * from a except all select * from b;
+
+drop table a;
+
+drop table b;
+
+create table a(key int, value int);
+
+insert into table a values (1,2),(1,2),(1,3),(2,3),(2,2);
+
+create table b(key int, value int);
+
+insert into table b values (1,2),(2,3),(2,2),(2,2),(2,20);
+
+select * from a except all select * from b;
+
+select * from b except all select * from a;
+
+select * from b except all select * from a intersect distinct select * from b;
+
+select * from b except all select * from a except distinct select * from b;
+
+select * from a except all select * from b union all select * from a except distinct select * from b;
+
+select * from a except all select * from b union select * from a except distinct select * from b;
+
+select * from a except all select * from b except distinct select * from a except distinct select * from b;
+
+select * from (select a.key, b.value from a join b on a.key=b.key)sub1
+except all
+select * from (select a.key, b.value from a join b on a.key=b.key)sub2;
+
+select * from (select a.key, b.value from a join b on a.key=b.key)sub1
+except all
+select * from (select b.value as key, a.key as value from a join b on a.key=b.key)sub2;
+
+explain select * from src except all select * from src;
+
+select * from src except all select * from src;
+
+explain select * from src except all select * from src except distinct select * from src except distinct select * from src;
+
+select * from src except all select * from src except distinct select * from src except distinct select * from src;
+
+explain select value from a group by value except distinct select key from b group by key;
+
+select value from a group by value except distinct select key from b group by key;
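As a sanity check of EXCEPT ALL's bag semantics on the first pair of tables above: each value is kept max(m - n, 0) times, where m and n are its multiplicities in a and b. With a = {0, 1, 2, 2, 2, 2, 3, NULL, NULL} and b = {1, 2, 2, 3, 5, 5, NULL, NULL, NULL}, and NULLs treated as equal for set operations as in the SQL standard, the first `select * from a except all select * from b` should return 0 once and 2 twice (4 - 2), with 1, 3 and NULL cancelling out.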
http://git-wip-us.apache.org/repos/asf/hive/blob/0049a21f/ql/src/test/queries/clientpositive/except_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/except_distinct.q b/ql/src/test/queries/clientpositive/except_distinct.q
new file mode 100644
index 0000000..9991447
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/except_distinct.q
@@ -0,0 +1,58 @@
+set hive.mapred.mode=nonstrict;
+set hive.cbo.enable=true;
+
+create table a(key int);
+
+insert into table a values (0),(1),(2),(2),(2),(2),(3),(NULL),(NULL);
+
+create table b(key bigint);
+
+insert into table b values (1),(2),(2),(3),(5),(5),(NULL),(NULL),(NULL);
+
+select * from a except distinct select * from b;
+
+drop table a;
+
+drop table b;
+
+create table a(key int, value int);
+
+insert into table a values (1,2),(1,2),(1,3),(2,3),(2,2);
+
+create table b(key int, value int);
+
+insert into table b values (1,2),(2,3),(2,2),(2,2),(2,20);
+
+select * from a except distinct select * from b;
+
+select * from b except distinct select * from a;
+
+select * from b except distinct select * from a intersect distinct select * from b;
+
+select * from b except distinct select * from a except distinct select * from b;
+
+select * from a except distinct select * from b union all select * from a except distinct select * from b;
+
+select * from a except distinct select * from b union select * from a except distinct select * from b;
+
+select * from a except distinct select * from b except distinct select * from a except distinct select * from b;
+
+select * from (select a.key, b.value from a join b on a.key=b.key)sub1
+except distinct
+select * from (select a.key, b.value from a join b on a.key=b.key)sub2;
+
+select * from (select a.key, b.value from a join b on a.key=b.key)sub1
+except distinct
+select * from (select b.value as key, a.key as value from a join b on a.key=b.key)sub2;
+
+explain select * from src except distinct select * from src;
+
+select * from src except distinct select * from src;
+
+explain select * from src except distinct select * from src except distinct select * from src except distinct select * from src;
+
+select * from src except distinct select * from src except distinct select * from src except distinct select * from src;
+
+explain select value from a group by value except distinct select key from b group by key;
+
+select value from a group by value except distinct select key from b group by key;
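EXCEPT DISTINCT, by contrast, deduplicates before taking the difference: a row survives once iff it occurs at least once in a and never in b, so the first a/b pair above yields only 0. One way to express the same semantics by hand, sketched with Hive's null-safe equality and a match flag (illustrative only, not how HiveExceptRewriteRule is implemented):

select distinct a.key
from a left outer join (select key, true as matched from b) bb
on a.key <=> bb.key
where bb.matched is null;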
