jcamachor commented on a change in pull request #1553: URL: https://github.com/apache/hive/pull/1553#discussion_r502816547
########## File path: Jenkinsfile ########## @@ -174,6 +174,17 @@ def loadWS() { tar -xf archive.tar''' } +def saveFile(name) { Review comment: It seems this is unrelated to this patch? It may be better to split into multiple JIRAs/PRs. ########## File path: pom.xml ########## @@ -104,7 +104,7 @@ <maven.checkstyle.plugin.version>2.17</maven.checkstyle.plugin.version> <maven.build-helper.plugin.version>1.12</maven.build-helper.plugin.version> <maven.eclipse.plugin.version>2.10</maven.eclipse.plugin.version> - <maven.surefire.plugin.version>3.0.0-M4</maven.surefire.plugin.version> + <maven.surefire.plugin.version>3.0.0-M5</maven.surefire.plugin.version> Review comment: Same as above, not sure if it would belong exactly to the same PR, but it would be better to have different JIRAs/PRs for these different issues. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -386,125 +456,81 @@ public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCach LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); } else { - ExprNodeDesc newRetainableTsFilterExpr = null; - List<ExprNodeDesc> semijoinExprNodes = new ArrayList<>(); - if (retainableTsOp.getConf().getFilterExpr() != null) { - // Gather SJ expressions and normal expressions - List<ExprNodeDesc> allExprNodesExceptSemijoin = new ArrayList<>(); - splitExpressions(retainableTsOp.getConf().getFilterExpr(), - allExprNodesExceptSemijoin, semijoinExprNodes); - // Create new expressions - if (allExprNodesExceptSemijoin.size() > 1) { - newRetainableTsFilterExpr = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPAnd(), allExprNodesExceptSemijoin); - } else if (allExprNodesExceptSemijoin.size() > 0 && - allExprNodesExceptSemijoin.get(0) instanceof ExprNodeGenericFuncDesc) { - newRetainableTsFilterExpr = allExprNodesExceptSemijoin.get(0); - } - // Push filter on top of children for retainable - 
pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + + if (sr.discardableOps.size() > 1) { + throw new RuntimeException("we can't discard more in this path"); Review comment: Can you leave a comment explaining how we could hit this error? What should have happened? Somehow we have something in retainable/discardable that should be equivalent but it is not? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -338,8 +385,29 @@ public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCach // about the part of the tree that can be merged. We need to regenerate the // cache because semijoin operators have been removed sr = extractSharedOptimizationInfoForRoot( - pctx, optimizerCache, retainableTsOp, discardableTsOp); - } else { + pctx, optimizerCache, retainableTsOp, discardableTsOp, true); + } else if (mode == Mode.DPPUnion) { + boolean mergeable = areMergeable(pctx, retainableTsOp, discardableTsOp); + if (!mergeable) { + LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); + continue; + } + boolean validMerge = + areMergeableDppUninon(pctx, optimizerCache, retainableTsOp, discardableTsOp); Review comment: typo -> areMergeableDppUninon ########## File path: ql/src/test/results/clientpositive/llap/join_parse.q.out ########## @@ -499,34 +499,41 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: string) Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (value is not null and key is not null) (type: boolean) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: string) null sort order: z sort 
order: + Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) Execution mode: vectorized, llap LLAP IO: all inputs Map 6 Map Operator Tree: TableScan - alias: src1 - filterExpr: (value is not null and key is not null) (type: boolean) - Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + alias: src2 Review comment: In this case, it seems we are still missing this merging opportunity (it may be for a different reason). ########## File path: ql/src/test/results/clientpositive/llap/special_character_in_tabnames_1.q.out ########## @@ -1986,18 +1986,18 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (SIMPLE_EDGE) - Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) Reducer 5 <- Reducer 4 (SIMPLE_EDGE) - Reducer 7 <- Map 6 (SIMPLE_EDGE) + Reducer 6 <- Map 1 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 Map Operator Tree: TableScan alias: b - filterExpr: key is not null (type: boolean) + filterExpr: (key is not null or (key > '9')) (type: boolean) Review comment: `ConstantPropagateProcFactory.foldExpr` ########## File path: ql/src/test/results/clientpositive/llap/subquery_multi.q.out ########## @@ -3040,11 +3037,12 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) Reducer 4 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) - Reducer 5 <- Reducer 4 (XPROD_EDGE), Reducer 6 (XPROD_EDGE) + Reducer 5 <- 
Reducer 4 (XPROD_EDGE), Reducer 7 (XPROD_EDGE) Reducer 6 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 7 <- Map 1 (CUSTOM_SIMPLE_EDGE) Review comment: Same as with other subquery tests, it seems in these cases reutilization is less. ########## File path: ql/src/test/results/clientpositive/llap/subquery_in.q.out ########## @@ -5078,9 +5087,10 @@ STAGE PLANS: Edges: Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) - Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE) Reducer 5 <- Map 1 (SIMPLE_EDGE) Reducer 6 <- Map 1 (SIMPLE_EDGE) + Reducer 7 <- Map 1 (SIMPLE_EDGE) Review comment: It seems we have an additional stage here (though in this case, new mode is disabled). ########## File path: ql/src/test/results/clientpositive/perf/tez/constraints/query44.q.out ########## @@ -103,102 +107,143 @@ Stage-0 Top N Key Operator [TNK_99] (rows=6951 width=218) keys:_col1,top n:100 Merge Join Operator [MERGEJOIN_116] (rows=6951 width=218) - Conds:RS_66._col2=RS_146._col0(Inner),Output:["_col1","_col5","_col7"] - <-Map 11 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_146] + Conds:RS_66._col2=RS_163._col0(Inner),Output:["_col1","_col5","_col7"] + <-Map 14 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_163] PartitionCols:_col0 - Select Operator [SEL_144] (rows=462000 width=111) + Select Operator [SEL_161] (rows=462000 width=111) Output:["_col0","_col1"] TableScan [TS_56] (rows=462000 width=111) default@item,i1,Tbl:COMPLETE,Col:COMPLETE,Output:["i_item_sk","i_product_name"] <-Reducer 6 [SIMPLE_EDGE] SHUFFLE [RS_66] PartitionCols:_col2 Merge Join Operator [MERGEJOIN_115] (rows=6951 width=115) - Conds:RS_63._col0=RS_145._col0(Inner),Output:["_col1","_col2","_col5"] - <-Map 11 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_145] + Conds:RS_63._col0=RS_162._col0(Inner),Output:["_col1","_col2","_col5"] + <-Map 14 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_162] PartitionCols:_col0 
- Please refer to the previous Select Operator [SEL_144] + Please refer to the previous Select Operator [SEL_161] <-Reducer 5 [SIMPLE_EDGE] SHUFFLE [RS_63] PartitionCols:_col0 Merge Join Operator [MERGEJOIN_114] (rows=6951 width=12) - Conds:RS_138._col1=RS_143._col1(Inner),Output:["_col0","_col1","_col2"] - <-Reducer 4 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_138] + Conds:RS_146._col1=RS_160._col1(Inner),Output:["_col0","_col1","_col2"] + <-Reducer 12 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_160] PartitionCols:_col1 - Select Operator [SEL_137] (rows=6951 width=8) + Select Operator [SEL_159] (rows=6951 width=8) Output:["_col0","_col1"] - Filter Operator [FIL_136] (rows=6951 width=116) + Filter Operator [FIL_158] (rows=6951 width=116) predicate:(rank_window_0 < 11) - PTF Operator [PTF_135] (rows=20854 width=116) - Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 ASC NULLS LAST","partition by:":"0"}] - Select Operator [SEL_134] (rows=20854 width=116) + PTF Operator [PTF_157] (rows=20854 width=116) + Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 DESC NULLS FIRST","partition by:":"0"}] + Select Operator [SEL_156] (rows=20854 width=116) Output:["_col0","_col1"] - <-Reducer 3 [SIMPLE_EDGE] - SHUFFLE [RS_21] + <-Reducer 11 [SIMPLE_EDGE] + SHUFFLE [RS_49] PartitionCols:0 - Top N Key Operator [TNK_100] (rows=20854 width=228) + Top N Key Operator [TNK_101] (rows=20854 width=228) keys:_col1,top n:11 - Filter Operator [FIL_20] (rows=20854 width=228) + Filter Operator [FIL_48] (rows=20854 width=228) predicate:(_col1 > (0.9 * _col2)) - Merge Join Operator [MERGEJOIN_112] (rows=62562 width=228) + Merge Join Operator [MERGEJOIN_113] (rows=62562 width=228) Conds:(Inner),Output:["_col0","_col1","_col2"] <-Reducer 10 [CUSTOM_SIMPLE_EDGE] vectorized - PARTITION_ONLY_SHUFFLE [RS_133] - Select Operator [SEL_132] (rows=1 width=112) + PARTITION_ONLY_SHUFFLE [RS_150] + Select Operator [SEL_149] (rows=62562 width=116) + 
Output:["_col0","_col1"] + Filter Operator [FIL_148] (rows=62562 width=124) + predicate:CAST( (_col1 / _col2) AS decimal(11,6)) is not null + Group By Operator [GBY_147] (rows=62562 width=124) + Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","count(VALUE._col1)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_131] + PartitionCols:_col0 + Group By Operator [GBY_127] (rows=3199976 width=124) + Output:["_col0","_col1","_col2"],aggregations:["sum(ss_net_profit)","count(ss_net_profit)"],keys:ss_item_sk + Select Operator [SEL_123] (rows=6399952 width=114) + Output:["ss_item_sk","ss_net_profit"] + Filter Operator [FIL_119] (rows=6399952 width=114) + predicate:(ss_store_sk = 410) + TableScan [TS_0] (rows=575995635 width=114) + default@store_sales,ss1,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_item_sk","ss_store_sk","ss_net_profit","ss_hdemo_sk"] Review comment: This is similar to mv_query44. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -159,9 +158,15 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { // Gather information about the DPP table scans and store it in the cache gatherDPPTableScanOps(pctx, optimizerCache); + BaseSharedWorkOptimizer swo; + if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_MERGE_TS_SCHEMA)) { + swo = new BaseSharedWorkOptimizer(); Review comment: Does this need to be swapped with the one below? It seems that if HIVE_SHARED_WORK_MERGE_TS_SCHEMA is true, we should use the SchemaAwareSharedWorkOptimizer (or maybe the name is a bit misleading)? ########## File path: ql/src/test/queries/clientpositive/explainuser_1.q ########## @@ -9,6 +9,7 @@ --! qt:dataset:cbo_t1 set hive.vectorized.execution.enabled=false; set hive.strict.checks.bucketing=false; +set hive.optimize.shared.work.dppunion=false; Review comment: Any reason to set this to `false` here? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -326,6 +372,7 @@ public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCach LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); continue; } + // FIXME: I think this optimization is assymetric; but the check is symmetric Review comment: Is this a TODO for this patch or follow-up work? Also there is a typo (assymetric) ########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -386,125 +456,81 @@ public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCach LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); } else { - ExprNodeDesc newRetainableTsFilterExpr = null; - List<ExprNodeDesc> semijoinExprNodes = new ArrayList<>(); - if (retainableTsOp.getConf().getFilterExpr() != null) { - // Gather SJ expressions and normal expressions - List<ExprNodeDesc> allExprNodesExceptSemijoin = new ArrayList<>(); - splitExpressions(retainableTsOp.getConf().getFilterExpr(), - allExprNodesExceptSemijoin, semijoinExprNodes); - // Create new expressions - if (allExprNodesExceptSemijoin.size() > 1) { - newRetainableTsFilterExpr = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPAnd(), allExprNodesExceptSemijoin); - } else if (allExprNodesExceptSemijoin.size() > 0 && - allExprNodesExceptSemijoin.get(0) instanceof ExprNodeGenericFuncDesc) { - newRetainableTsFilterExpr = allExprNodesExceptSemijoin.get(0); - } - // Push filter on top of children for retainable - pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + + if (sr.discardableOps.size() > 1) { + throw new RuntimeException("we can't discard more in this path"); } - ExprNodeDesc newDiscardableTsFilterExpr = null; - if (discardableTsOp.getConf().getFilterExpr() != null) { - // If there is a single discardable operator, it is a TableScanOperator - // 
and it means that we will merge filter expressions for it. Thus, we - // might need to remove DPP predicates before doing that - List<ExprNodeDesc> allExprNodesExceptSemijoin = new ArrayList<>(); - splitExpressions(discardableTsOp.getConf().getFilterExpr(), - allExprNodesExceptSemijoin, new ArrayList<>()); - // Create new expressions - if (allExprNodesExceptSemijoin.size() > 1) { - newDiscardableTsFilterExpr = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPAnd(), allExprNodesExceptSemijoin); - } else if (allExprNodesExceptSemijoin.size() > 0 && - allExprNodesExceptSemijoin.get(0) instanceof ExprNodeGenericFuncDesc) { - newDiscardableTsFilterExpr = allExprNodesExceptSemijoin.get(0); - } - // Remove and add semijoin filter from expressions - replaceSemijoinExpressions(discardableTsOp, semijoinExprNodes); - // Push filter on top of children for discardable - pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + + SharedWorkModel modelR = new SharedWorkModel(retainableTsOp); + SharedWorkModel modelD = new SharedWorkModel(discardableTsOp); + + // Push filter on top of children for retainable + pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + + if (mode == Mode.RemoveSemijoin || mode == Mode.SubtreeMerge) { + // FIXME: I think idea here is to clear the discardable's semijoin filter Review comment: Is this a TODO for this patch or follow-up work? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -284,6 +294,41 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } + public enum Mode { + RemoveSemijoin, SubtreeMerge, DPPUnion, + } + + static class SharedWorkModel { + + private TableScanOperator ts; + private ExprNodeDesc normalFilterExpr; + private List<ExprNodeDesc> semijoinExprNodes = new ArrayList<>(); + + public SharedWorkModel(TableScanOperator ts) throws UDFArgumentException { + this.ts = ts; + TableScanOperator retainableTsOp = ts; + if (retainableTsOp.getConf().getFilterExpr() != null) { + // Gather SJ expressions and normal expressions + List<ExprNodeDesc> allExprNodesExceptSemijoin = new ArrayList<>(); + splitExpressions(retainableTsOp.getConf().getFilterExpr(), + allExprNodesExceptSemijoin, semijoinExprNodes); + // Create new expressions + if (allExprNodesExceptSemijoin.size() > 1) { + normalFilterExpr = ExprNodeGenericFuncDesc.newInstance( Review comment: it seems you can use `conjunction` method here too. 
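To illustrate the suggestion above: a `conjunction`-style helper collapses the size > 1 / size == 1 / empty cases into one place, instead of repeating the `newInstance(new GenericUDFOPAnd(), ...)` branching at every call site. The sketch below uses plain strings as a stand-in for Hive's `ExprNodeDesc` tree, so the class and representation are illustrative only, not Hive's actual API:

```java
import java.util.List;

class ConjunctionSketch {
    // Fold a list of predicate nodes into a single AND node:
    //   []        -> null  (no filter at all)
    //   [p]       -> p     (single child, no AND wrapper needed)
    //   [p, q, r] -> and(p, q, r)
    static String conjunction(List<String> predicates) {
        if (predicates == null || predicates.isEmpty()) {
            return null;
        }
        if (predicates.size() == 1) {
            return predicates.get(0);
        }
        return "and(" + String.join(", ", predicates) + ")";
    }

    public static void main(String[] args) {
        System.out.println(conjunction(List.of("key is not null", "value > 10")));
        // prints: and(key is not null, value > 10)
    }
}
```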
########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -386,125 +456,81 @@ public boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCach LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); } else { - ExprNodeDesc newRetainableTsFilterExpr = null; - List<ExprNodeDesc> semijoinExprNodes = new ArrayList<>(); - if (retainableTsOp.getConf().getFilterExpr() != null) { - // Gather SJ expressions and normal expressions - List<ExprNodeDesc> allExprNodesExceptSemijoin = new ArrayList<>(); - splitExpressions(retainableTsOp.getConf().getFilterExpr(), - allExprNodesExceptSemijoin, semijoinExprNodes); - // Create new expressions - if (allExprNodesExceptSemijoin.size() > 1) { - newRetainableTsFilterExpr = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPAnd(), allExprNodesExceptSemijoin); - } else if (allExprNodesExceptSemijoin.size() > 0 && - allExprNodesExceptSemijoin.get(0) instanceof ExprNodeGenericFuncDesc) { - newRetainableTsFilterExpr = allExprNodesExceptSemijoin.get(0); - } - // Push filter on top of children for retainable - pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + + if (sr.discardableOps.size() > 1) { + throw new RuntimeException("we can't discard more in this path"); } - ExprNodeDesc newDiscardableTsFilterExpr = null; - if (discardableTsOp.getConf().getFilterExpr() != null) { - // If there is a single discardable operator, it is a TableScanOperator - // and it means that we will merge filter expressions for it. 
Thus, we - // might need to remove DPP predicates before doing that - List<ExprNodeDesc> allExprNodesExceptSemijoin = new ArrayList<>(); - splitExpressions(discardableTsOp.getConf().getFilterExpr(), - allExprNodesExceptSemijoin, new ArrayList<>()); - // Create new expressions - if (allExprNodesExceptSemijoin.size() > 1) { - newDiscardableTsFilterExpr = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPAnd(), allExprNodesExceptSemijoin); - } else if (allExprNodesExceptSemijoin.size() > 0 && - allExprNodesExceptSemijoin.get(0) instanceof ExprNodeGenericFuncDesc) { - newDiscardableTsFilterExpr = allExprNodesExceptSemijoin.get(0); - } - // Remove and add semijoin filter from expressions - replaceSemijoinExpressions(discardableTsOp, semijoinExprNodes); - // Push filter on top of children for discardable - pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + + SharedWorkModel modelR = new SharedWorkModel(retainableTsOp); + SharedWorkModel modelD = new SharedWorkModel(discardableTsOp); + + // Push filter on top of children for retainable + pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + + if (mode == Mode.RemoveSemijoin || mode == Mode.SubtreeMerge) { + // FIXME: I think idea here is to clear the discardable's semijoin filter + // - by using the retainable's (which should be empty in case of this mode) + replaceSemijoinExpressions(discardableTsOp, modelR.getSemiJoinFilter()); } + // Push filter on top of children for discardable + pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + // Obtain filter for shared TS operator - ExprNodeGenericFuncDesc exprNode = null; - if (newRetainableTsFilterExpr != null && newDiscardableTsFilterExpr != null) { - // Combine - exprNode = (ExprNodeGenericFuncDesc) newRetainableTsFilterExpr; - if (!exprNode.isSame(newDiscardableTsFilterExpr)) { - // We merge filters from previous scan by ORing with filters from current scan - if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) { - List<ExprNodeDesc> 
newChildren = new ArrayList<>(exprNode.getChildren().size() + 1); - for (ExprNodeDesc childExprNode : exprNode.getChildren()) { - if (childExprNode.isSame(newDiscardableTsFilterExpr)) { - // We do not need to do anything, it is in the OR expression - break; - } - newChildren.add(childExprNode); - } - if (exprNode.getChildren().size() == newChildren.size()) { - newChildren.add(newDiscardableTsFilterExpr); - exprNode = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPOr(), - newChildren); - } - } else { - exprNode = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPOr(), - Arrays.asList(exprNode, newDiscardableTsFilterExpr)); - } - } + ExprNodeDesc exprNode = null; + if (modelR.normalFilterExpr != null && modelD.normalFilterExpr != null) { + exprNode = disjunction(modelR.normalFilterExpr, modelD.normalFilterExpr); } - // Create expression node that will be used for the retainable table scan - if (!semijoinExprNodes.isEmpty()) { - if (exprNode != null) { - semijoinExprNodes.add(0, exprNode); - } - if (semijoinExprNodes.size() > 1) { - exprNode = ExprNodeGenericFuncDesc.newInstance( - new GenericUDFOPAnd(), semijoinExprNodes); - } else { - exprNode = (ExprNodeGenericFuncDesc) semijoinExprNodes.get(0); - } + List<ExprNodeDesc> semiJoinExpr = null; + if (mode == Mode.DPPUnion) { + assert modelR.semijoinExprNodes != null; + assert modelD.semijoinExprNodes != null; + ExprNodeDesc disjunction = disjunction(conjunction(modelR.semijoinExprNodes), conjunction(modelD.semijoinExprNodes)); + semiJoinExpr = disjunction == null ? null : Lists.newArrayList(disjunction); + } else { + semiJoinExpr = modelR.semijoinExprNodes; } + + // Create expression node that will be used for the retainable table scan + exprNode = conjunction(semiJoinExpr, exprNode); // Replace filter - retainableTsOp.getConf().setFilterExpr(exprNode); + retainableTsOp.getConf().setFilterExpr((ExprNodeGenericFuncDesc) exprNode); // Replace table scan operator List<Operator<? 
extends OperatorDesc>> allChildren = Lists.newArrayList(discardableTsOp.getChildOperators()); for (Operator<? extends OperatorDesc> op : allChildren) { - discardableTsOp.getChildOperators().remove(op); op.replaceParent(discardableTsOp, retainableTsOp); retainableTsOp.getChildOperators().add(op); } + discardableTsOp.getChildOperators().clear(); LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); } // First we remove the input operators of the expression that Review comment: Probably this comment should go within the `if` clause now. ########## File path: ql/src/test/results/clientpositive/llap/cbo_SortUnionTransposeRule.q.out ########## @@ -1006,6 +1027,22 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Reducer 5 + Execution mode: vectorized, llap Review comment: Why do we end up with an additional reducer in this plan? ########## File path: ql/src/test/queries/clientpositive/subquery_ALL.q ########## @@ -1,4 +1,5 @@ --! qt:dataset:part +set hive.optimize.shared.work.dppunion=false; Review comment: It seems there are a few of these tests for which the new mode is disabled. Can we enable it? These kinds of complicated subqueries, where we have multiple repetitions of the same tables, are where it can become pretty useful. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -284,6 +294,41 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } + public enum Mode { + RemoveSemijoin, SubtreeMerge, DPPUnion, Review comment: Can we add a comment about each of these modes? 
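A sketch of what such mode comments might look like. The descriptions below are inferred from how each mode is used elsewhere in this diff, so they are assumptions for the author to confirm or correct, not authoritative semantics:

```java
public enum Mode {
    /** Only remove the semijoin filter of the discardable scan; the subtrees are not merged. (inferred) */
    RemoveSemijoin,
    /** Merge the whole discardable operator subtree into the retainable one. (inferred) */
    SubtreeMerge,
    /** Merge two scans whose DPP/semijoin filters differ by OR-ing those filters together. (inferred) */
    DPPUnion,
}
```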
########## File path: ql/src/test/results/clientpositive/llap/keep_uniform.q.out ########## @@ -516,6 +516,21 @@ STAGE PLANS: valueColumns: 15:int Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int) + Filter Operator Review comment: There are no new reutilization opportunities, but there are additional filter expressions. Expected? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ########## @@ -284,6 +294,41 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } + public enum Mode { + RemoveSemijoin, SubtreeMerge, DPPUnion, + } + + static class SharedWorkModel { Review comment: Please add a comment about this class. It seems it holds a TS, the filter expression that does not include the SJ, and the SJ filters? ########## File path: ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out ########## @@ -4277,21 +4277,37 @@ STAGE PLANS: alias: srcpart filterExpr: ds is not null (type: boolean) Statistics: Num rows: 2000 Data size: 389248 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - keys: ds (type: string) - minReductionHashAggr: 0.99 - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: string) + Filter Operator + predicate: ds is not null (type: boolean) Review comment: It seems we are introducing a redundant filter in the new plan. Note that before it was only present in the TS operator. 
########## File path: ql/src/test/results/clientpositive/llap/ppd_repeated_alias.q.out ########## @@ -348,14 +348,14 @@ STAGE PLANS: #### A masked pattern was here #### Edges: Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) - Reducer 3 <- Map 4 (XPROD_EDGE), Reducer 2 (XPROD_EDGE) + Reducer 3 <- Map 1 (XPROD_EDGE), Reducer 2 (XPROD_EDGE) #### A masked pattern was here #### Vertices: Map 1 Map Operator Tree: TableScan alias: c - filterExpr: foo is not null (type: boolean) + filterExpr: (foo is not null or (foo = 1)) (type: boolean) Review comment: Looking at this filter expression, I think we could call `foldExpr` when the new expression is created in SWO, in order to simplify these filters. ########## File path: ql/src/test/results/clientpositive/llap/subquery_in.q.out ########## @@ -4355,6 +4355,9 @@ STAGE PLANS: sort order: + Map-reduce partition columns: _col0 (type: string) Statistics: Num rows: 13 Data size: 1352 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator Review comment: Additional filter (though in this case new mode is disabled?) ########## File path: ql/src/test/results/clientpositive/llap/sharedworkresidual.q.out ########## @@ -143,6 +143,10 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: string) + Select Operator Review comment: Missed opportunity (maybe not very relevant but wanted to point out in case there is some oversight). ########## File path: ql/src/test/results/clientpositive/perf/tez/constraints/query2.q.out ########## @@ -128,46 +128,104 @@ Plan optimized by CBO. 
Vertex dependency in root stage Map 1 <- Union 2 (CONTAINS) -Map 9 <- Union 2 (CONTAINS) -Reducer 3 <- Map 10 (SIMPLE_EDGE), Union 2 (SIMPLE_EDGE) +Map 13 <- Union 14 (CONTAINS) +Map 15 <- Union 14 (CONTAINS) +Map 8 <- Union 2 (CONTAINS) +Reducer 10 <- Map 9 (SIMPLE_EDGE), Union 14 (SIMPLE_EDGE) +Reducer 11 <- Reducer 10 (SIMPLE_EDGE) +Reducer 12 <- Map 9 (SIMPLE_EDGE), Reducer 11 (SIMPLE_EDGE) +Reducer 3 <- Map 9 (SIMPLE_EDGE), Union 2 (SIMPLE_EDGE) Reducer 4 <- Reducer 3 (SIMPLE_EDGE) -Reducer 5 <- Map 10 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) -Reducer 6 <- Reducer 5 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE) +Reducer 5 <- Map 9 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) +Reducer 6 <- Reducer 12 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) Reducer 7 <- Reducer 6 (SIMPLE_EDGE) -Reducer 8 <- Map 10 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 7 vectorized - File Output Operator [FS_173] - Select Operator [SEL_172] (rows=12881 width=788) + File Output Operator [FS_187] + Select Operator [SEL_186] (rows=12881 width=788) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"] <-Reducer 6 [SIMPLE_EDGE] SHUFFLE [RS_57] Select Operator [SEL_56] (rows=12881 width=788) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"] Merge Join Operator [MERGEJOIN_146] (rows=12881 width=1572) Conds:RS_53.(_col0 - 53)=RS_54._col0(Inner),Output:["_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16"] + <-Reducer 12 [SIMPLE_EDGE] + SHUFFLE [RS_54] + PartitionCols:_col0 + Merge Join Operator [MERGEJOIN_145] (rows=652 width=788) + Conds:RS_185._col0=RS_181._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"] + <-Map 9 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_181] + PartitionCols:_col0 + Select Operator [SEL_177] (rows=652 width=4) + Output:["_col0"] + Filter Operator [FIL_173] (rows=652 width=8) + predicate:((d_year = 2001) 
and d_week_seq is not null) + TableScan [TS_8] (rows=73049 width=99) + default@date_dim,date_dim,Tbl:COMPLETE,Col:COMPLETE,Output:["d_date_sk","d_week_seq","d_day_name","d_year"] + <-Reducer 11 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_185] + PartitionCols:_col0 + Group By Operator [GBY_184] (rows=13152 width=788) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)","sum(VALUE._col2)","sum(VALUE._col3)","sum(VALUE._col4)","sum(VALUE._col5)","sum(VALUE._col6)"],keys:KEY._col0 + <-Reducer 10 [SIMPLE_EDGE] + SHUFFLE [RS_40] + PartitionCols:_col0 + Group By Operator [GBY_39] (rows=3182784 width=788) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"],aggregations:["sum(_col1)","sum(_col2)","sum(_col3)","sum(_col4)","sum(_col5)","sum(_col6)","sum(_col7)"],keys:_col0 + Select Operator [SEL_37] (rows=430516591 width=143) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"] + Merge Join Operator [MERGEJOIN_144] (rows=430516591 width=143) + Conds:Union 14._col0=RS_180._col0(Inner),Output:["_col1","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10"] + <-Map 9 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_180] + PartitionCols:_col0 + Select Operator [SEL_176] (rows=73049 width=36) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8"] + Filter Operator [FIL_172] (rows=73049 width=99) + predicate:d_week_seq is not null + Please refer to the previous TableScan [TS_8] + <-Union 14 [SIMPLE_EDGE] + <-Map 13 [CONTAINS] vectorized + Reduce Output Operator [RS_193] + PartitionCols:_col0 + Select Operator [SEL_192] (rows=143966864 width=115) + Output:["_col0","_col1"] + Filter Operator [FIL_191] (rows=143966864 width=115) + predicate:ws_sold_date_sk is not null + TableScan [TS_157] (rows=144002668 width=115) + default@web_sales,web_sales,Tbl:COMPLETE,Col:COMPLETE,Output:["ws_sold_date_sk","ws_ext_sales_price"] + <-Map 15 [CONTAINS] 
vectorized + Reduce Output Operator [RS_196] + PartitionCols:_col0 + Select Operator [SEL_195] (rows=286549727 width=115) + Output:["_col0","_col1"] + Filter Operator [FIL_194] (rows=286549727 width=115) + predicate:cs_sold_date_sk is not null + TableScan [TS_162] (rows=287989836 width=115) + default@catalog_sales,catalog_sales,Tbl:COMPLETE,Col:COMPLETE,Output:["cs_sold_date_sk","cs_ext_sales_price"] Review comment: This seems to be another missing opportunity. (Btw, I am pointing out the missing optimizations... But there are certainly some improvements too! It is just that we should understand why we are regressing in these cases, if the missed opportunity can lead to a significant performance hit, and in that case, if there is anything that can be done within the scope of this patch, or otherwise what we should be doing in a follow-up to fix it) ########## File path: ql/src/test/results/clientpositive/perf/tez/constraints/mv_query44.q.out ########## @@ -125,102 +129,143 @@ Stage-0 Top N Key Operator [TNK_99] (rows=6951 width=218) keys:_col1,top n:100 Merge Join Operator [MERGEJOIN_116] (rows=6951 width=218) - Conds:RS_66._col2=RS_146._col0(Inner),Output:["_col1","_col5","_col7"] - <-Map 11 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_146] + Conds:RS_66._col2=RS_163._col0(Inner),Output:["_col1","_col5","_col7"] + <-Map 14 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_163] PartitionCols:_col0 - Select Operator [SEL_144] (rows=462000 width=111) + Select Operator [SEL_161] (rows=462000 width=111) Output:["_col0","_col1"] TableScan [TS_56] (rows=462000 width=111) default@item,i1,Tbl:COMPLETE,Col:COMPLETE,Output:["i_item_sk","i_product_name"] <-Reducer 6 [SIMPLE_EDGE] SHUFFLE [RS_66] PartitionCols:_col2 Merge Join Operator [MERGEJOIN_115] (rows=6951 width=115) - Conds:RS_63._col0=RS_145._col0(Inner),Output:["_col1","_col2","_col5"] - <-Map 11 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_145] + Conds:RS_63._col0=RS_162._col0(Inner),Output:["_col1","_col2","_col5"] + <-Map 14 
[SIMPLE_EDGE] vectorized
+ SHUFFLE [RS_162]
 PartitionCols:_col0
- Please refer to the previous Select Operator [SEL_144]
+ Please refer to the previous Select Operator [SEL_161]
 <-Reducer 5 [SIMPLE_EDGE]
 SHUFFLE [RS_63]
 PartitionCols:_col0
 Merge Join Operator [MERGEJOIN_114] (rows=6951 width=12)
- Conds:RS_138._col1=RS_143._col1(Inner),Output:["_col0","_col1","_col2"]
- <-Reducer 4 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_138]
+ Conds:RS_146._col1=RS_160._col1(Inner),Output:["_col0","_col1","_col2"]
+ <-Reducer 12 [SIMPLE_EDGE] vectorized
+ SHUFFLE [RS_160]
 PartitionCols:_col1
- Select Operator [SEL_137] (rows=6951 width=8)
+ Select Operator [SEL_159] (rows=6951 width=8)
 Output:["_col0","_col1"]
- Filter Operator [FIL_136] (rows=6951 width=116)
+ Filter Operator [FIL_158] (rows=6951 width=116)
 predicate:(rank_window_0 < 11)
- PTF Operator [PTF_135] (rows=20854 width=116)
- Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 ASC NULLS LAST","partition by:":"0"}]
- Select Operator [SEL_134] (rows=20854 width=116)
+ PTF Operator [PTF_157] (rows=20854 width=116)
+ Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 DESC NULLS FIRST","partition by:":"0"}]
+ Select Operator [SEL_156] (rows=20854 width=116)
 Output:["_col0","_col1"]
- <-Reducer 3 [SIMPLE_EDGE]
- SHUFFLE [RS_21]
+ <-Reducer 11 [SIMPLE_EDGE]
+ SHUFFLE [RS_49]
 PartitionCols:0
- Top N Key Operator [TNK_100] (rows=20854 width=228)
+ Top N Key Operator [TNK_101] (rows=20854 width=228)
 keys:_col1,top n:11
- Filter Operator [FIL_20] (rows=20854 width=228)
+ Filter Operator [FIL_48] (rows=20854 width=228)
 predicate:(_col1 > (0.9 * _col2))
- Merge Join Operator [MERGEJOIN_112] (rows=62562 width=228)
+ Merge Join Operator [MERGEJOIN_113] (rows=62562 width=228)
 Conds:(Inner),Output:["_col0","_col1","_col2"]
 <-Reducer 10 [CUSTOM_SIMPLE_EDGE] vectorized
- PARTITION_ONLY_SHUFFLE [RS_133]
- Select Operator [SEL_132] (rows=1 width=112)
+ PARTITION_ONLY_SHUFFLE [RS_150]
+ Select Operator [SEL_149] (rows=62562 width=116)
+ Output:["_col0","_col1"]
+ Filter Operator [FIL_148] (rows=62562 width=124)
+ predicate:CAST( (_col1 / _col2) AS decimal(11,6)) is not null
+ Group By Operator [GBY_147] (rows=62562 width=124)
+ Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","count(VALUE._col1)"],keys:KEY._col0
+ <-Map 1 [SIMPLE_EDGE] vectorized
+ SHUFFLE [RS_131]
+ PartitionCols:_col0
+ Group By Operator [GBY_127] (rows=3199976 width=124)
+ Output:["_col0","_col1","_col2"],aggregations:["sum(ss_net_profit)","count(ss_net_profit)"],keys:ss_item_sk
+ Select Operator [SEL_123] (rows=6399952 width=114)
+ Output:["ss_item_sk","ss_net_profit"]
+ Filter Operator [FIL_119] (rows=6399952 width=114)
+ predicate:(ss_store_sk = 410)
+ TableScan [TS_0] (rows=575995635 width=114)
+ default@store_sales,ss1,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_item_sk","ss_store_sk","ss_net_profit","ss_hdemo_sk"]

Review comment:
    It seems that in this case we are missing a reutilization opportunity.

##########
File path: ql/src/test/results/clientpositive/llap/special_character_in_tabnames_quotes_1.q.out
##########
@@ -2896,10 +2896,11 @@ STAGE PLANS:
 Tez
#### A masked pattern was here ####
 Edges:
- Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
+ Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
 Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 Reducer 5 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+ Reducer 6 <- Map 1 (CUSTOM_SIMPLE_EDGE)

Review comment:
    It seems that in this case the optimization led to an additional stage.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org