Revert "HIVE-16330 : Improve plans for scalar subquery with aggregates (Vineet Garg via Ashutosh Chauhan)"
This reverts commit a113ede99e82a11384a144616938334696032dd8. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cde41e9e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cde41e9e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cde41e9e Branch: refs/heads/master Commit: cde41e9ea522b11bcef6f427d564cc3061a8d636 Parents: a113ede Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Tue May 9 23:08:27 2017 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Tue May 9 23:08:27 2017 -0700 ---------------------------------------------------------------------- .../optimizer/calcite/HivePlannerContext.java | 12 +- .../calcite/rules/HiveSubQueryRemoveRule.java | 59 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 25 +- .../apache/hadoop/hive/ql/parse/QBSubQuery.java | 62 +- .../calcite/TestCBORuleFiredOnlyOnce.java | 3 +- .../subquery_scalar_corr_multi_rows.q | 3 +- .../subquery_scalar_corr_multi_rows.q.out | 3 +- .../clientpositive/llap/subquery_scalar.q.out | 2421 ++++++++++++--- .../clientpositive/llap/subquery_select.q.out | 2810 ++++++++++++++---- .../results/clientpositive/perf/query1.q.out | 240 +- .../results/clientpositive/perf/query30.q.out | 335 ++- .../results/clientpositive/perf/query6.q.out | 349 ++- .../results/clientpositive/perf/query81.q.out | 335 ++- .../results/clientpositive/perf/query9.q.out | 1019 +++++-- 14 files changed, 5805 insertions(+), 1871 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java index bdf9955..d0b1757 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java @@ -22,7 +22,6 @@ import org.apache.calcite.plan.Context; import org.apache.calcite.rel.RelNode; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry; - import java.util.Set; @@ -30,18 +29,17 @@ public class HivePlannerContext implements Context { private HiveAlgorithmsConf algoConfig; private HiveRulesRegistry registry; private CalciteConnectionConfig calciteConfig; - private SubqueryConf subqueryConfig; + private Set<RelNode> corrScalarRexSQWithAgg; public HivePlannerContext(HiveAlgorithmsConf algoConfig, HiveRulesRegistry registry, - CalciteConnectionConfig calciteConfig, Set<RelNode> corrScalarRexSQWithAgg, - Set<RelNode> scalarAggNoGbyWindowing) { + CalciteConnectionConfig calciteConfig, Set<RelNode> corrScalarRexSQWithAgg) { this.algoConfig = algoConfig; this.registry = registry; this.calciteConfig = calciteConfig; // this is to keep track if a subquery is correlated and contains aggregate // this is computed in CalcitePlanner while planning and is later required by subuery remove rule // hence this is passed using HivePlannerContext - this.subqueryConfig = new SubqueryConf(corrScalarRexSQWithAgg, scalarAggNoGbyWindowing); + this.corrScalarRexSQWithAgg = corrScalarRexSQWithAgg; } public <T> T unwrap(Class<T> clazz) { @@ -54,8 +52,8 @@ public class HivePlannerContext implements Context { if (clazz.isInstance(calciteConfig)) { return clazz.cast(calciteConfig); } - if(clazz.isInstance(subqueryConfig)) { - return clazz.cast(subqueryConfig); + if(clazz.isInstance(corrScalarRexSQWithAgg)) { + return clazz.cast(corrScalarRexSQWithAgg); } return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java index 83d3f74..c692cc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java @@ -56,7 +56,6 @@ import java.util.Set; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveSubQRemoveRelBuilder; -import org.apache.hadoop.hive.ql.optimizer.calcite.SubqueryConf; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; /** @@ -100,12 +99,11 @@ public abstract class HiveSubQueryRemoveRule extends RelOptRule{ final int fieldCount = builder.peek().getRowType().getFieldCount(); assert(filter instanceof HiveFilter); - SubqueryConf subqueryConfig = filter.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); - boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); - boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); + Set<RelNode> corrScalarQueries = filter.getCluster().getPlanner().getContext().unwrap(Set.class); + boolean isCorrScalarQuery = corrScalarQueries.contains(e.rel); final RexNode target = apply(e, HiveFilter.getVariablesSet(e), logic, - builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); + builder, 1, fieldCount, isCorrScalarQuery); final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); builder.filter(shuttle.apply(filter.getCondition())); builder.project(fields(builder, filter.getRowType().getFieldCount())); @@ -124,12 +122,11 @@ public abstract class HiveSubQueryRemoveRule extends RelOptRule{ builder.push(project.getInput()); final int fieldCount = builder.peek().getRowType().getFieldCount(); - SubqueryConf subqueryConfig = project.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); - boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); - boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); + Set<RelNode> corrScalarQueries = project.getCluster().getPlanner().getContext().unwrap(Set.class); + boolean isCorrScalarQuery = corrScalarQueries.contains(e.rel); final RexNode target = apply(e, HiveFilter.getVariablesSet(e), - logic, builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); + logic, builder, 1, fieldCount, isCorrScalarQuery); final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); builder.project(shuttle.apply(project.getProjects()), project.getRowType().getFieldNames()); @@ -168,32 +165,28 @@ public abstract class HiveSubQueryRemoveRule extends RelOptRule{ protected RexNode apply(RexSubQuery e, Set<CorrelationId> variablesSet, RelOptUtil.Logic logic, HiveSubQRemoveRelBuilder builder, int inputCount, int offset, - boolean isCorrScalarAgg, - boolean hasNoWindowingAndNoGby ) { + boolean isCorrScalarAgg) { switch (e.getKind()) { case SCALAR_QUERY: - // if scalar query has aggregate and no windowing and no gby avoid adding sq_count_check - // since it is guaranteed to produce at most one row - if(!hasNoWindowingAndNoGby) { - builder.push(e.rel); - // returns single row/column - builder.aggregate(builder.groupKey(), builder.count(false, "cnt")); - - SqlFunction countCheck = - new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, - InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); - - // we create FILTER (sq_count_check(count()) <= 1) instead of PROJECT because RelFieldTrimmer - // ends up getting rid of Project since it is not used further up the tree - builder.filter(builder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, - builder.call(countCheck, builder.field("cnt")), builder.literal(1))); - if (!variablesSet.isEmpty()) { - builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); - } else - builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); - - offset++; + builder.push(e.rel); + // returns single row/column + builder.aggregate(builder.groupKey(), + builder.count(false, "cnt")); + + SqlFunction countCheck = new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, + InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); + + // we create FILTER (sq_count_check(count()) <= 1) instead of PROJECT because RelFieldTrimmer + // ends up getting rid of Project since it is not used further up the tree + builder.filter(builder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, + builder.call(countCheck, builder.field("cnt")), + builder.literal(1))); + if( !variablesSet.isEmpty()) + { + builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); } + else + builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); if(isCorrScalarAgg) { // Transformation : // Outer Query Left Join (inner query) on correlated predicate and preserve rows only from left side. @@ -225,8 +218,10 @@ public abstract class HiveSubQueryRemoveRule extends RelOptRule{ //Transformation is to left join for correlated predicates and inner join otherwise, // but do a count on inner side before that to make sure it generates atmost 1 row. + builder.push(e.rel); builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); + offset++; return field(builder, inputCount, offset); case IN: http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index fa96e94..5d640be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -1299,7 +1299,6 @@ public class CalcitePlanner extends SemanticAnalyzer { // this is to keep track if a subquery is correlated and contains aggregate // since this is special cased when it is rewritten in SubqueryRemoveRule Set<RelNode> corrScalarRexSQWithAgg = new HashSet<RelNode>(); - Set<RelNode> scalarAggNoGbyNoWin = new HashSet<RelNode>(); // TODO: Do we need to keep track of RR, ColNameToPosMap for every op or // just last one. @@ -1333,7 +1332,7 @@ public class CalcitePlanner extends SemanticAnalyzer { Boolean.FALSE.toString()); CalciteConnectionConfig calciteConfig = new CalciteConnectionConfigImpl(calciteConfigProperties); HivePlannerContext confContext = new HivePlannerContext(algorithmsConf, registry, calciteConfig, - corrScalarRexSQWithAgg, scalarAggNoGbyNoWin); + corrScalarRexSQWithAgg); RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext); final RexBuilder rexBuilder = cluster.getRexBuilder(); final RelOptCluster optCluster = RelOptCluster.create(planner, rexBuilder); @@ -2426,8 +2425,8 @@ public class CalcitePlanner extends SemanticAnalyzer { } private void subqueryRestrictionCheck(QB qb, ASTNode searchCond, RelNode srcRel, - boolean forHavingClause, Set<ASTNode> corrScalarQueries, - Set<ASTNode> scalarQueriesWithAggNoWinNoGby) throws SemanticException { + boolean forHavingClause, + Set<ASTNode> corrScalarQueries) throws SemanticException { List<ASTNode> subQueriesInOriginalTree = SubQueryUtils.findSubQueries(searchCond); ASTNode clonedSearchCond = (ASTNode) SubQueryUtils.adaptor.dupTree(searchCond); @@ -2462,25 +2461,18 @@ public class CalcitePlanner extends SemanticAnalyzer { String havingInputAlias = null; - boolean [] subqueryConfig = {false, false}; - subQuery.subqueryRestrictionsCheck(inputRR, forHavingClause, - havingInputAlias, subqueryConfig); - if(subqueryConfig[0]) { + boolean isCorrScalarWithAgg = subQuery.subqueryRestrictionsCheck(inputRR, forHavingClause, havingInputAlias); + if(isCorrScalarWithAgg) { corrScalarQueries.add(originalSubQueryAST); } - if(subqueryConfig[1]) { - scalarQueriesWithAggNoWinNoGby.add(originalSubQueryAST); - } } } private boolean genSubQueryRelNode(QB qb, ASTNode node, RelNode srcRel, boolean forHavingClause, Map<ASTNode, RelNode> subQueryToRelNode) throws SemanticException { Set<ASTNode> corrScalarQueriesWithAgg = new HashSet<ASTNode>(); - Set<ASTNode> scalarQueriesWithAggNoWinNoGby= new HashSet<ASTNode>(); //disallow subqueries which HIVE doesn't currently support - subqueryRestrictionCheck(qb, node, srcRel, forHavingClause, corrScalarQueriesWithAgg, - scalarQueriesWithAggNoWinNoGby); + subqueryRestrictionCheck(qb, node, srcRel, forHavingClause, corrScalarQueriesWithAgg); Deque<ASTNode> stack = new ArrayDeque<ASTNode>(); stack.push(node); @@ -2510,14 +2502,9 @@ public class CalcitePlanner extends SemanticAnalyzer { subQueryToRelNode.put(next, subQueryRelNode); //keep track of subqueries which are scalar, correlated and contains aggregate // subquery expression. This will later be special cased in Subquery remove rule - // for correlated scalar queries with aggregate we have take care of the case where - // inner aggregate happens on empty result if(corrScalarQueriesWithAgg.contains(next)) { corrScalarRexSQWithAgg.add(subQueryRelNode); } - if(scalarQueriesWithAggNoWinNoGby.contains(next)) { - scalarAggNoGbyNoWin.add(subQueryRelNode); - } isSubQuery = true; break; default: http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java index 0097a04..ec52741 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java @@ -526,9 +526,9 @@ public class QBSubQuery implements ISubQueryJoinInfo { * @return true if it is correlated scalar subquery with an aggregate * @throws SemanticException */ - void subqueryRestrictionsCheck(RowResolver parentQueryRR, + boolean subqueryRestrictionsCheck(RowResolver parentQueryRR, boolean forHavingClause, - String outerQueryAlias, boolean [] subqueryConfig) + String outerQueryAlias) throws SemanticException { ASTNode insertClause = getChildFromSubqueryAST("Insert", HiveParser.TOK_INSERT); @@ -568,35 +568,37 @@ public class QBSubQuery implements ISubQueryJoinInfo { hasCount = hasCount | ( r == 2 ); } - // figure out correlation and presence of non-equi join predicate - boolean hasCorrelation = false; - boolean hasNonEquiJoinPred = false; + ASTNode whereClause = SubQueryUtils.subQueryWhere(insertClause); - if ( whereClause != null ) { - ASTNode searchCond = (ASTNode) whereClause.getChild(0); - List<ASTNode> conjuncts = new ArrayList<ASTNode>(); - SubQueryUtils.extractConjuncts(searchCond, conjuncts); - - ConjunctAnalyzer conjunctAnalyzer = - new ConjunctAnalyzer(parentQueryRR, forHavingClause, outerQueryAlias); - - for (ASTNode conjunctAST : conjuncts) { - Conjunct conjunct = conjunctAnalyzer.analyzeConjunct(conjunctAST); - if (conjunct.isCorrelated()) { - hasCorrelation = true; - } - if (conjunct.eitherSideRefersBoth() && conjunctAST.getType() != HiveParser.EQUAL) { - hasNonEquiJoinPred = true; - } - } + + if ( whereClause == null ) { + return false; } + ASTNode searchCond = (ASTNode) whereClause.getChild(0); + List<ASTNode> conjuncts = new ArrayList<ASTNode>(); + SubQueryUtils.extractConjuncts(searchCond, conjuncts); - // figure out if there is group by + ConjunctAnalyzer conjunctAnalyzer = new ConjunctAnalyzer(parentQueryRR, + forHavingClause, outerQueryAlias); + + boolean hasCorrelation = false; + boolean hasNonEquiJoinPred = false; + for(ASTNode conjunctAST : conjuncts) { + Conjunct conjunct = conjunctAnalyzer.analyzeConjunct(conjunctAST); + if(conjunct.isCorrelated()){ + hasCorrelation = true; + } + if ( conjunct.eitherSideRefersBoth() && conjunctAST.getType() != HiveParser.EQUAL) { + hasNonEquiJoinPred = true; + } + } boolean noImplicityGby = true; - if ( insertClause.getChildCount() > 3 && - insertClause.getChild(3).getType() == HiveParser.TOK_GROUPBY ) { + if ( insertClause.getChild(1).getChildCount() > 3 && + insertClause.getChild(1).getChild(3).getType() == HiveParser.TOK_GROUPBY ) { + if((ASTNode) insertClause.getChild(1).getChild(3) != null){ noImplicityGby = false; + } } /* @@ -641,24 +643,22 @@ public class QBSubQuery implements ISubQueryJoinInfo { subQueryAST, "Scalar subqueries with aggregate cannot have non-equi join predicate")); } - if(!hasWindowing) { - subqueryConfig[1] = true; - } if(hasCorrelation) { - subqueryConfig[0] = true; + return true; } } else if(operator.getType() == SubQueryType.IN) { if(hasCount && hasCorrelation) { - subqueryConfig[0] = true; + return true; } } else if (operator.getType() == SubQueryType.NOT_IN) { if(hasCorrelation) { - subqueryConfig[0] = true; + return true; } } } + return false; } void validateAndRewriteAST(RowResolver outerQueryRR, http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java index 884e034..4823950 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java @@ -61,8 +61,7 @@ public class TestCBORuleFiredOnlyOnce { // Create rules registry to not trigger a rule more than once HiveRulesRegistry registry = new HiveRulesRegistry(); - HivePlannerContext context = new HivePlannerContext(null, registry, null, - null, null); + HivePlannerContext context = new HivePlannerContext(null, registry, null, null); HepPlanner planner = new HepPlanner(programBuilder.build(), context); // Cluster http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/test/queries/clientnegative/subquery_scalar_corr_multi_rows.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/subquery_scalar_corr_multi_rows.q b/ql/src/test/queries/clientnegative/subquery_scalar_corr_multi_rows.q index e71a60d..e9ea703 100644 --- a/ql/src/test/queries/clientnegative/subquery_scalar_corr_multi_rows.q +++ b/ql/src/test/queries/clientnegative/subquery_scalar_corr_multi_rows.q @@ -1,3 +1,2 @@ -- inner query produces more than one row -select * from part where p_size > - (select count(*) from part p where p.p_mfgr = part.p_mfgr group by p_type); \ No newline at end of file +select * from part where p_size > (select count(*) from part p where p.p_mfgr = part.p_mfgr group by p_type); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cde41e9e/ql/src/test/results/clientnegative/subquery_scalar_corr_multi_rows.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/subquery_scalar_corr_multi_rows.q.out b/ql/src/test/results/clientnegative/subquery_scalar_corr_multi_rows.q.out index 8377085..3235048 100644 --- a/ql/src/test/results/clientnegative/subquery_scalar_corr_multi_rows.q.out +++ b/ql/src/test/results/clientnegative/subquery_scalar_corr_multi_rows.q.out @@ -1,5 +1,4 @@ -PREHOOK: query: select * from part where p_size > - (select count(*) from part p where p.p_mfgr = part.p_mfgr group by p_type) +PREHOOK: query: select * from part where p_size > (select count(*) from part p where p.p_mfgr = part.p_mfgr group by p_type) PREHOOK: type: QUERY PREHOOK: Input: default@part #### A masked pattern was here ####