Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Oct 14 19:06:45 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -29,12 +30,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MuxOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -42,12 +48,16 @@ import org.apache.hadoop.hive.ql.parse.O import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; +import org.apache.hadoop.util.ReflectionUtils; /** * ConvertJoinMapJoin is an optimization that replaces a common join @@ -60,39 +70,46 @@ public class ConvertJoinMapJoin implemen static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); + @SuppressWarnings("unchecked") @Override - /* - * (non-Javadoc) - * we should ideally not modify the tree we traverse. - * However, since we need to walk the tree at any time when we modify the - * operator, we might as well do it here. - */ - public Object process(Node nd, Stack<Node> stack, - NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { + /* + * (non-Javadoc) we should ideally not modify the tree we traverse. However, + * since we need to walk the tree at any time when we modify the operator, we + * might as well do it here. + */ + public Object + process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) + throws SemanticException { OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; - if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { + JoinOperator joinOp = (JoinOperator) nd; + + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) + && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); return null; } - JoinOperator joinOp = (JoinOperator) nd; - // if we have traits, and table info is present in the traits, we know the + // if we have traits, and table info is present in the traits, we know the // exact number of buckets. Else choose the largest number of estimated // reducers from the parent operators. int numBuckets = -1; int estimatedBuckets = -1; + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) { if (parentOp.getOpTraits().getNumBuckets() > 0) { - numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? - parentOp.getOpTraits().getNumBuckets() : numBuckets; + numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? + parentOp.getOpTraits().getNumBuckets() : numBuckets; } if (parentOp instanceof ReduceSinkOperator) { ReduceSinkOperator rs = (ReduceSinkOperator)parentOp; - estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? + estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? rs.getConf().getNumReducers() : estimatedBuckets; } } @@ -107,29 +124,80 @@ public class ConvertJoinMapJoin implemen numBuckets = 1; } LOG.info("Estimated number of buckets " + numBuckets); - int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets); + int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets); if (mapJoinConversionPos < 0) { - // we cannot convert to bucket map join, we cannot convert to - // map join either based on the size + // we cannot convert to bucket map join, we cannot convert to + // map join either based on the size. Check if we can convert to SMB join. + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { + convertJoinSMBJoin(joinOp, context, 0, 0, false, false); + return null; + } + Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null; + try { + bigTableMatcherClass = + (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar( + context.parseContext.getConf(), + HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR))); + } catch (ClassNotFoundException e) { + throw new SemanticException(e.getMessage()); + } + + BigTableSelectorForAutoSMJ bigTableMatcher = + ReflectionUtils.newInstance(bigTableMatcherClass, null); + JoinDesc joinDesc = joinOp.getConf(); + JoinCondDesc[] joinCondns = joinDesc.getConds(); + Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns); + if (joinCandidates.isEmpty()) { + // This is a full outer join. This can never be a map-join + // of any type. So return false. 
+ return false; + } + mapJoinConversionPos = + bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates); + if (mapJoinConversionPos < 0) { + // contains aliases from sub-query + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); + return null; + } + + if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + convertJoinSMBJoin(joinOp, context, mapJoinConversionPos, + tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true); + } else { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); + } return null; } - if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { - if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) { - return null; + if (numBuckets > 1) { + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { + if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + return null; + } } } LOG.info("Convert to non-bucketed map join"); // check if we can convert to map join no bucket scaling. - mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1); + mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1); if (mapJoinConversionPos < 0) { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); return null; } MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); // map join operator by default has no bucket cols - mapJoinOp.setOpTraits(new OpTraits(null, -1)); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); + mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) { setAllChildrenTraitsToNull(childOp); @@ -138,11 +206,107 @@ public class ConvertJoinMapJoin implemen return null; } + // replaces the join operator with a new CommonJoinOperator, removes the + // parent reduce sinks + private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren) + throws SemanticException { + ParseContext parseContext = context.parseContext; + MapJoinDesc mapJoinDesc = null; + if (adjustParentsChildren) { + mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(), + joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true); + } else { + JoinDesc joinDesc = joinOp.getConf(); + // retain the original join desc in the map join. 
+ mapJoinDesc = + new MapJoinDesc(null, null, joinDesc.getExprs(), null, null, + joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); + } + + @SuppressWarnings("unchecked") + CommonMergeJoinOperator mergeJoinOp = + (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, + isSubQuery, mapJoinConversionPos, mapJoinDesc)); + OpTraits opTraits = + new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits() + .getSortCols()); + mergeJoinOp.setOpTraits(opTraits); + mergeJoinOp.setStatistics(joinOp.getStatistics()); + + for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + int pos = parentOp.getChildOperators().indexOf(joinOp); + parentOp.getChildOperators().remove(pos); + parentOp.getChildOperators().add(pos, mergeJoinOp); + } + + for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) { + int pos = childOp.getParentOperators().indexOf(joinOp); + childOp.getParentOperators().remove(pos); + childOp.getParentOperators().add(pos, mergeJoinOp); + } + + List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators(); + if (childOperators == null) { + childOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + mergeJoinOp.setChildOperators(childOperators); + } + + List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators(); + if (parentOperators == null) { + parentOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + mergeJoinOp.setParentOperators(parentOperators); + } + + childOperators.clear(); + parentOperators.clear(); + childOperators.addAll(joinOp.getChildOperators()); + parentOperators.addAll(joinOp.getParentOperators()); + mergeJoinOp.getConf().setGenJoinKeys(false); + + if (adjustParentsChildren) { + mergeJoinOp.getConf().setGenJoinKeys(true); + List<Operator<? extends OperatorDesc>> newParentOpList = + new ArrayList<Operator<? extends OperatorDesc>>(); + for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) { + for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) { + grandParentOp.getChildOperators().remove(parentOp); + grandParentOp.getChildOperators().add(mergeJoinOp); + newParentOpList.add(grandParentOp); + } + } + mergeJoinOp.getParentOperators().clear(); + mergeJoinOp.getParentOperators().addAll(newParentOpList); + List<Operator<? extends OperatorDesc>> parentOps = + new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators()); + for (Operator<? extends OperatorDesc> parentOp : parentOps) { + int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp); + if (parentIndex == mapJoinConversionPos) { + continue; + } + + // insert the dummy store operator here + DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator(); + dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + dummyStoreOp.setChildOperators(new ArrayList<Operator<? 
extends OperatorDesc>>()); + dummyStoreOp.getChildOperators().add(mergeJoinOp); + int index = parentOp.getChildOperators().indexOf(mergeJoinOp); + parentOp.getChildOperators().remove(index); + parentOp.getChildOperators().add(index, dummyStoreOp); + dummyStoreOp.getParentOperators().add(parentOp); + mergeJoinOp.getParentOperators().remove(parentIndex); + mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp); + } + } + mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); + } + private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) { if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1)); + currentOp.setOpTraits(new OpTraits(null, -1, null)); for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -151,28 +315,26 @@ public class ConvertJoinMapJoin implemen } } - private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, - int bigTablePosition) throws SemanticException { - - TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); + private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) { LOG.info("Check conversion to bucket map join failed."); return false; } - MapJoinOperator mapJoinOp = - convertJoinMapJoin(joinOp, context, bigTablePosition); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); // we can set the traits for this join operator OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets()); + tezBucketJoinProcCtx.getNumBuckets(), null); mapJoinOp.setOpTraits(opTraits); + mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); - // Once the conversion is done, we can set the partitioner to bucket cols on the small table + // Once the conversion is done, we can set the partitioner to bucket cols on the small table Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>(); bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets()); joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping); @@ -182,6 +344,54 @@ public class ConvertJoinMapJoin implemen return true; } + /* + * This method tries to convert a join to an SMB. This is done based on + * traits. If the sorted by columns are the same as the join columns then, we + * can convert the join to an SMB. Otherwise retain the bucket map join as it + * is still more efficient than a regular join. + */ + private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + + ReduceSinkOperator bigTableRS = + (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits() + .getNumBuckets(); + + // the sort and bucket cols have to match on both sides for this + // transformation of the join operation + for (Operator<? 
extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + if (!(parentOp instanceof ReduceSinkOperator)) { + // could be mux/demux operators. Currently not supported + LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time."); + return false; + } + ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp + .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { + LOG.info("We cannot convert to SMB because the sort column names do not match."); + return false; + } + + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp + .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) + == false) { + LOG.info("We cannot convert to SMB because bucket column names do not match."); + return false; + } + } + + boolean isSubQuery = false; + if (numBuckets < 0) { + isSubQuery = true; + numBuckets = bigTableRS.getConf().getNumReducers(); + } + tezBucketJoinProcCtx.setNumBuckets(numBuckets); + tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + LOG.info("We can convert the join to an SMB join."); + return true; + } + private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) { int numBuckets = currentOp.getOpTraits().getNumBuckets(); for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) { @@ -193,15 +403,13 @@ public class ConvertJoinMapJoin implemen } /* - * We perform the following checks to see if we can convert to a bucket map join - * 1. If the parent reduce sink of the big table side has the same emit key cols as - * its parent, we can create a bucket map join eliminating the reduce sink. - * 2. If we have the table information, we can check the same way as in Mapreduce to - * determine if we can perform a Bucket Map Join. + * If the parent reduce sink of the big table side has the same emit key cols + * as its parent, we can create a bucket map join eliminating the reduce sink. */ - private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, - OptimizeTezProcContext context, int bigTablePosition, - TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, + OptimizeTezProcContext context, int bigTablePosition, + TezBucketJoinProcCtx tezBucketJoinProcCtx) + throws SemanticException { // bail on mux-operator because mux operator masks the emit keys of the // constituent reduce sinks if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) { @@ -211,14 +419,41 @@ public class ConvertJoinMapJoin implemen } ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames(); + Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0); + List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames(); + int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); + // all keys matched. + if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(), + tezBucketJoinProcCtx) == false) { + LOG.info("No info available to check for bucket map join. 
Cannot convert"); + return false; + } + /* * this is the case when the big table is a sub-query and is probably - * already bucketed by the join column in say a group by operation + * already bucketed by the join column in say a group by operation */ - List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames(); - if ((colNames != null) && (colNames.isEmpty() == false)) { - Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0); - for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) { + boolean isSubQuery = false; + if (numBuckets < 0) { + isSubQuery = true; + numBuckets = rs.getConf().getNumReducers(); + } + tezBucketJoinProcCtx.setNumBuckets(numBuckets); + tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + return true; + } + + private boolean checkColEquality(List<List<String>> grandParentColNames, + List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap, + TezBucketJoinProcCtx tezBucketJoinProcCtx) { + + if ((grandParentColNames == null) || (parentColNames == null)) { + return false; + } + + if ((parentColNames != null) && (parentColNames.isEmpty() == false)) { + for (List<String> listBucketCols : grandParentColNames) { // can happen if this operator does not carry forward the previous bucketing columns // for e.g. another join operator which does not carry one of the sides' key columns if (listBucketCols.isEmpty()) { @@ -226,9 +461,9 @@ public class ConvertJoinMapJoin implemen } int colCount = 0; // parent op is guaranteed to have a single list because it is a reduce sink - for (String colName : rs.getOpTraits().getBucketColNames().get(0)) { + for (String colName : parentColNames.get(0)) { // all columns need to be at least a subset of the parentOfParent's bucket cols - ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName); + ExprNodeDesc exprNodeDesc = colExprMap.get(colName); if (exprNodeDesc instanceof ExprNodeColumnDesc) { if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) { colCount++; @@ -236,32 +471,21 @@ public class ConvertJoinMapJoin implemen break; } } - - if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) { - // all keys matched. - int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); - boolean isSubQuery = false; - if (numBuckets < 0) { - isSubQuery = true; - numBuckets = rs.getConf().getNumReducers(); - } - tezBucketJoinProcCtx.setNumBuckets(numBuckets); - tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + + if (colCount == parentColNames.get(0).size()) { return true; } } } return false; } - - LOG.info("No info available to check for bucket map join. Cannot convert"); return false; } - public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, + public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, int buckets) { - Set<Integer> bigTableCandidateSet = MapJoinProcessor. 
- getBigTableCandidates(joinOp.getConf().getConds()); + Set<Integer> bigTableCandidateSet = + MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds()); long maxSize = context.conf.getLongVar( HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); @@ -287,7 +511,7 @@ public class ConvertJoinMapJoin implemen long inputSize = currInputStat.getDataSize(); if ((bigInputStat == null) || ((bigInputStat != null) && - (inputSize > bigInputStat.getDataSize()))) { + (inputSize > bigInputStat.getDataSize()))) { if (bigTableFound) { // cannot convert to map join; we've already chosen a big table @@ -347,9 +571,9 @@ public class ConvertJoinMapJoin implemen * for tez. */ - public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, + public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, int bigTablePosition) throws SemanticException { - // bail on mux operator because currently the mux operator masks the emit keys + // bail on mux operator because currently the mux operator masks the emit keys // of the constituent reduce sinks. for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { if (parentOp instanceof MuxOperator) { @@ -359,12 +583,12 @@ public class ConvertJoinMapJoin implemen //can safely convert the join to a map join. ParseContext parseContext = context.parseContext; - MapJoinOperator mapJoinOp = MapJoinProcessor. - convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), - joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true); + MapJoinOperator mapJoinOp = + MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, + parseContext.getJoinContext().get(joinOp), bigTablePosition, true); - Operator<? extends OperatorDesc> parentBigTableOp - = mapJoinOp.getParentOperators().get(bigTablePosition); + Operator<? extends OperatorDesc> parentBigTableOp = + mapJoinOp.getParentOperators().get(bigTablePosition); if (parentBigTableOp instanceof ReduceSinkOperator) { for (Operator<?> p : parentBigTableOp.getParentOperators()) { // we might have generated a dynamic partition operator chain. Since @@ -380,11 +604,10 @@ public class ConvertJoinMapJoin implemen } } mapJoinOp.getParentOperators().remove(bigTablePosition); - if (!(mapJoinOp.getParentOperators().contains( - parentBigTableOp.getParentOperators().get(0)))) { + if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) { mapJoinOp.getParentOperators().add(bigTablePosition, parentBigTableOp.getParentOperators().get(0)); - } + } parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); for (Operator<? 
extends OperatorDesc> op : mapJoinOp.getParentOperators()) { if (!(op.getChildOperators().contains(mapJoinOp))) { @@ -397,15 +620,31 @@ public class ConvertJoinMapJoin implemen return mapJoinOp; } - private boolean hasDynamicPartitionBroadcast(Operator<?> op) { - if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { - return true; - } - for (Operator<?> c : op.getChildOperators()) { - if (hasDynamicPartitionBroadcast(c)) { - return true; + private boolean hasDynamicPartitionBroadcast(Operator<?> parent) { + boolean hasDynamicPartitionPruning = false; + + for (Operator<?> op: parent.getChildOperators()) { + while (op != null) { + if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { + // found dynamic partition pruning operator + hasDynamicPartitionPruning = true; + break; + } + + if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) { + // crossing reduce sink or file sink means the pruning isn't for this parent. + break; + } + + if (op.getChildOperators().size() != 1) { + // dynamic partition pruning pipeline doesn't have multiple children + break; + } + + op = op.getChildOperators().get(0); } } - return false; + + return hasDynamicPartitionPruning; } }
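
The rewritten hasDynamicPartitionBroadcast above replaces the unbounded recursive search with a bounded walk down each single-child chain, stopping when it finds the pruning event operator, crosses a reduce sink or file sink, or hits a branch point. The following is a minimal standalone sketch of that traversal pattern only; the Op class, the kind strings, and the class names are simplified stand-ins for Hive's operator hierarchy, not the actual API.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Hive's operator tree; illustrative only.
class Op {
    final String kind;                        // e.g. "APP_MASTER_EVENT", "REDUCE_SINK", "FILE_SINK", "SELECT"
    final List<Op> children = new ArrayList<>();

    Op(String kind) { this.kind = kind; }

    Op addChild(Op child) { children.add(child); return child; }
}

public class DynamicPruningWalk {

    // Mirrors the shape of the new hasDynamicPartitionBroadcast: for each child of
    // 'parent', follow the single-child chain until the pruning operator is found,
    // a sink boundary is crossed, or the chain branches.
    static boolean hasDynamicPartitionBroadcast(Op parent) {
        for (Op op : parent.children) {
            while (op != null) {
                if (op.kind.equals("APP_MASTER_EVENT")) {
                    return true;                          // found the dynamic partition pruning operator
                }
                if (op.kind.equals("REDUCE_SINK") || op.kind.equals("FILE_SINK")) {
                    break;                                // pruning beyond a sink does not belong to this parent
                }
                if (op.children.size() != 1) {
                    break;                                // pruning pipelines are single-child chains
                }
                op = op.children.get(0);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Op ts = new Op("TABLE_SCAN");
        Op sel = ts.addChild(new Op("SELECT"));
        sel.addChild(new Op("APP_MASTER_EVENT"));
        System.out.println(hasDynamicPartitionBroadcast(ts));   // true
    }
}
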
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Oct 14 19:06:45 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import com.google.common.collect.Interner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -39,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.No import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.OperatorUtils; -import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator; -import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; @@ -99,7 +98,6 @@ import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import java.io.Serializable; @@ -578,8 +576,6 @@ public final class GenMapRedUtils { //This read entity is a direct read entity and not an indirect read (that is when // this is being read because it is a dependency of a view). boolean isDirectRead = (parentViewInfo == null); - PlanUtils.addInput(inputs, - new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead)); for (Partition part : parts) { if (part.getTable().isPartitioned()) { @@ -873,6 +869,30 @@ public final class GenMapRedUtils { } } + public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) { + + if (task instanceof ConditionalTask) { + for (Task tsk : ((ConditionalTask) task).getListTasks()) { + internTableDesc(tsk, interner); + } + } else if (task instanceof ExecDriver) { + MapredWork work = (MapredWork) task.getWork(); + work.getMapWork().internTable(interner); + } else if (task != null && (task.getWork() instanceof TezWork)) { + TezWork work = (TezWork)task.getWork(); + for (BaseWork w : work.getAllWorkUnsorted()) { + if (w instanceof MapWork) { + ((MapWork)w).internTable(interner); + } + } + } + if (task.getNumChild() > 0) { + for (Task childTask : task.getChildTasks()) { + internTableDesc(childTask, interner); + } + } + } + /** * create a new plan and return. 
* @@ -1485,7 +1505,7 @@ public final class GenMapRedUtils { * * @param fsInputDesc * @param finalName - * @param inputFormatClass + * @param inputFormatClass * @return MergeWork if table is stored as RCFile or ORCFile, * null otherwise */ @@ -1689,7 +1709,7 @@ public final class GenMapRedUtils { // There are separate configuration parameters to control whether to // merge for a map-only job // or for a map-reduce job - if (currTask.getWork() instanceof MapredWork) { + if (currTask.getWork() instanceof MapredWork) { ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork(); boolean mergeMapOnly = hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null; @@ -1788,7 +1808,7 @@ public final class GenMapRedUtils { return Collections.emptyList(); } - public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) + public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) throws SemanticException { List<Path> inputPaths = new ArrayList<Path>(); switch (parseInfo.getTableSpec().specType) { @@ -1825,6 +1845,7 @@ public final class GenMapRedUtils { public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) { final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>(); OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() { + @Override public void apply(Operator<?> argument) { if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) { operators.add(argument); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Tue Oct 14 19:06:45 2014 @@ -332,18 +332,26 @@ public class GroupByOptimizer implements continue; } - ExprNodeDesc selectColList = selectDesc.getColList().get(pos); - if (selectColList instanceof ExprNodeColumnDesc) { + ExprNodeDesc selectCol = selectDesc.getColList().get(pos); + if (selectCol instanceof ExprNodeColumnDesc) { String newValue = - tableColsMapping.get(((ExprNodeColumnDesc) selectColList).getColumn()); + tableColsMapping.get(((ExprNodeColumnDesc) selectCol).getColumn()); tableColsMapping.put(outputColumnName, newValue); } else { tableColsMapping.remove(outputColumnName); - if ((selectColList instanceof ExprNodeConstantDesc) || - (selectColList instanceof ExprNodeNullDesc)) { + if (selectCol instanceof ExprNodeNullDesc) { newConstantCols.add(outputColumnName); } + if (selectCol instanceof ExprNodeConstantDesc) { + // Lets see if this constant was folded because of optimization. 
+ String origCol = ((ExprNodeConstantDesc) selectCol).getFoldedFromCol(); + if (origCol != null) { + tableColsMapping.put(outputColumnName, origCol); + } else { + newConstantCols.add(outputColumnName); + } + } } } @@ -351,7 +359,6 @@ public class GroupByOptimizer implements } } - boolean sortGroupBy = true; // compute groupby columns from groupby keys List<String> groupByCols = new ArrayList<String>(); // If the group by expression is anything other than a list of columns, Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Oct 14 19:06:45 2014 @@ -389,157 +389,8 @@ public class MapJoinProcessor implements JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { - JoinDesc desc = op.getConf(); - JoinCondDesc[] condns = desc.getConds(); - Byte[] tagOrder = desc.getTagOrder(); - - // outer join cannot be performed on a table which is being cached - if (!noCheckOuterJoin) { - if (checkMapJoin(mapJoinPos, condns) < 0) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); - } - } - - // Walk over all the sources (which are guaranteed to be reduce sink - // operators). - // The join outputs a concatenation of all the inputs. - QBJoinTree leftSrc = joinTree.getJoinSrc(); - List<ReduceSinkOperator> oldReduceSinkParentOps = - new ArrayList<ReduceSinkOperator>(op.getNumParent()); - if (leftSrc != null) { - // assert mapJoinPos == 0; - Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0); - assert parentOp.getParentOperators().size() == 1; - oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); - } - - - byte pos = 0; - for (String src : joinTree.getBaseSrc()) { - if (src != null) { - Operator<? 
extends OperatorDesc> parentOp = op.getParentOperators().get(pos); - assert parentOp.getParentOperators().size() == 1; - oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); - } - pos++; - } - - Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); - List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature()); - Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs(); - Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>(); - for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) { - byte tag = entry.getKey(); - Operator<?> terminal = oldReduceSinkParentOps.get(tag); - - List<ExprNodeDesc> values = entry.getValue(); - List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal); - newValueExprs.put(tag, newValues); - for (int i = 0; i < schema.size(); i++) { - ColumnInfo column = schema.get(i); - if (column == null) { - continue; - } - ExprNodeDesc expr = colExprMap.get(column.getInternalName()); - int index = ExprNodeDescUtils.indexOf(expr, values); - if (index >= 0) { - colExprMap.put(column.getInternalName(), newValues.get(index)); - schema.set(i, null); - } - } - } - - // rewrite value index for mapjoin - Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>(); - - // get the join keys from old parent ReduceSink operators - Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); - - // construct valueTableDescs and valueFilteredTableDescs - List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); - List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); - int[][] filterMap = desc.getFilterMap(); - for (pos = 0; pos < op.getParentOperators().size(); pos++) { - ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos); - List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols(); - List<ExprNodeDesc> valueCols = newValueExprs.get(pos); - if (pos != mapJoinPos) { - // remove values in key exprs for value table schema - // value expression for hashsink will be modified in LocalMapJoinProcessor - int[] valueIndex = new int[valueCols.size()]; - List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>(); - for (int i = 0; i < valueIndex.length; i++) { - ExprNodeDesc expr = valueCols.get(i); - int kindex = ExprNodeDescUtils.indexOf(expr, keyCols); - if (kindex >= 0) { - valueIndex[i] = kindex; - } else { - valueIndex[i] = -valueColsInValueExpr.size() - 1; - valueColsInValueExpr.add(expr); - } - } - if (needValueIndex(valueIndex)) { - valueIndices.put(pos, valueIndex); - } - valueCols = valueColsInValueExpr; - } - // deep copy expr node desc - List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols); - if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) { - ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory - .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false); - valueFilteredCols.add(isFilterDesc); - } - - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); - TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue")); - - valueTableDescs.add(valueTableDesc); - valueFilteredTableDescs.add(valueFilteredTableDesc); - - keyExprMap.put(pos, keyCols); - } - - Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters(); - Map<Byte, List<ExprNodeDesc>> newFilters = 
new HashMap<Byte, List<ExprNodeDesc>>(); - for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) { - byte srcTag = entry.getKey(); - List<ExprNodeDesc> filter = entry.getValue(); - - Operator<?> terminal = oldReduceSinkParentOps.get(srcTag); - newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); - } - desc.setFilters(filters = newFilters); - - // create dumpfile prefix needed to create descriptor - String dumpFilePrefix = ""; - if( joinTree.getMapAliases() != null ) { - for(String mapAlias : joinTree.getMapAliases()) { - dumpFilePrefix = dumpFilePrefix + mapAlias; - } - dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix(); - } else { - dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix(); - } - - List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos); - - List<String> outputColumnNames = op.getConf().getOutputColumnNames(); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, - PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); - JoinCondDesc[] joinCondns = op.getConf().getConds(); - MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, - valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, - filters, op.getConf().getNoOuterJoin(), dumpFilePrefix); - mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); - mapJoinDescriptor.setTagOrder(tagOrder); - mapJoinDescriptor.setNullSafes(desc.getNullSafes()); - mapJoinDescriptor.setFilterMap(desc.getFilterMap()); - if (!valueIndices.isEmpty()) { - mapJoinDescriptor.setValueIndices(valueIndices); - } + MapJoinDesc mapJoinDescriptor = + getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin); // reduce sink row resolver used to generate map join op RowResolver outputRS = opParseCtxMap.get(op).getRowResolver(); @@ -551,6 +402,7 @@ public class MapJoinProcessor implements opParseCtxMap.put(mapJoinOp, ctx); mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs()); + Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); mapJoinOp.setColumnExprMap(colExprMap); List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators(); @@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements } } + + public static MapJoinDesc getMapJoinDesc(HiveConf hconf, + LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap, + JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + JoinDesc desc = op.getConf(); + JoinCondDesc[] condns = desc.getConds(); + Byte[] tagOrder = desc.getTagOrder(); + + // outer join cannot be performed on a table which is being cached + if (!noCheckOuterJoin) { + if (checkMapJoin(mapJoinPos, condns) < 0) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } + } + + // Walk over all the sources (which are guaranteed to be reduce sink + // operators). + // The join outputs a concatenation of all the inputs. + QBJoinTree leftSrc = joinTree.getJoinSrc(); + List<ReduceSinkOperator> oldReduceSinkParentOps = + new ArrayList<ReduceSinkOperator>(op.getNumParent()); + if (leftSrc != null) { + // assert mapJoinPos == 0; + Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0); + assert parentOp.getParentOperators().size() == 1; + oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); + } + + byte pos = 0; + for (String src : joinTree.getBaseSrc()) { + if (src != null) { + Operator<? 
extends OperatorDesc> parentOp = op.getParentOperators().get(pos); + assert parentOp.getParentOperators().size() == 1; + oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); + } + pos++; + } + + Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); + List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature()); + Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs(); + Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) { + byte tag = entry.getKey(); + Operator<?> terminal = oldReduceSinkParentOps.get(tag); + + List<ExprNodeDesc> values = entry.getValue(); + List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal); + newValueExprs.put(tag, newValues); + for (int i = 0; i < schema.size(); i++) { + ColumnInfo column = schema.get(i); + if (column == null) { + continue; + } + ExprNodeDesc expr = colExprMap.get(column.getInternalName()); + int index = ExprNodeDescUtils.indexOf(expr, values); + if (index >= 0) { + colExprMap.put(column.getInternalName(), newValues.get(index)); + schema.set(i, null); + } + } + } + + // rewrite value index for mapjoin + Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>(); + + // get the join keys from old parent ReduceSink operators + Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); + + // construct valueTableDescs and valueFilteredTableDescs + List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); + List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); + int[][] filterMap = desc.getFilterMap(); + for (pos = 0; pos < op.getParentOperators().size(); pos++) { + ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos); + List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols(); + List<ExprNodeDesc> valueCols = newValueExprs.get(pos); + if (pos != mapJoinPos) { + // remove values in key exprs for value table schema + // value expression for hashsink will be modified in + // LocalMapJoinProcessor + int[] valueIndex = new int[valueCols.size()]; + List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>(); + for (int i = 0; i < valueIndex.length; i++) { + ExprNodeDesc expr = valueCols.get(i); + int kindex = ExprNodeDescUtils.indexOf(expr, keyCols); + if (kindex >= 0) { + valueIndex[i] = kindex; + } else { + valueIndex[i] = -valueColsInValueExpr.size() - 1; + valueColsInValueExpr.add(expr); + } + } + if (needValueIndex(valueIndex)) { + valueIndices.put(pos, valueIndex); + } + valueCols = valueColsInValueExpr; + } + // deep copy expr node desc + List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols); + if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) { + ExprNodeColumnDesc isFilterDesc = + new ExprNodeColumnDesc( + TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", + "filter", false); + valueFilteredCols.add(isFilterDesc); + } + + TableDesc valueTableDesc = + PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, + "mapjoinvalue")); + TableDesc valueFilteredTableDesc = + PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueFilteredCols, "mapjoinvalue")); + + valueTableDescs.add(valueTableDesc); + valueFilteredTableDescs.add(valueFilteredTableDesc); + + keyExprMap.put(pos, keyCols); + } + + Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters(); + Map<Byte, List<ExprNodeDesc>> 
newFilters = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) { + byte srcTag = entry.getKey(); + List<ExprNodeDesc> filter = entry.getValue(); + + Operator<?> terminal = oldReduceSinkParentOps.get(srcTag); + newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); + } + desc.setFilters(filters = newFilters); + + // create dumpfile prefix needed to create descriptor + String dumpFilePrefix = ""; + if (joinTree.getMapAliases() != null) { + for (String mapAlias : joinTree.getMapAliases()) { + dumpFilePrefix = dumpFilePrefix + mapAlias; + } + dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix(); + } else { + dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix(); + } + + List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos); + + List<String> outputColumnNames = op.getConf().getOutputColumnNames(); + TableDesc keyTableDesc = + PlanUtils.getMapJoinKeyTableDesc(hconf, + PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); + JoinCondDesc[] joinCondns = op.getConf().getConds(); + MapJoinDesc mapJoinDescriptor = + new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, + valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op + .getConf().getNoOuterJoin(), dumpFilePrefix); + mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); + mapJoinDescriptor.setTagOrder(tagOrder); + mapJoinDescriptor.setNullSafes(desc.getNullSafes()); + mapJoinDescriptor.setFilterMap(desc.getFilterMap()); + if (!valueIndices.isEmpty()) { + mapJoinDescriptor.setValueIndices(valueIndices); + } + + return mapJoinDescriptor; + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Oct 14 19:06:45 2014 @@ -51,7 +51,12 @@ public class Optimizer { * @param hiveConf */ public void initialize(HiveConf hiveConf) { + + boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"); + boolean bucketMapJoinOptimizer = false; + transformations = new ArrayList<Transform>(); + // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { @@ -59,19 +64,23 @@ public class Optimizer { transformations.add(new SyntheticJoinPredicate()); transformations.add(new PredicatePushDown()); transformations.add(new PartitionPruner()); + } + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) { + transformations.add(new ConstantPropagate()); + } + + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { transformations.add(new PartitionConditionRemover()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) { /* Add list bucketing pruner. 
*/ transformations.add(new ListBucketingPruner()); } } + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY) || HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT)) { transformations.add(new GroupByOptimizer()); } - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) { - transformations.add(new ConstantPropagate()); - } transformations.add(new ColumnPruner()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) { transformations.add(new SkewJoinOptimizer()); @@ -81,15 +90,16 @@ public class Optimizer { } transformations.add(new SamplePruner()); transformations.add(new MapJoinProcessor()); - boolean bucketMapJoinOptimizer = false; - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) { + + if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) { transformations.add(new BucketMapJoinOptimizer()); bucketMapJoinOptimizer = true; } // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) { + if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) + && !isTezExecEngine) { if (!bucketMapJoinOptimizer) { // No need to add BucketMapJoinOptimizer twice transformations.add(new BucketMapJoinOptimizer()); @@ -119,7 +129,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) && - !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + !isTezExecEngine) { transformations.add(new CorrelationOptimizer()); } if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) { @@ -128,8 +138,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) { transformations.add(new StatsOptimizer()); } - if (pctx.getContext().getExplain() - && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + if (pctx.getContext().getExplain() && !isTezExecEngine) { transformations.add(new AnnotateWithStatistics()); transformations.add(new AnnotateWithOpTraits()); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Tue Oct 14 19:06:45 2014 @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.stats.StatsUtils; public class ReduceSinkMapJoinProc implements NodeProcessor { @@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple TezWork tezWork = context.currentTask.getWork(); 
LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName()); tezWork.connect(parentWork, myWork, edgeProp); - + if (edgeType == EdgeType.CUSTOM_EDGE) { + tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES); + } + ReduceSinkOperator r = null; if (parentRS.getConf().getOutputName() != null) { LOG.debug("Cloning reduce sink for multi-child broadcast edge"); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Tue Oct 14 19:06:45 2014 @@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; @@ -55,13 +55,25 @@ import org.apache.hadoop.hive.ql.parse.P import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SplitSample; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.ListSinkDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; @@ -73,9 +85,11 @@ public class SimpleFetchOptimizer implem private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName()); + @Override public ParseContext transform(ParseContext pctx) throws SemanticException { Map<String, Operator<? 
extends OperatorDesc>> topOps = pctx.getTopOps(); - if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) { + if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand() + && topOps.size() == 1) { // no join, no groupby, no distinct, no lateral view, no subq, // no CTAS or insert, not analyze command, and single sourced. String alias = (String) pctx.getTopOps().keySet().toArray()[0]; @@ -144,7 +158,7 @@ public class SimpleFetchOptimizer implem // for non-aggressive mode (minimal) // 1. samping is not allowed // 2. for partitioned table, all filters should be targeted to partition column - // 3. SelectOperator should be select star + // 3. SelectOperator should use only simple cast/column access private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias, TableScanOperator ts) throws HiveException { SplitSample splitSample = pctx.getNameToSplitSample().get(alias); @@ -156,7 +170,7 @@ public class SimpleFetchOptimizer implem return null; } - Table table = qb.getMetaData().getAliasToTable().get(alias); + Table table = pctx.getTopToTable().get(ts); if (table == null) { return null; } @@ -181,34 +195,71 @@ public class SimpleFetchOptimizer implem return null; } - private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive, + private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive, boolean bypassFilter) { if (ts.getChildOperators().size() != 1) { return null; } Operator<?> op = ts.getChildOperators().get(0); for (; ; op = op.getChildOperators().get(0)) { - if (aggresive) { - if (!(op instanceof LimitOperator || op instanceof FilterOperator - || op instanceof SelectOperator)) { + if (op instanceof SelectOperator) { + if (!aggressive) { + if (!checkExpressions((SelectOperator) op)) { + break; + } + } + continue; + } + + if (aggressive) { + if (!(op instanceof LimitOperator || op instanceof FilterOperator)) { break; } - } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter) - || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) { + } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) { break; } + if (op.getChildOperators() == null || op.getChildOperators().size() != 1) { return null; } } + if (op instanceof FileSinkOperator) { fetch.scanOp = ts; fetch.fileSink = op; return fetch; } + return null; } + private boolean checkExpressions(SelectOperator op) { + SelectDesc desc = op.getConf(); + for (ExprNodeDesc expr : desc.getColList()) { + if (!checkExpression(expr)) { + return false; + } + } + return true; + } + + private boolean checkExpression(ExprNodeDesc expr) { + if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) { + return true; + } + + if (expr instanceof ExprNodeGenericFuncDesc) { + GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF(); + if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar + || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal + || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp + || udf instanceof GenericUDFToVarchar) { + return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0)); + } + } + return false; + } + private class FetchData { private final ReadEntity parent; @@ -240,7 +291,7 @@ public class SimpleFetchOptimizer implem this.splitSample = splitSample; this.onlyPruningFilter = bypassFilter; } - + /* * all 
filters were executed during partition pruning */ @@ -251,7 +302,7 @@ public class SimpleFetchOptimizer implem private FetchWork convertToWork() throws HiveException { inputs.clear(); if (!table.isPartitioned()) { - inputs.add(new ReadEntity(table, parent)); + inputs.add(new ReadEntity(table, parent, parent == null)); FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table)); PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc()); work.setSplitSample(splitSample); @@ -261,12 +312,12 @@ public class SimpleFetchOptimizer implem List<PartitionDesc> partP = new ArrayList<PartitionDesc>(); for (Partition partition : partsList.getNotDeniedPartns()) { - inputs.add(new ReadEntity(partition, parent)); + inputs.add(new ReadEntity(partition, parent, parent == null)); listP.add(partition.getDataLocation()); partP.add(Utilities.getPartitionDesc(partition)); } Table sourceTable = partsList.getSourceTable(); - inputs.add(new ReadEntity(sourceTable, parent)); + inputs.add(new ReadEntity(sourceTable, parent, parent == null)); TableDesc table = Utilities.getTableDesc(sourceTable); FetchWork work = new FetchWork(listP, partP, table); if (!work.getPartDesc().isEmpty()) { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Tue Oct 14 19:06:45 2014 @@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.Pl import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.IntWritable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -85,6 +84,7 @@ import com.google.common.collect.Maps; */ public class SortedDynPartitionOptimizer implements Transform { + private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number"; @Override public ParseContext transform(ParseContext pCtx) throws SemanticException { @@ -216,6 +216,13 @@ public class SortedDynPartitionOptimizer ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder, newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType()); + if (!bucketColumns.isEmpty()) { + String tableAlias = outRR.getColumnInfos().get(0).getTabAlias(); + ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, + tableAlias, true, true); + outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci); + } + // Create ReduceSink operator ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent), @@ -380,8 +387,11 @@ public class SortedDynPartitionOptimizer // corresponding with bucket number and hence their OIs for (Integer idx : keyColsPosInVal) { if (idx < 0) { - newKeyCols.add(new ExprNodeConstantDesc(TypeInfoFactory - .getPrimitiveTypeInfoFromPrimitiveWritable(IntWritable.class), -1)); + // add bucket number column to both key and value + ExprNodeConstantDesc encd = new 
ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, + BUCKET_NUMBER_COL_NAME); + newKeyCols.add(encd); + newValueCols.add(encd); } else { newKeyCols.add(newValueCols.get(idx).clone()); } @@ -395,7 +405,8 @@ public class SortedDynPartitionOptimizer // should honor the ordering of records provided by ORDER BY in SELECT statement ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent, ReduceSinkOperator.class); - if (parentRSOp != null) { + boolean isOrderBy = parseCtx.getQB().getParseInfo().getDestToOrderBy().size() > 0; + if (parentRSOp != null && isOrderBy) { String parentRSOpOrder = parentRSOp.getConf().getOrder(); if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) { newKeyCols.addAll(parentRSOp.getConf().getKeyCols()); @@ -417,6 +428,9 @@ public class SortedDynPartitionOptimizer List<String> outCols = Utilities.getInternalColumnNamesFromSignature(parent.getSchema() .getSignature()); ArrayList<String> outValColNames = Lists.newArrayList(outCols); + if (!bucketColumns.isEmpty()) { + outValColNames.add(BUCKET_NUMBER_COL_NAME); + } List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(newValueCols, outValColNames, 0, ""); TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java Tue Oct 14 19:06:45 2014 @@ -193,11 +193,12 @@ public class StatsOptimizer implements T } SelectOperator selOp = (SelectOperator)tsOp.getChildren().get(0); for(ExprNodeDesc desc : selOp.getConf().getColList()) { - if (!(desc instanceof ExprNodeColumnDesc)) { + if (!((desc instanceof ExprNodeColumnDesc) || (desc instanceof ExprNodeConstantDesc))) { // Probably an expression, cant handle that return null; } } + Map<String, ExprNodeDesc> exprMap = selOp.getColumnExprMap(); // Since we have done an exact match on TS-SEL-GBY-RS-GBY-SEL-FS // we need not to do any instanceof checks for following. GroupByOperator gbyOp = (GroupByOperator)selOp.getChildren().get(0); @@ -215,6 +216,12 @@ public class StatsOptimizer implements T return null; } + for(ExprNodeDesc desc : selOp.getConf().getColList()) { + if (!(desc instanceof ExprNodeColumnDesc)) { + // Probably an expression, cant handle that + return null; + } + } FileSinkOperator fsOp = (FileSinkOperator)(selOp.getChildren().get(0)); if (fsOp.getChildOperators() != null && fsOp.getChildOperators().size() > 0) { // looks like a subq plan. 
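[Editorial note, not part of the patch: the StatsOptimizer hunks that follow repeatedly resolve an aggregate's column argument through the select operator's column-to-expression map (the exprMap introduced above), so that a constant projected by the select, e.g. "1 AS c", is still recognized as a constant when sum()/count()/min()/max() are answered from statistics. Below is a minimal standalone sketch of that lookup; the wrapper class and helper name are assumptions made purely for illustration and do not appear in this commit.]

    // Sketch only; "resolveThroughSelect" is a hypothetical helper, not code from this commit.
    import java.util.Map;

    import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

    public class SelectConstantResolverSketch {
      /**
       * Returns the expression the select actually projects for a column reference,
       * falling back to the original parameter when no mapping exists.
       */
      static ExprNodeDesc resolveThroughSelect(ExprNodeDesc param, Map<String, ExprNodeDesc> exprMap) {
        if (param instanceof ExprNodeColumnDesc && exprMap != null) {
          ExprNodeDesc mapped = exprMap.get(((ExprNodeColumnDesc) param).getColumn());
          if (mapped != null) {
            return mapped; // may be an ExprNodeConstantDesc, e.g. for "select 1 as c"
          }
        }
        return param;
      }
    }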
@@ -236,22 +243,28 @@ public class StatsOptimizer implements T GenericUDAFResolver udaf = FunctionRegistry.getGenericUDAFResolver(aggr.getGenericUDAFName()); if (udaf instanceof GenericUDAFSum) { - if(!(aggr.getParameters().get(0) instanceof ExprNodeConstantDesc)){ + ExprNodeDesc desc = aggr.getParameters().get(0); + String constant; + if (desc instanceof ExprNodeConstantDesc) { + constant = ((ExprNodeConstantDesc) desc).getValue().toString(); + } else if (desc instanceof ExprNodeColumnDesc && exprMap.get(((ExprNodeColumnDesc)desc).getColumn()) instanceof ExprNodeConstantDesc) { + constant = ((ExprNodeConstantDesc)exprMap.get(((ExprNodeColumnDesc)desc).getColumn())).getValue().toString(); + } else { return null; } Long rowCnt = getRowCnt(pctx, tsOp, tbl); if(rowCnt == null) { return null; } - oneRow.add(HiveDecimal.create(((ExprNodeConstantDesc) aggr.getParameters().get(0)) - .getValue().toString()).multiply(HiveDecimal.create(rowCnt))); + oneRow.add(HiveDecimal.create(constant).multiply(HiveDecimal.create(rowCnt))); ois.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector( PrimitiveCategory.DECIMAL)); } else if (udaf instanceof GenericUDAFCount) { Long rowCnt = 0L; - if ((aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof - ExprNodeConstantDesc)) { + if (aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof + ExprNodeConstantDesc || ((aggr.getParameters().get(0) instanceof ExprNodeColumnDesc) && + exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn()) instanceof ExprNodeConstantDesc)) { // Its either count (*) or count(1) case rowCnt = getRowCnt(pctx, tsOp, tbl); if(rowCnt == null) { @@ -259,12 +272,7 @@ public class StatsOptimizer implements T } } else { // Its count(col) case - if (!(aggr.getParameters().get(0) instanceof ExprNodeColumnDesc)) { - // this is weird, we got expr or something in there, bail out - Log.debug("Unexpected expression : " + aggr.getParameters().get(0)); - return null; - } - ExprNodeColumnDesc desc = (ExprNodeColumnDesc)aggr.getParameters().get(0); + ExprNodeColumnDesc desc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn()); String colName = desc.getColumn(); StatType type = getType(desc.getTypeString()); if(!tbl.isPartitioned()) { @@ -330,7 +338,7 @@ public class StatsOptimizer implements T ois.add(PrimitiveObjectInspectorFactory. 
getPrimitiveJavaObjectInspector(PrimitiveCategory.LONG)); } else if (udaf instanceof GenericUDAFMax) { - ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0); + ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn()); String colName = colDesc.getColumn(); StatType type = getType(colDesc.getTypeString()); if(!tbl.isPartitioned()) { @@ -419,7 +427,7 @@ public class StatsOptimizer implements T } } } else if (udaf instanceof GenericUDAFMin) { - ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0); + ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn()); String colName = colDesc.getColumn(); StatType type = getType(colDesc.getTypeString()); if (!tbl.isPartitioned()) { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Tue Oct 14 19:06:45 2014 @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Stack; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory { List<List<String>> listBucketCols = new ArrayList<List<String>>(); listBucketCols.add(bucketCols); - OpTraits opTraits = new OpTraits(listBucketCols, -1); + int numBuckets = -1; + OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits(); + if (parentOpTraits != null) { + numBuckets = parentOpTraits.getNumBuckets(); + } + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols); rs.setOpTraits(opTraits); return null; } @@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory { } catch (HiveException e) { prunedPartList = null; } - boolean bucketMapJoinConvertible = checkBucketedTable(table, + boolean isBucketed = checkBucketedTable(table, opTraitsCtx.getParseContext(), prunedPartList); - List<List<String>>bucketCols = new ArrayList<List<String>>(); + List<List<String>> bucketColsList = new ArrayList<List<String>>(); + List<List<String>> sortedColsList = new ArrayList<List<String>>(); int numBuckets = -1; - if (bucketMapJoinConvertible) { - bucketCols.add(table.getBucketCols()); + if (isBucketed) { + bucketColsList.add(table.getBucketCols()); numBuckets = table.getNumBuckets(); + List<String> sortCols = new ArrayList<String>(); + for (Order colSortOrder : table.getSortCols()) { + sortCols.add(colSortOrder.getCol()); + } + sortedColsList.add(sortCols); } - OpTraits opTraits = new OpTraits(bucketCols, numBuckets); + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList); ts.setOpTraits(opTraits); return null; } @@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory { List<List<String>> listBucketCols = new ArrayList<List<String>>(); listBucketCols.add(gbyKeys); 
- OpTraits opTraits = new OpTraits(listBucketCols, -1); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols); gbyOp.setOpTraits(opTraits); return null; } @@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory { public static class SelectRule implements NodeProcessor { - @Override - public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - SelectOperator selOp = (SelectOperator)nd; - List<List<String>> parentBucketColNames = - selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); - + public List<List<String>> getConvertedColNames(List<List<String>> parentColNames, + SelectOperator selOp) { List<List<String>> listBucketCols = new ArrayList<List<String>>(); if (selOp.getColumnExprMap() != null) { - if (parentBucketColNames != null) { - for (List<String> colNames : parentBucketColNames) { + if (parentColNames != null) { + for (List<String> colNames : parentColNames) { List<String> bucketColNames = new ArrayList<String>(); for (String colName : colNames) { for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) { if (entry.getValue() instanceof ExprNodeColumnDesc) { - if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) { + if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) { bucketColNames.add(entry.getKey()); } } @@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory { } } + return listBucketCols; + } + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + SelectOperator selOp = (SelectOperator)nd; + List<List<String>> parentBucketColNames = + selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); + + List<List<String>> listBucketCols = null; + List<List<String>> listSortCols = null; + if (selOp.getColumnExprMap() != null) { + if (parentBucketColNames != null) { + listBucketCols = getConvertedColNames(parentBucketColNames, selOp); + } + List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits() + .getSortCols(); + if (parentSortColNames != null) { + listSortCols = getConvertedColNames(parentSortColNames, selOp); + } + } + int numBuckets = -1; if (selOp.getParentOperators().get(0).getOpTraits() != null) { numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols); selOp.setOpTraits(opTraits); return null; } @@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory { Object... nodeOutputs) throws SemanticException { JoinOperator joinOp = (JoinOperator)nd; List<List<String>> bucketColsList = new ArrayList<List<String>>(); + List<List<String>> sortColsList = new ArrayList<List<String>>(); byte pos = 0; for (Operator<? 
extends OperatorDesc> parentOp : joinOp.getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { @@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory { ReduceSinkRule rsRule = new ReduceSinkRule(); rsRule.process(rsOp, stack, procCtx, nodeOutputs); } - bucketColsList.add(getOutputColNames(joinOp, rsOp, pos)); + bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos)); + sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos)); pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList)); return null; } - private List<String> getOutputColNames(JoinOperator joinOp, - ReduceSinkOperator rs, byte pos) { - List<List<String>> parentBucketColNames = - rs.getOpTraits().getBucketColNames(); - - if (parentBucketColNames != null) { + private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames, + byte pos) { + if (parentColNames != null) { List<String> bucketColNames = new ArrayList<String>(); // guaranteed that there is only 1 list within this list because // a reduce sink always brings down the bucketing cols to a single list. // may not be true with correlation operators (mux-demux) - List<String> colNames = parentBucketColNames.get(0); + List<String> colNames = parentColNames.get(0); for (String colName : colNames) { for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) { if (exprNode instanceof ExprNodeColumnDesc) { @@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - OpTraits opTraits = new OpTraits(null, -1); + OpTraits opTraits = new OpTraits(null, -1, null); @SuppressWarnings("unchecked") Operator<? extends OperatorDesc> operator = (Operator<? 
extends OperatorDesc>)nd; operator.setOpTraits(opTraits); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Tue Oct 14 19:06:45 2014 @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -152,6 +154,11 @@ public class CrossProductCheck implement private void checkMapJoins(TezWork tzWrk) throws SemanticException { for(BaseWork wrk : tzWrk.getAllWork() ) { + + if ( wrk instanceof MergeJoinWork ) { + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk); if ( !warnings.isEmpty() ) { for(String w : warnings) { @@ -163,12 +170,17 @@ public class CrossProductCheck implement private void checkTezReducer(TezWork tzWrk) throws SemanticException { for(BaseWork wrk : tzWrk.getAllWork() ) { - if ( !(wrk instanceof ReduceWork) ) { + + if ( wrk instanceof MergeJoinWork ) { + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + + if ( !(wrk instanceof ReduceWork ) ) { continue; } ReduceWork rWork = (ReduceWork) wrk; Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer(); - if ( reducer instanceof JoinOperator ) { + if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) { Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new HashMap<Integer, ExtractReduceSinkInfo.Info>(); for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) { @@ -185,7 +197,7 @@ public class CrossProductCheck implement return; } Operator<? extends OperatorDesc> reducer = rWrk.getReducer(); - if ( reducer instanceof JoinOperator ) { + if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) { BaseWork prntWork = mrWrk.getMapWork(); checkForCrossProduct(taskName, reducer, new ExtractReduceSinkInfo(null).analyze(prntWork));
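[Editorial note, not part of the patch: the CrossProductCheck changes above do two things -- unwrap a MergeJoinWork to the main work that actually carries the operator tree before scanning it, and treat a CommonMergeJoinOperator reducer the same way as a JoinOperator when deciding whether to warn about a cross product. Below is a minimal standalone sketch of those two steps; the class and method names are assumptions made for illustration and are not part of this commit.]

    // Sketch only; class and method names here are hypothetical, not code from this commit.
    import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
    import org.apache.hadoop.hive.ql.exec.JoinOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class CrossProductCheckSketch {
      /** Merge-join vertices wrap the work that actually holds the operators; inspect that instead. */
      static BaseWork mainWorkOf(BaseWork wrk) {
        return (wrk instanceof MergeJoinWork) ? ((MergeJoinWork) wrk).getMainWork() : wrk;
      }

      /** A shuffle-join reducer can now be either a plain join or a common merge join. */
      static boolean isShuffleJoinReducer(Operator<? extends OperatorDesc> reducer) {
        return reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator;
      }
    }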
