HIVE-12186 : Upgrade Hive to Calcite 1.5 (Jesus Camacho Rodriguez via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/da4b1b07 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/da4b1b07 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/da4b1b07 Branch: refs/heads/spark Commit: da4b1b07764718210377814edf06d5960d074f79 Parents: 1d5da09 Author: Jesus Camacho Rodriguez <[email protected]> Authored: Wed Nov 11 11:31:22 2015 -0800 Committer: Ashutosh Chauhan <[email protected]> Committed: Wed Nov 11 11:31:22 2015 -0800 ---------------------------------------------------------------------- pom.xml | 2 +- .../calcite/reloperators/HiveTableScan.java | 6 +- .../calcite/rules/HiveRelFieldTrimmer.java | 143 ++----------------- .../calcite/translator/ASTConverter.java | 34 ++--- .../calcite/translator/HiveOpConverter.java | 7 +- .../translator/PlanModifierForASTConv.java | 3 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 8 +- .../bucketizedhiveinputformat.q.out | 2 + .../spark/bucketizedhiveinputformat.q.out | 2 + 9 files changed, 38 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 282d077..99aeff7 100644 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ <antlr.version>3.4</antlr.version> <avro.version>1.7.7</avro.version> <bonecp.version>0.8.0.RELEASE</bonecp.version> - <calcite.version>1.4.0-incubating</calcite.version> + <calcite.version>1.5.0</calcite.version> <datanucleus-api-jdo.version>3.2.6</datanucleus-api-jdo.version> <datanucleus-core.version>3.2.10</datanucleus-core.version> <datanucleus-rdbms.version>3.2.9</datanucleus-rdbms.version> http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java index 1831d69..446dc73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java @@ -29,13 +29,13 @@ import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.ImmutableBitSet; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; @@ -152,7 +152,7 @@ public class HiveTableScan extends TableScan implements HiveRelNode { @Override public RelNode project(ImmutableBitSet fieldsUsed, Set<RelDataTypeField> extraFields, - RelFactories.ProjectFactory projectFactory) { + RelBuilder relBuilder) { // 1. If the schema is the same then bail out final int fieldCount = getRowType().getFieldCount(); @@ -183,7 +183,7 @@ public class HiveTableScan extends TableScan implements HiveRelNode { fieldNames)); // 5. Add Proj on top of TS - return projectFactory.createProject(newHT, exprList, new ArrayList<String>(fieldNames)); + return relBuilder.push(newHT).project(exprList, new ArrayList<String>(fieldNames)).build(); } public List<Integer> getNeededColIndxsFrmReloptHT() { http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java index a12fa2a..b543fbb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java @@ -23,10 +23,9 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -35,22 +34,23 @@ import org.apache.calcite.rex.RexPermuteInputsShuttle; import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql2rel.RelFieldTrimmer; +import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.IntPair; import org.apache.calcite.util.mapping.Mapping; import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveMultiJoin; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - public class HiveRelFieldTrimmer extends RelFieldTrimmer { - private final RelFactories.AggregateFactory aggregateFactory; + protected static final Log LOG = LogFactory.getLog(HiveRelFieldTrimmer.class); + public HiveRelFieldTrimmer(SqlValidator validator, + RelOptCluster cluster, RelFactories.ProjectFactory projectFactory, RelFactories.FilterFactory filterFactory, RelFactories.JoinFactory joinFactory, @@ -58,9 +58,10 @@ public class HiveRelFieldTrimmer extends RelFieldTrimmer { RelFactories.SortFactory sortFactory, RelFactories.AggregateFactory aggregateFactory, RelFactories.SetOpFactory setOpFactory) { - super(validator, projectFactory, filterFactory, joinFactory, - semiJoinFactory, sortFactory, aggregateFactory, setOpFactory); - this.aggregateFactory = aggregateFactory; + super(validator, + RelBuilder.proto(projectFactory, filterFactory, joinFactory, + semiJoinFactory, sortFactory, aggregateFactory, setOpFactory) + .create(cluster, null)); } /** @@ -155,127 +156,5 @@ public class HiveRelFieldTrimmer extends RelFieldTrimmer { return new TrimResult(newJoin, mapping); } - /** - * Variant of {@link #trimFields(RelNode, ImmutableBitSet, Set)} for - * {@link org.apache.calcite.rel.logical.LogicalAggregate}. - */ - @Override - public TrimResult trimFields( - Aggregate aggregate, - ImmutableBitSet fieldsUsed, - Set<RelDataTypeField> extraFields) { - // Fields: - // - // | sys fields | group fields | indicator fields | agg functions | - // - // Two kinds of trimming: - // - // 1. If agg rel has system fields but none of these are used, create an - // agg rel with no system fields. - // - // 2. If aggregate functions are not used, remove them. - // - // But group and indicator fields stay, even if they are not used. - - final RelDataType rowType = aggregate.getRowType(); - - // Compute which input fields are used. - // 1. group fields are always used - final ImmutableBitSet.Builder inputFieldsUsed = - ImmutableBitSet.builder(aggregate.getGroupSet()); - // 2. agg functions - for (AggregateCall aggCall : aggregate.getAggCallList()) { - for (int i : aggCall.getArgList()) { - inputFieldsUsed.set(i); - } - if (aggCall.filterArg >= 0) { - inputFieldsUsed.set(aggCall.filterArg); - } - } - - // Create input with trimmed columns. - final RelNode input = aggregate.getInput(); - final Set<RelDataTypeField> inputExtraFields = Collections.emptySet(); - final TrimResult trimResult = - trimChild(aggregate, input, inputFieldsUsed.build(), inputExtraFields); - final RelNode newInput = trimResult.left; - final Mapping inputMapping = trimResult.right; - - // We have to return group keys and (if present) indicators. - // So, pretend that the consumer asked for them. - final int groupCount = aggregate.getGroupSet().cardinality(); - final int indicatorCount = aggregate.getIndicatorCount(); - fieldsUsed = - fieldsUsed.union(ImmutableBitSet.range(groupCount + indicatorCount)); - - // If the input is unchanged, and we need to project all columns, - // there's nothing to do. - if (input == newInput - && fieldsUsed.equals(ImmutableBitSet.range(rowType.getFieldCount()))) { - return new TrimResult( - aggregate, - Mappings.createIdentity(rowType.getFieldCount())); - } - - // Which agg calls are used by our consumer? - int j = groupCount + indicatorCount; - int usedAggCallCount = 0; - for (int i = 0; i < aggregate.getAggCallList().size(); i++) { - if (fieldsUsed.get(j++)) { - ++usedAggCallCount; - } - } - - // Offset due to the number of system fields having changed. - Mapping mapping = - Mappings.create( - MappingType.INVERSE_SURJECTION, - rowType.getFieldCount(), - groupCount + indicatorCount + usedAggCallCount); - - final ImmutableBitSet newGroupSet = - Mappings.apply(inputMapping, aggregate.getGroupSet()); - - final ImmutableList<ImmutableBitSet> newGroupSets = - ImmutableList.copyOf( - Iterables.transform(aggregate.getGroupSets(), - new Function<ImmutableBitSet, ImmutableBitSet>() { - @Override - public ImmutableBitSet apply(ImmutableBitSet input) { - return Mappings.apply(inputMapping, input); - } - })); - - // Populate mapping of where to find the fields. System, group key and - // indicator fields first. - for (j = 0; j < groupCount + indicatorCount; j++) { - mapping.set(j, j); - } - - // Now create new agg calls, and populate mapping for them. - final List<AggregateCall> newAggCallList = new ArrayList<>(); - j = groupCount + indicatorCount; - for (AggregateCall aggCall : aggregate.getAggCallList()) { - if (fieldsUsed.get(j)) { - AggregateCall newAggCall = - aggCall.copy(Mappings.apply2(inputMapping, aggCall.getArgList()), - Mappings.apply(inputMapping, aggCall.filterArg)); - if (newAggCall.equals(aggCall)) { - newAggCall = aggCall; // immutable -> canonize to save space - } - mapping.set(j, groupCount + indicatorCount + newAggCallList.size()); - newAggCallList.add(newAggCall); - } - ++j; - } - - RelNode newAggregate = aggregateFactory.createAggregate(newInput, - aggregate.indicator, newGroupSet, newGroupSets, newAggCallList); - - assert newAggregate.getClass() == aggregate.getClass(); - - return new TrimResult(newAggregate, mapping); - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java index e4ac154..d026e58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java @@ -77,8 +77,7 @@ public class ASTConverter { private Aggregate groupBy; private Filter having; private Project select; - private Sort order; - private Sort limit; + private Sort orderLimit; private Schema schema; @@ -203,27 +202,14 @@ public class ASTConverter { * parent hence we need to go top down; but OB at each block really belong * to its src/from. Hence the need to pass in sort for each block from * its parent. + * 8. Limit */ - convertOBToASTNode((HiveSortLimit) order); - - // 8. Limit - convertLimitToASTNode((HiveSortLimit) limit); + convertOrderLimitToASTNode((HiveSortLimit) orderLimit); return hiveAST.getAST(); } - private void convertLimitToASTNode(HiveSortLimit limit) { - if (limit != null) { - HiveSortLimit hiveLimit = limit; - RexNode limitExpr = hiveLimit.getFetchExpr(); - if (limitExpr != null) { - Object val = ((RexLiteral) limitExpr).getValue2(); - hiveAST.limit = ASTBuilder.limit(val); - } - } - } - - private void convertOBToASTNode(HiveSortLimit order) { + private void convertOrderLimitToASTNode(HiveSortLimit order) { if (order != null) { HiveSortLimit hiveSortLimit = order; if (!hiveSortLimit.getCollation().getFieldCollations().isEmpty()) { @@ -264,6 +250,12 @@ public class ASTConverter { } hiveAST.order = orderAst; } + + RexNode limitExpr = hiveSortLimit.getFetchExpr(); + if (limitExpr != null) { + Object val = ((RexLiteral) limitExpr).getValue2(); + hiveAST.limit = ASTBuilder.limit(val); + } } } @@ -366,11 +358,7 @@ public class ASTConverter { if (ASTConverter.this.select != null) { ASTConverter.this.from = node; } else { - Sort hiveSortRel = (Sort) node; - if (hiveSortRel.getCollation().getFieldCollations().isEmpty()) - ASTConverter.this.limit = hiveSortRel; - else - ASTConverter.this.order = hiveSortRel; + ASTConverter.this.orderLimit = (Sort) node; } } /* http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index f0f8aa8..130ee89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -487,7 +487,7 @@ public class HiveOpConverter { if (this.semanticAnalyzer != null && semanticAnalyzer.getQB() != null && semanticAnalyzer.getQB().getParseInfo() != null) this.semanticAnalyzer.getQB().getParseInfo().setOuterQueryLimit(limit); - ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp); + ArrayList<ColumnInfo> cinfoLst = createColInfos(resultOp); resultOp = OperatorFactory.getAndMakeChild(limitDesc, new RowSchema(cinfoLst), resultOp); @@ -1059,10 +1059,6 @@ public class HiveOpConverter { } private static JoinType extractJoinType(HiveJoin join) { - // UNIQUE - if (join.isDistinct()) { - return JoinType.UNIQUE; - } // SEMIJOIN if (join.isLeftSemiJoin()) { return JoinType.LEFTSEMI; @@ -1080,6 +1076,7 @@ public class HiveOpConverter { resultJoinType = JoinType.RIGHTOUTER; break; default: + // TODO: UNIQUE JOIN resultJoinType = JoinType.INNER; break; } http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java index e820496..29e08f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java @@ -312,7 +312,8 @@ public class PlanModifierForASTConv { boolean validChild = true; RelNode child = sortNode.getInput(); - if (!(HiveCalciteUtil.limitRelNode(sortNode) && HiveCalciteUtil.orderRelNode(child)) + if (!(HiveCalciteUtil.limitRelNode(sortNode) && HiveCalciteUtil.orderRelNode(child) + && HiveCalciteUtil.limitRelNode(child)) && !(child instanceof Project)) { validChild = false; } http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index e13356c..de67b54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -58,8 +58,6 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.metadata.CachingRelMetadataProvider; import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataProvider; @@ -902,7 +900,8 @@ public class CalcitePlanner extends SemanticAnalyzer { HiveJoinToMultiJoinRule.INSTANCE, HiveProjectMergeRule.INSTANCE); // The previous rules can pull up projections through join operators, // thus we run the field trimmer again to push them back down - HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, HiveProject.DEFAULT_PROJECT_FACTORY, + HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, + cluster, HiveProject.DEFAULT_PROJECT_FACTORY, HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY, HiveSemiJoin.HIVE_SEMIJOIN_FACTORY, HiveSortLimit.HIVE_SORT_REL_FACTORY, HiveAggregate.HIVE_AGGR_REL_FACTORY, HiveUnion.UNION_REL_FACTORY); @@ -983,7 +982,8 @@ public class CalcitePlanner extends SemanticAnalyzer { new HivePartitionPruneRule(conf)); // 5. Projection Pruning - HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, HiveProject.DEFAULT_PROJECT_FACTORY, + HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, + cluster, HiveProject.DEFAULT_PROJECT_FACTORY, HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY, HiveSemiJoin.HIVE_SEMIJOIN_FACTORY, HiveSortLimit.HIVE_SORT_REL_FACTORY, HiveAggregate.HIVE_AGGR_REL_FACTORY, HiveUnion.UNION_REL_FACTORY); http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out b/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out index 277b0f7..cfb95be 100644 --- a/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out +++ b/ql/src/test/results/clientpositive/bucketizedhiveinputformat.q.out @@ -22,6 +22,8 @@ POSTHOOK: query: CREATE TABLE T2(name STRING) STORED AS SEQUENCEFILE POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@T2 +Warning: Shuffle Join JOIN[13][tables = [$hdt$_0, $hdt$_1, $hdt$_2]] in Stage 'Stage-2:MAPRED' is a cross product +Warning: Shuffle Join JOIN[10][tables = [$hdt$_0, $hdt$_1]] in Stage 'Stage-1:MAPRED' is a cross product PREHOOK: query: INSERT OVERWRITE TABLE T2 SELECT * FROM ( SELECT tmp1.name as name FROM ( SELECT name, 'MMM' AS n FROM T1) tmp1 http://git-wip-us.apache.org/repos/asf/hive/blob/da4b1b07/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out b/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out index 98c5802..c8fc4d3 100644 --- a/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out +++ b/ql/src/test/results/clientpositive/spark/bucketizedhiveinputformat.q.out @@ -22,6 +22,8 @@ POSTHOOK: query: CREATE TABLE T2(name STRING) STORED AS SEQUENCEFILE POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@T2 +Warning: Shuffle Join JOIN[10][tables = [$hdt$_0, $hdt$_1]] in Work 'Reducer 2' is a cross product +Warning: Shuffle Join JOIN[13][tables = [$hdt$_0, $hdt$_1, $hdt$_2]] in Work 'Reducer 3' is a cross product PREHOOK: query: INSERT OVERWRITE TABLE T2 SELECT * FROM ( SELECT tmp1.name as name FROM ( SELECT name, 'MMM' AS n FROM T1) tmp1
