Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java?rev=1627235&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java Wed Sep 24 07:03:35 2014 @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.tools; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * A KeyValuesReader implementation that returns a sorted stream of key-values + * by doing a sorted merge of the key-value in LogicalInputs. + * Tags are in the last byte of the key, so no special handling for tags is required. + * Uses a priority queue to pick the KeyValuesReader of the input that is next in + * sort order. + */ +public class KeyValuesInputMerger extends KeyValuesReader { + + private class KeyValuesIterable implements Iterable<Object> { + + KeyValuesIterator currentIterator = null; + + KeyValuesIterable(int size) { + currentIterator = new KeyValuesIterator(size); + } + + @Override + public Iterator<Object> iterator() { + return currentIterator; + } + + public void init(List<KeyValuesReader> readerList) { + currentIterator.init(readerList); + } + } + + private class KeyValuesIterator implements Iterator<Object> { + KeyValuesReader[] readerArray = null; + Iterator<Object> currentIterator = null; + int currentIndex = 0; + int loadedSize = 0; + + KeyValuesIterator(int size) { + readerArray = new KeyValuesReader[size]; + } + + public void init(List<KeyValuesReader> readerList) { + for (int i = 0; i < readerList.size(); i++) { + readerArray[i] = null; + } + loadedSize = 0; + for (KeyValuesReader kvsReader : readerList) { + readerArray[loadedSize] = kvsReader; + loadedSize++; + } + currentIterator = null; + currentIndex = 0; + } + + @Override + public boolean hasNext() { + if ((currentIterator == null) || (currentIterator.hasNext() == false)) { + if (currentIndex == loadedSize) { + return false; + } + + try { + if (readerArray[currentIndex] == null) { + return false; + } + currentIterator = readerArray[currentIndex].getCurrentValues().iterator(); + currentIndex++; + return currentIterator.hasNext(); + } catch (IOException e) { + return false; + } + } + + return true; + } + + @Override + public Object next() { + l4j.info("next called on " + currentIterator); + return currentIterator.next(); + } + + @Override + public void remove() { + // nothing to do + } + } + + public static final Log l4j = LogFactory.getLog(KeyValuesInputMerger.class); + private PriorityQueue<KeyValuesReader> pQueue = null; + private final List<KeyValuesReader> nextKVReaders = new ArrayList<KeyValuesReader>(); + KeyValuesIterable kvsIterable = null; + + public KeyValuesInputMerger(List<? extends Input> shuffleInputs) throws Exception { + //get KeyValuesReaders from the LogicalInput and add them to priority queue + int initialCapacity = shuffleInputs.size(); + kvsIterable = new KeyValuesIterable(initialCapacity); + pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator()); + for(Input input : shuffleInputs){ + addToQueue((KeyValuesReader)input.getReader()); + } + } + + /** + * Add KeyValuesReader to queue if it has more key-values + * @param kvsReadr + * @throws IOException + */ + private void addToQueue(KeyValuesReader kvsReadr) throws IOException{ + if(kvsReadr.next()){ + pQueue.add(kvsReadr); + } + } + + /** + * @return true if there are more key-values and advances to next key-values + * @throws IOException + */ + @Override + public boolean next() throws IOException { + //add the previous nextKVReader back to queue + if (!nextKVReaders.isEmpty()) { + for (KeyValuesReader kvReader : nextKVReaders) { + addToQueue(kvReader); + } + nextKVReaders.clear(); + } + + KeyValuesReader nextKVReader = null; + //get the new nextKVReader with lowest key + nextKVReader = pQueue.poll(); + if (nextKVReader != null) { + nextKVReaders.add(nextKVReader); + } + + while (pQueue.peek() != null) { + KeyValuesReader equalValueKVReader = pQueue.poll(); + if (pQueue.comparator().compare(nextKVReader, equalValueKVReader) == 0) { + nextKVReaders.add(equalValueKVReader); + } else { + pQueue.add(equalValueKVReader); + break; + } + } + return !(nextKVReaders.isEmpty()); + } + + @Override + public Object getCurrentKey() throws IOException { + // return key from any of the readers + return nextKVReaders.get(0).getCurrentKey(); + } + + @Override + public Iterable<Object> getCurrentValues() throws IOException { + kvsIterable.init(nextKVReaders); + return kvsIterable; + } + + /** + * Comparator that compares KeyValuesReader on their current key + */ + class KVReaderComparator implements Comparator<KeyValuesReader> { + + @Override + public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) { + try { + BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey(); + BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey(); + return key1.compareTo(key2); + } catch (IOException e) { + l4j.error("Caught exception while reading shuffle input", e); + //die! + throw new RuntimeException(e); + } + } + } + + +}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Wed Sep 24 07:03:35 2014 @@ -40,7 +40,7 @@ public class TezMergedLogicalInput exten @Override public Reader getReader() throws Exception { - return new InputMerger(getInputs()); + return new KeyValuesInputMerger(getInputs()); } @Override Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Wed Sep 24 07:03:35 2014 @@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe } public IOContext getIOContext() { - return IOContext.get(); + return IOContext.get(jobConf.get(Utilities.INPUT_NAME)); } - public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) { + private void initIOContext(long startPos, boolean isBlockPointer, + Path inputPath) { ioCxtRef = this.getIOContext(); ioCxtRef.currentBlockStart = startPos; ioCxtRef.isBlockPointer = isBlockPointer; @@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe boolean blockPointer = false; long blockStart = -1; - FileSplit fileSplit = (FileSplit) split; + FileSplit fileSplit = split; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(job); if (inputFormatClass.getName().contains("SequenceFile")) { @@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe blockStart = in.getPosition(); in.close(); } + this.jobConf = job; this.initIOContext(blockStart, blockPointer, path.makeQualified(fs)); this.initIOContextSortedProps(split, recordReader, job); } public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) { + this.jobConf = job; + this.getIOContext().resetSortingValues(); this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Sep 24 07:03:35 2014 @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -253,7 +254,14 @@ public class HiveInputFormat<K extends W } protected void init(JobConf job) { - mrwork = Utilities.getMapWork(job); + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + mrwork = (MapWork) Utilities.getMergeWork(job); + if (mrwork == null) { + mrwork = Utilities.getMapWork(job); + } + } else { + mrwork = Utilities.getMapWork(job); + } pathToPartitionInfo = mrwork.getPathToPartitionInfo(); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Sep 24 07:03:35 2014 @@ -18,7 +18,13 @@ package org.apache.hadoop.hive.ql.io; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin; import org.apache.hadoop.hive.ql.session.SessionState; @@ -32,20 +38,25 @@ import org.apache.hadoop.hive.ql.session */ public class IOContext { - private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){ @Override protected synchronized IOContext initialValue() { return new IOContext(); } }; - private static IOContext ioContext = new IOContext(); + private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>(); + private static IOContext ioContext = new IOContext(); - public static IOContext get() { - if (SessionState.get() == null) { - // this happens on the backend. only one io context needed. - return ioContext; + public static Map<String, IOContext> getMap() { + return inputNameIOContextMap; + } + + public static IOContext get(String inputName) { + if (inputNameIOContextMap.containsKey(inputName) == false) { + IOContext ioContext = new IOContext(); + inputNameIOContextMap.put(inputName, ioContext); } - return IOContext.threadLocal.get(); + + return inputNameIOContextMap.get(inputName); } public static void clear() { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Wed Sep 24 07:03:35 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -29,12 +30,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MuxOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -42,12 +47,16 @@ import org.apache.hadoop.hive.ql.parse.O import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; +import org.apache.hadoop.util.ReflectionUtils; /** * ConvertJoinMapJoin is an optimization that replaces a common join @@ -60,39 +69,46 @@ public class ConvertJoinMapJoin implemen static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); + @SuppressWarnings("unchecked") @Override - /* - * (non-Javadoc) - * we should ideally not modify the tree we traverse. - * However, since we need to walk the tree at any time when we modify the - * operator, we might as well do it here. - */ - public Object process(Node nd, Stack<Node> stack, - NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { + /* + * (non-Javadoc) we should ideally not modify the tree we traverse. However, + * since we need to walk the tree at any time when we modify the operator, we + * might as well do it here. + */ + public Object + process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; - if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { + JoinOperator joinOp = (JoinOperator) nd; + + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) + && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); return null; } - JoinOperator joinOp = (JoinOperator) nd; - // if we have traits, and table info is present in the traits, we know the + // if we have traits, and table info is present in the traits, we know the // exact number of buckets. Else choose the largest number of estimated // reducers from the parent operators. int numBuckets = -1; int estimatedBuckets = -1; + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) { if (parentOp.getOpTraits().getNumBuckets() > 0) { - numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? - parentOp.getOpTraits().getNumBuckets() : numBuckets; + numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? + parentOp.getOpTraits().getNumBuckets() : numBuckets; } if (parentOp instanceof ReduceSinkOperator) { ReduceSinkOperator rs = (ReduceSinkOperator)parentOp; - estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? + estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? rs.getConf().getNumReducers() : estimatedBuckets; } } @@ -107,29 +123,80 @@ public class ConvertJoinMapJoin implemen numBuckets = 1; } LOG.info("Estimated number of buckets " + numBuckets); - int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets); + int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets); if (mapJoinConversionPos < 0) { - // we cannot convert to bucket map join, we cannot convert to - // map join either based on the size + // we cannot convert to bucket map join, we cannot convert to + // map join either based on the size. Check if we can convert to SMB join. + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { + convertJoinSMBJoin(joinOp, context, 0, 0, false, false); + return null; + } + Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null; + try { + bigTableMatcherClass = + (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar( + context.parseContext.getConf(), + HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR))); + } catch (ClassNotFoundException e) { + throw new SemanticException(e.getMessage()); + } + + BigTableSelectorForAutoSMJ bigTableMatcher = + ReflectionUtils.newInstance(bigTableMatcherClass, null); + JoinDesc joinDesc = joinOp.getConf(); + JoinCondDesc[] joinCondns = joinDesc.getConds(); + Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns); + if (joinCandidates.isEmpty()) { + // This is a full outer join. This can never be a map-join + // of any type. So return false. + return false; + } + mapJoinConversionPos = + bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates); + if (mapJoinConversionPos < 0) { + // contains aliases from sub-query + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); + return null; + } + + if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + convertJoinSMBJoin(joinOp, context, mapJoinConversionPos, + tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true); + } else { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); + } return null; } - if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { - if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) { - return null; + if (numBuckets > 1) { + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { + if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + return null; + } } } LOG.info("Convert to non-bucketed map join"); // check if we can convert to map join no bucket scaling. - mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1); + mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1); if (mapJoinConversionPos < 0) { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); return null; } MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); // map join operator by default has no bucket cols - mapJoinOp.setOpTraits(new OpTraits(null, -1)); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); + mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) { setAllChildrenTraitsToNull(childOp); @@ -138,11 +205,107 @@ public class ConvertJoinMapJoin implemen return null; } + // replaces the join operator with a new CommonJoinOperator, removes the + // parent reduce sinks + private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren) + throws SemanticException { + ParseContext parseContext = context.parseContext; + MapJoinDesc mapJoinDesc = null; + if (adjustParentsChildren) { + mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(), + joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true); + } else { + JoinDesc joinDesc = joinOp.getConf(); + // retain the original join desc in the map join. + mapJoinDesc = + new MapJoinDesc(null, null, joinDesc.getExprs(), null, null, + joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); + } + + @SuppressWarnings("unchecked") + CommonMergeJoinOperator mergeJoinOp = + (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, + isSubQuery, mapJoinConversionPos, mapJoinDesc)); + OpTraits opTraits = + new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits() + .getSortCols()); + mergeJoinOp.setOpTraits(opTraits); + mergeJoinOp.setStatistics(joinOp.getStatistics()); + + for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + int pos = parentOp.getChildOperators().indexOf(joinOp); + parentOp.getChildOperators().remove(pos); + parentOp.getChildOperators().add(pos, mergeJoinOp); + } + + for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) { + int pos = childOp.getParentOperators().indexOf(joinOp); + childOp.getParentOperators().remove(pos); + childOp.getParentOperators().add(pos, mergeJoinOp); + } + + List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators(); + if (childOperators == null) { + childOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + mergeJoinOp.setChildOperators(childOperators); + } + + List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators(); + if (parentOperators == null) { + parentOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + mergeJoinOp.setParentOperators(parentOperators); + } + + childOperators.clear(); + parentOperators.clear(); + childOperators.addAll(joinOp.getChildOperators()); + parentOperators.addAll(joinOp.getParentOperators()); + mergeJoinOp.getConf().setGenJoinKeys(false); + + if (adjustParentsChildren) { + mergeJoinOp.getConf().setGenJoinKeys(true); + List<Operator<? extends OperatorDesc>> newParentOpList = + new ArrayList<Operator<? extends OperatorDesc>>(); + for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) { + for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) { + grandParentOp.getChildOperators().remove(parentOp); + grandParentOp.getChildOperators().add(mergeJoinOp); + newParentOpList.add(grandParentOp); + } + } + mergeJoinOp.getParentOperators().clear(); + mergeJoinOp.getParentOperators().addAll(newParentOpList); + List<Operator<? extends OperatorDesc>> parentOps = + new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators()); + for (Operator<? extends OperatorDesc> parentOp : parentOps) { + int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp); + if (parentIndex == mapJoinConversionPos) { + continue; + } + + // insert the dummy store operator here + DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator(); + dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + dummyStoreOp.getChildOperators().add(mergeJoinOp); + int index = parentOp.getChildOperators().indexOf(mergeJoinOp); + parentOp.getChildOperators().remove(index); + parentOp.getChildOperators().add(index, dummyStoreOp); + dummyStoreOp.getParentOperators().add(parentOp); + mergeJoinOp.getParentOperators().remove(parentIndex); + mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp); + } + } + mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); + } + private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) { if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1)); + currentOp.setOpTraits(new OpTraits(null, -1, null)); for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -151,28 +314,26 @@ public class ConvertJoinMapJoin implemen } } - private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, - int bigTablePosition) throws SemanticException { - - TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); + private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) { LOG.info("Check conversion to bucket map join failed."); return false; } - MapJoinOperator mapJoinOp = - convertJoinMapJoin(joinOp, context, bigTablePosition); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); // we can set the traits for this join operator OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets()); + tezBucketJoinProcCtx.getNumBuckets(), null); mapJoinOp.setOpTraits(opTraits); + mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); - // Once the conversion is done, we can set the partitioner to bucket cols on the small table + // Once the conversion is done, we can set the partitioner to bucket cols on the small table Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>(); bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets()); joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping); @@ -182,6 +343,54 @@ public class ConvertJoinMapJoin implemen return true; } + /* + * This method tries to convert a join to an SMB. This is done based on + * traits. If the sorted by columns are the same as the join columns then, we + * can convert the join to an SMB. Otherwise retain the bucket map join as it + * is still more efficient than a regular join. + */ + private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + + ReduceSinkOperator bigTableRS = + (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits() + .getNumBuckets(); + + // the sort and bucket cols have to match on both sides for this + // transformation of the join operation + for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + if (!(parentOp instanceof ReduceSinkOperator)) { + // could be mux/demux operators. Currently not supported + LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time."); + return false; + } + ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp + .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { + LOG.info("We cannot convert to SMB because the sort column names do not match."); + return false; + } + + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp + .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) + == false) { + LOG.info("We cannot convert to SMB because bucket column names do not match."); + return false; + } + } + + boolean isSubQuery = false; + if (numBuckets < 0) { + isSubQuery = true; + numBuckets = bigTableRS.getConf().getNumReducers(); + } + tezBucketJoinProcCtx.setNumBuckets(numBuckets); + tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + LOG.info("We can convert the join to an SMB join."); + return true; + } + private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) { int numBuckets = currentOp.getOpTraits().getNumBuckets(); for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) { @@ -193,15 +402,13 @@ public class ConvertJoinMapJoin implemen } /* - * We perform the following checks to see if we can convert to a bucket map join - * 1. If the parent reduce sink of the big table side has the same emit key cols as - * its parent, we can create a bucket map join eliminating the reduce sink. - * 2. If we have the table information, we can check the same way as in Mapreduce to - * determine if we can perform a Bucket Map Join. + * If the parent reduce sink of the big table side has the same emit key cols + * as its parent, we can create a bucket map join eliminating the reduce sink. */ - private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, - OptimizeTezProcContext context, int bigTablePosition, - TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, + OptimizeTezProcContext context, int bigTablePosition, + TezBucketJoinProcCtx tezBucketJoinProcCtx) + throws SemanticException { // bail on mux-operator because mux operator masks the emit keys of the // constituent reduce sinks if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) { @@ -211,14 +418,41 @@ public class ConvertJoinMapJoin implemen } ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames(); + Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0); + List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames(); + int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); + // all keys matched. + if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(), + tezBucketJoinProcCtx) == false) { + LOG.info("No info available to check for bucket map join. Cannot convert"); + return false; + } + /* * this is the case when the big table is a sub-query and is probably - * already bucketed by the join column in say a group by operation + * already bucketed by the join column in say a group by operation */ - List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames(); - if ((colNames != null) && (colNames.isEmpty() == false)) { - Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0); - for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) { + boolean isSubQuery = false; + if (numBuckets < 0) { + isSubQuery = true; + numBuckets = rs.getConf().getNumReducers(); + } + tezBucketJoinProcCtx.setNumBuckets(numBuckets); + tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + return true; + } + + private boolean checkColEquality(List<List<String>> grandParentColNames, + List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap, + TezBucketJoinProcCtx tezBucketJoinProcCtx) { + + if ((grandParentColNames == null) || (parentColNames == null)) { + return false; + } + + if ((parentColNames != null) && (parentColNames.isEmpty() == false)) { + for (List<String> listBucketCols : grandParentColNames) { // can happen if this operator does not carry forward the previous bucketing columns // for e.g. another join operator which does not carry one of the sides' key columns if (listBucketCols.isEmpty()) { @@ -226,9 +460,9 @@ public class ConvertJoinMapJoin implemen } int colCount = 0; // parent op is guaranteed to have a single list because it is a reduce sink - for (String colName : rs.getOpTraits().getBucketColNames().get(0)) { + for (String colName : parentColNames.get(0)) { // all columns need to be at least a subset of the parentOfParent's bucket cols - ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName); + ExprNodeDesc exprNodeDesc = colExprMap.get(colName); if (exprNodeDesc instanceof ExprNodeColumnDesc) { if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) { colCount++; @@ -236,32 +470,21 @@ public class ConvertJoinMapJoin implemen break; } } - - if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) { - // all keys matched. - int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); - boolean isSubQuery = false; - if (numBuckets < 0) { - isSubQuery = true; - numBuckets = rs.getConf().getNumReducers(); - } - tezBucketJoinProcCtx.setNumBuckets(numBuckets); - tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + + if (colCount == parentColNames.get(0).size()) { return true; } } } return false; } - - LOG.info("No info available to check for bucket map join. Cannot convert"); return false; } - public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, + public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, int buckets) { - Set<Integer> bigTableCandidateSet = MapJoinProcessor. - getBigTableCandidates(joinOp.getConf().getConds()); + Set<Integer> bigTableCandidateSet = + MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds()); long maxSize = context.conf.getLongVar( HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); @@ -287,7 +510,7 @@ public class ConvertJoinMapJoin implemen long inputSize = currInputStat.getDataSize(); if ((bigInputStat == null) || ((bigInputStat != null) && - (inputSize > bigInputStat.getDataSize()))) { + (inputSize > bigInputStat.getDataSize()))) { if (bigTableFound) { // cannot convert to map join; we've already chosen a big table @@ -347,9 +570,9 @@ public class ConvertJoinMapJoin implemen * for tez. */ - public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, + public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, int bigTablePosition) throws SemanticException { - // bail on mux operator because currently the mux operator masks the emit keys + // bail on mux operator because currently the mux operator masks the emit keys // of the constituent reduce sinks. for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { if (parentOp instanceof MuxOperator) { @@ -359,12 +582,12 @@ public class ConvertJoinMapJoin implemen //can safely convert the join to a map join. ParseContext parseContext = context.parseContext; - MapJoinOperator mapJoinOp = MapJoinProcessor. - convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), - joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true); + MapJoinOperator mapJoinOp = + MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, + parseContext.getJoinContext().get(joinOp), bigTablePosition, true); - Operator<? extends OperatorDesc> parentBigTableOp - = mapJoinOp.getParentOperators().get(bigTablePosition); + Operator<? extends OperatorDesc> parentBigTableOp = + mapJoinOp.getParentOperators().get(bigTablePosition); if (parentBigTableOp instanceof ReduceSinkOperator) { for (Operator<?> p : parentBigTableOp.getParentOperators()) { // we might have generated a dynamic partition operator chain. Since @@ -380,11 +603,10 @@ public class ConvertJoinMapJoin implemen } } mapJoinOp.getParentOperators().remove(bigTablePosition); - if (!(mapJoinOp.getParentOperators().contains( - parentBigTableOp.getParentOperators().get(0)))) { + if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) { mapJoinOp.getParentOperators().add(bigTablePosition, parentBigTableOp.getParentOperators().get(0)); - } + } parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) { if (!(op.getChildOperators().contains(mapJoinOp))) { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Sep 24 07:03:35 2014 @@ -389,157 +389,8 @@ public class MapJoinProcessor implements JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { - JoinDesc desc = op.getConf(); - JoinCondDesc[] condns = desc.getConds(); - Byte[] tagOrder = desc.getTagOrder(); - - // outer join cannot be performed on a table which is being cached - if (!noCheckOuterJoin) { - if (checkMapJoin(mapJoinPos, condns) < 0) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); - } - } - - // Walk over all the sources (which are guaranteed to be reduce sink - // operators). - // The join outputs a concatenation of all the inputs. - QBJoinTree leftSrc = joinTree.getJoinSrc(); - List<ReduceSinkOperator> oldReduceSinkParentOps = - new ArrayList<ReduceSinkOperator>(op.getNumParent()); - if (leftSrc != null) { - // assert mapJoinPos == 0; - Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0); - assert parentOp.getParentOperators().size() == 1; - oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); - } - - - byte pos = 0; - for (String src : joinTree.getBaseSrc()) { - if (src != null) { - Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos); - assert parentOp.getParentOperators().size() == 1; - oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); - } - pos++; - } - - Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); - List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature()); - Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs(); - Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>(); - for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) { - byte tag = entry.getKey(); - Operator<?> terminal = oldReduceSinkParentOps.get(tag); - - List<ExprNodeDesc> values = entry.getValue(); - List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal); - newValueExprs.put(tag, newValues); - for (int i = 0; i < schema.size(); i++) { - ColumnInfo column = schema.get(i); - if (column == null) { - continue; - } - ExprNodeDesc expr = colExprMap.get(column.getInternalName()); - int index = ExprNodeDescUtils.indexOf(expr, values); - if (index >= 0) { - colExprMap.put(column.getInternalName(), newValues.get(index)); - schema.set(i, null); - } - } - } - - // rewrite value index for mapjoin - Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>(); - - // get the join keys from old parent ReduceSink operators - Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); - - // construct valueTableDescs and valueFilteredTableDescs - List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); - List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); - int[][] filterMap = desc.getFilterMap(); - for (pos = 0; pos < op.getParentOperators().size(); pos++) { - ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos); - List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols(); - List<ExprNodeDesc> valueCols = newValueExprs.get(pos); - if (pos != mapJoinPos) { - // remove values in key exprs for value table schema - // value expression for hashsink will be modified in LocalMapJoinProcessor - int[] valueIndex = new int[valueCols.size()]; - List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>(); - for (int i = 0; i < valueIndex.length; i++) { - ExprNodeDesc expr = valueCols.get(i); - int kindex = ExprNodeDescUtils.indexOf(expr, keyCols); - if (kindex >= 0) { - valueIndex[i] = kindex; - } else { - valueIndex[i] = -valueColsInValueExpr.size() - 1; - valueColsInValueExpr.add(expr); - } - } - if (needValueIndex(valueIndex)) { - valueIndices.put(pos, valueIndex); - } - valueCols = valueColsInValueExpr; - } - // deep copy expr node desc - List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols); - if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) { - ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory - .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false); - valueFilteredCols.add(isFilterDesc); - } - - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); - TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue")); - - valueTableDescs.add(valueTableDesc); - valueFilteredTableDescs.add(valueFilteredTableDesc); - - keyExprMap.put(pos, keyCols); - } - - Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters(); - Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>(); - for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) { - byte srcTag = entry.getKey(); - List<ExprNodeDesc> filter = entry.getValue(); - - Operator<?> terminal = oldReduceSinkParentOps.get(srcTag); - newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); - } - desc.setFilters(filters = newFilters); - - // create dumpfile prefix needed to create descriptor - String dumpFilePrefix = ""; - if( joinTree.getMapAliases() != null ) { - for(String mapAlias : joinTree.getMapAliases()) { - dumpFilePrefix = dumpFilePrefix + mapAlias; - } - dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix(); - } else { - dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix(); - } - - List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos); - - List<String> outputColumnNames = op.getConf().getOutputColumnNames(); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, - PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); - JoinCondDesc[] joinCondns = op.getConf().getConds(); - MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, - valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, - filters, op.getConf().getNoOuterJoin(), dumpFilePrefix); - mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); - mapJoinDescriptor.setTagOrder(tagOrder); - mapJoinDescriptor.setNullSafes(desc.getNullSafes()); - mapJoinDescriptor.setFilterMap(desc.getFilterMap()); - if (!valueIndices.isEmpty()) { - mapJoinDescriptor.setValueIndices(valueIndices); - } + MapJoinDesc mapJoinDescriptor = + getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin); // reduce sink row resolver used to generate map join op RowResolver outputRS = opParseCtxMap.get(op).getRowResolver(); @@ -551,6 +402,7 @@ public class MapJoinProcessor implements opParseCtxMap.put(mapJoinOp, ctx); mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs()); + Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); mapJoinOp.setColumnExprMap(colExprMap); List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators(); @@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements } } + + public static MapJoinDesc getMapJoinDesc(HiveConf hconf, + LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap, + JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + JoinDesc desc = op.getConf(); + JoinCondDesc[] condns = desc.getConds(); + Byte[] tagOrder = desc.getTagOrder(); + + // outer join cannot be performed on a table which is being cached + if (!noCheckOuterJoin) { + if (checkMapJoin(mapJoinPos, condns) < 0) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } + } + + // Walk over all the sources (which are guaranteed to be reduce sink + // operators). + // The join outputs a concatenation of all the inputs. + QBJoinTree leftSrc = joinTree.getJoinSrc(); + List<ReduceSinkOperator> oldReduceSinkParentOps = + new ArrayList<ReduceSinkOperator>(op.getNumParent()); + if (leftSrc != null) { + // assert mapJoinPos == 0; + Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0); + assert parentOp.getParentOperators().size() == 1; + oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); + } + + byte pos = 0; + for (String src : joinTree.getBaseSrc()) { + if (src != null) { + Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos); + assert parentOp.getParentOperators().size() == 1; + oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); + } + pos++; + } + + Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); + List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature()); + Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs(); + Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) { + byte tag = entry.getKey(); + Operator<?> terminal = oldReduceSinkParentOps.get(tag); + + List<ExprNodeDesc> values = entry.getValue(); + List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal); + newValueExprs.put(tag, newValues); + for (int i = 0; i < schema.size(); i++) { + ColumnInfo column = schema.get(i); + if (column == null) { + continue; + } + ExprNodeDesc expr = colExprMap.get(column.getInternalName()); + int index = ExprNodeDescUtils.indexOf(expr, values); + if (index >= 0) { + colExprMap.put(column.getInternalName(), newValues.get(index)); + schema.set(i, null); + } + } + } + + // rewrite value index for mapjoin + Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>(); + + // get the join keys from old parent ReduceSink operators + Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); + + // construct valueTableDescs and valueFilteredTableDescs + List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); + List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); + int[][] filterMap = desc.getFilterMap(); + for (pos = 0; pos < op.getParentOperators().size(); pos++) { + ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos); + List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols(); + List<ExprNodeDesc> valueCols = newValueExprs.get(pos); + if (pos != mapJoinPos) { + // remove values in key exprs for value table schema + // value expression for hashsink will be modified in + // LocalMapJoinProcessor + int[] valueIndex = new int[valueCols.size()]; + List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>(); + for (int i = 0; i < valueIndex.length; i++) { + ExprNodeDesc expr = valueCols.get(i); + int kindex = ExprNodeDescUtils.indexOf(expr, keyCols); + if (kindex >= 0) { + valueIndex[i] = kindex; + } else { + valueIndex[i] = -valueColsInValueExpr.size() - 1; + valueColsInValueExpr.add(expr); + } + } + if (needValueIndex(valueIndex)) { + valueIndices.put(pos, valueIndex); + } + valueCols = valueColsInValueExpr; + } + // deep copy expr node desc + List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols); + if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) { + ExprNodeColumnDesc isFilterDesc = + new ExprNodeColumnDesc( + TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", + "filter", false); + valueFilteredCols.add(isFilterDesc); + } + + TableDesc valueTableDesc = + PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, + "mapjoinvalue")); + TableDesc valueFilteredTableDesc = + PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueFilteredCols, "mapjoinvalue")); + + valueTableDescs.add(valueTableDesc); + valueFilteredTableDescs.add(valueFilteredTableDesc); + + keyExprMap.put(pos, keyCols); + } + + Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters(); + Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) { + byte srcTag = entry.getKey(); + List<ExprNodeDesc> filter = entry.getValue(); + + Operator<?> terminal = oldReduceSinkParentOps.get(srcTag); + newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); + } + desc.setFilters(filters = newFilters); + + // create dumpfile prefix needed to create descriptor + String dumpFilePrefix = ""; + if (joinTree.getMapAliases() != null) { + for (String mapAlias : joinTree.getMapAliases()) { + dumpFilePrefix = dumpFilePrefix + mapAlias; + } + dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix(); + } else { + dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix(); + } + + List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos); + + List<String> outputColumnNames = op.getConf().getOutputColumnNames(); + TableDesc keyTableDesc = + PlanUtils.getMapJoinKeyTableDesc(hconf, + PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); + JoinCondDesc[] joinCondns = op.getConf().getConds(); + MapJoinDesc mapJoinDescriptor = + new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, + valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op + .getConf().getNoOuterJoin(), dumpFilePrefix); + mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); + mapJoinDescriptor.setTagOrder(tagOrder); + mapJoinDescriptor.setNullSafes(desc.getNullSafes()); + mapJoinDescriptor.setFilterMap(desc.getFilterMap()); + if (!valueIndices.isEmpty()) { + mapJoinDescriptor.setValueIndices(valueIndices); + } + + return mapJoinDescriptor; + } } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1627235&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Wed Sep 24 07:03:35 2014 @@ -0,0 +1,100 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.HashMap; +import java.util.Map; +import java.util.Stack; + +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.GenTezProcContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; + +public class MergeJoinProc implements NodeProcessor { + + public Operator<? extends OperatorDesc> getLeafOperator(Operator<? extends OperatorDesc> op) { + for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) { + // FileSink or ReduceSink operators are used to create vertices. See + // TezCompiler. + if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) { + return childOp; + } else { + return getLeafOperator(childOp); + } + } + + return null; + } + + @Override + public Object + process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + GenTezProcContext context = (GenTezProcContext) procCtx; + CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) nd; + if (stack.size() < 2 || !(stack.get(stack.size() - 2) instanceof DummyStoreOperator)) { + context.currentMergeJoinOperator = mergeJoinOp; + return null; + } + + TezWork tezWork = context.currentTask.getWork(); + @SuppressWarnings("unchecked") + Operator<? extends OperatorDesc> parentOp = + (Operator<? extends OperatorDesc>) ((stack.get(stack.size() - 2))); + // Guaranteed to be just 1 because each DummyStoreOperator can be part of only one work. + BaseWork parentWork = context.childToWorkMap.get(parentOp).get(0); + + + // we need to set the merge work that has been created as part of the dummy store walk. If a + // merge work already exists for this merge join operator, add the dummy store work to the + // merge work. Else create a merge work, add above work to the merge work + MergeJoinWork mergeWork = null; + if (context.opMergeJoinWorkMap.containsKey(getLeafOperator(mergeJoinOp))) { + // we already have the merge work corresponding to this merge join operator + mergeWork = context.opMergeJoinWorkMap.get(getLeafOperator(mergeJoinOp)); + } else { + mergeWork = new MergeJoinWork(); + tezWork.add(mergeWork); + context.opMergeJoinWorkMap.put(getLeafOperator(mergeJoinOp), mergeWork); + } + + mergeWork.setMergeJoinOperator(mergeJoinOp); + mergeWork.addMergedWork(null, parentWork); + tezWork.setVertexType(mergeWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); + + for (BaseWork grandParentWork : tezWork.getParents(parentWork)) { + parentWork.setName(grandParentWork.getName()); + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(grandParentWork, parentWork); + tezWork.disconnect(grandParentWork, parentWork); + tezWork.connect(grandParentWork, mergeWork, edgeProp); + } + + for (BaseWork childWork : tezWork.getChildren(parentWork)) { + TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, childWork); + tezWork.disconnect(parentWork, childWork); + tezWork.connect(mergeWork, childWork, edgeProp); + } + + tezWork.remove(parentWork); + + DummyStoreOperator dummyOp = (DummyStoreOperator) (stack.get(stack.size() - 2)); + + parentWork.setTag(mergeJoinOp.getTagForOperator(dummyOp)); + + mergeJoinOp.getParentOperators().remove(dummyOp); + dummyOp.getChildOperators().clear(); + + return true; + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Sep 24 07:03:35 2014 @@ -51,7 +51,12 @@ public class Optimizer { * @param hiveConf */ public void initialize(HiveConf hiveConf) { + + boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"); + boolean bucketMapJoinOptimizer = false; + transformations = new ArrayList<Transform>(); + // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { @@ -81,15 +86,16 @@ public class Optimizer { } transformations.add(new SamplePruner()); transformations.add(new MapJoinProcessor()); - boolean bucketMapJoinOptimizer = false; - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) { + + if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) { transformations.add(new BucketMapJoinOptimizer()); bucketMapJoinOptimizer = true; } // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) { + if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) + && !isTezExecEngine) { if (!bucketMapJoinOptimizer) { // No need to add BucketMapJoinOptimizer twice transformations.add(new BucketMapJoinOptimizer()); @@ -119,7 +125,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) && - !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + !isTezExecEngine) { transformations.add(new CorrelationOptimizer()); } if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) { @@ -128,8 +134,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) { transformations.add(new StatsOptimizer()); } - if (pctx.getContext().getExplain() - && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + if (pctx.getContext().getExplain() && !isTezExecEngine) { transformations.add(new AnnotateWithStatistics()); transformations.add(new AnnotateWithOpTraits()); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Sep 24 07:03:35 2014 @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.stats.StatsUtils; public class ReduceSinkMapJoinProc implements NodeProcessor { @@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple TezWork tezWork = context.currentTask.getWork(); LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName()); tezWork.connect(parentWork, myWork, edgeProp); - + if (edgeType == EdgeType.CUSTOM_EDGE) { + tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES); + } + ReduceSinkOperator r = null; if (parentRS.getConf().getOutputName() != null) { LOG.debug("Cloning reduce sink for multi-child broadcast edge"); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Wed Sep 24 07:03:35 2014 @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Stack; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory { List<List<String>> listBucketCols = new ArrayList<List<String>>(); listBucketCols.add(bucketCols); - OpTraits opTraits = new OpTraits(listBucketCols, -1); + int numBuckets = -1; + OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits(); + if (parentOpTraits != null) { + numBuckets = parentOpTraits.getNumBuckets(); + } + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols); rs.setOpTraits(opTraits); return null; } @@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory { } catch (HiveException e) { prunedPartList = null; } - boolean bucketMapJoinConvertible = checkBucketedTable(table, + boolean isBucketed = checkBucketedTable(table, opTraitsCtx.getParseContext(), prunedPartList); - List<List<String>>bucketCols = new ArrayList<List<String>>(); + List<List<String>> bucketColsList = new ArrayList<List<String>>(); + List<List<String>> sortedColsList = new ArrayList<List<String>>(); int numBuckets = -1; - if (bucketMapJoinConvertible) { - bucketCols.add(table.getBucketCols()); + if (isBucketed) { + bucketColsList.add(table.getBucketCols()); numBuckets = table.getNumBuckets(); + List<String> sortCols = new ArrayList<String>(); + for (Order colSortOrder : table.getSortCols()) { + sortCols.add(colSortOrder.getCol()); + } + sortedColsList.add(sortCols); } - OpTraits opTraits = new OpTraits(bucketCols, numBuckets); + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList); ts.setOpTraits(opTraits); return null; } @@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory { List<List<String>> listBucketCols = new ArrayList<List<String>>(); listBucketCols.add(gbyKeys); - OpTraits opTraits = new OpTraits(listBucketCols, -1); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols); gbyOp.setOpTraits(opTraits); return null; } @@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory { public static class SelectRule implements NodeProcessor { - @Override - public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - SelectOperator selOp = (SelectOperator)nd; - List<List<String>> parentBucketColNames = - selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); - + public List<List<String>> getConvertedColNames(List<List<String>> parentColNames, + SelectOperator selOp) { List<List<String>> listBucketCols = new ArrayList<List<String>>(); if (selOp.getColumnExprMap() != null) { - if (parentBucketColNames != null) { - for (List<String> colNames : parentBucketColNames) { + if (parentColNames != null) { + for (List<String> colNames : parentColNames) { List<String> bucketColNames = new ArrayList<String>(); for (String colName : colNames) { for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) { if (entry.getValue() instanceof ExprNodeColumnDesc) { - if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) { + if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) { bucketColNames.add(entry.getKey()); } } @@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory { } } + return listBucketCols; + } + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + SelectOperator selOp = (SelectOperator)nd; + List<List<String>> parentBucketColNames = + selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); + + List<List<String>> listBucketCols = null; + List<List<String>> listSortCols = null; + if (selOp.getColumnExprMap() != null) { + if (parentBucketColNames != null) { + listBucketCols = getConvertedColNames(parentBucketColNames, selOp); + } + List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits() + .getSortCols(); + if (parentSortColNames != null) { + listSortCols = getConvertedColNames(parentSortColNames, selOp); + } + } + int numBuckets = -1; if (selOp.getParentOperators().get(0).getOpTraits() != null) { numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols); selOp.setOpTraits(opTraits); return null; } @@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory { Object... nodeOutputs) throws SemanticException { JoinOperator joinOp = (JoinOperator)nd; List<List<String>> bucketColsList = new ArrayList<List<String>>(); + List<List<String>> sortColsList = new ArrayList<List<String>>(); byte pos = 0; for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { @@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory { ReduceSinkRule rsRule = new ReduceSinkRule(); rsRule.process(rsOp, stack, procCtx, nodeOutputs); } - bucketColsList.add(getOutputColNames(joinOp, rsOp, pos)); + bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos)); + sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos)); pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList)); return null; } - private List<String> getOutputColNames(JoinOperator joinOp, - ReduceSinkOperator rs, byte pos) { - List<List<String>> parentBucketColNames = - rs.getOpTraits().getBucketColNames(); - - if (parentBucketColNames != null) { + private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames, + byte pos) { + if (parentColNames != null) { List<String> bucketColNames = new ArrayList<String>(); // guaranteed that there is only 1 list within this list because // a reduce sink always brings down the bucketing cols to a single list. // may not be true with correlation operators (mux-demux) - List<String> colNames = parentBucketColNames.get(0); + List<String> colNames = parentColNames.get(0); for (String colName : colNames) { for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) { if (exprNode instanceof ExprNodeColumnDesc) { @@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - OpTraits opTraits = new OpTraits(null, -1); + OpTraits opTraits = new OpTraits(null, -1, null); @SuppressWarnings("unchecked") Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd; operator.setOpTraits(opTraits); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Wed Sep 24 07:03:35 2014 @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -152,6 +154,11 @@ public class CrossProductCheck implement private void checkMapJoins(TezWork tzWrk) throws SemanticException { for(BaseWork wrk : tzWrk.getAllWork() ) { + + if ( wrk instanceof MergeJoinWork ) { + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk); if ( !warnings.isEmpty() ) { for(String w : warnings) { @@ -163,12 +170,17 @@ public class CrossProductCheck implement private void checkTezReducer(TezWork tzWrk) throws SemanticException { for(BaseWork wrk : tzWrk.getAllWork() ) { - if ( !(wrk instanceof ReduceWork) ) { + + if ( wrk instanceof MergeJoinWork ) { + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + + if ( !(wrk instanceof ReduceWork ) ) { continue; } ReduceWork rWork = (ReduceWork) wrk; Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer(); - if ( reducer instanceof JoinOperator ) { + if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) { Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new HashMap<Integer, ExtractReduceSinkInfo.Info>(); for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) { @@ -185,7 +197,7 @@ public class CrossProductCheck implement return; } Operator<? extends OperatorDesc> reducer = rWrk.getReducer(); - if ( reducer instanceof JoinOperator ) { + if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) { BaseWork prntWork = mrWrk.getMapWork(); checkForCrossProduct(taskName, reducer, new ExtractReduceSinkInfo(null).analyze(prntWork)); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Sep 24 07:03:35 2014 @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; @@ -132,6 +134,8 @@ public class GenTezProcContext implement // remember which reducesinks we've already connected public final Set<ReduceSinkOperator> connectedReduceSinks; + public final Map<Operator<?>, MergeJoinWork> opMergeJoinWorkMap; + public CommonMergeJoinOperator currentMergeJoinOperator; // remember the event operators we've seen public final Set<AppMasterEventOperator> eventOperatorSet; @@ -176,6 +180,8 @@ public class GenTezProcContext implement this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>(); + this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>(); + this.currentMergeJoinOperator = null; rootTasks.add(currentTask); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Wed Sep 24 07:03:35 2014 @@ -167,7 +167,8 @@ public class GenTezUtils { GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink); // remember which parent belongs to which tag - reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), + int tag = reduceSink.getConf().getTag(); + reduceWork.getTagToInput().put(tag == -1 ? 0 : tag, context.preceedingWork.getName()); // remember the output name of the reduce sink
