Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 14 19:06:45 2014
@@ -146,6 +146,7 @@ public abstract class Operator<T extends
   /**
    * Implements the getChildren function for the Node Interface.
    */
+  @Override
   public ArrayList<Node> getChildren() {
 
     if (getChildOperators() == null) {
@@ -497,8 +498,6 @@ public abstract class Operator<T extends
     LOG.debug("Starting group for children:");
 
     for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.setGroupKeyObjectInspector(groupKeyOI);
-      op.setGroupKeyObject(groupKeyObject);
       op.startGroup();
     }
 
@@ -851,6 +850,7 @@ public abstract class Operator<T extends
    *
    * @return the name of the operator
    */
+  @Override
   public String getName() {
     return getOperatorName();
   }
@@ -968,7 +968,6 @@ public abstract class Operator<T extends
   }
 
   protected transient Object groupKeyObject;
-  protected transient ObjectInspector groupKeyOI;
   public String getOperatorId() {
     return operatorId;
   }
@@ -1061,7 +1060,7 @@ public abstract class Operator<T extends
 
     if (parents != null) {
       for (Operator<? extends OperatorDesc> parent : parents) {
-        parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+        parentClones.add((parent.clone()));
      }
    }
 
@@ -1082,8 +1081,8 @@ public abstract class Operator<T extends
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
-        (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
-        descClone, getSchema());
+        OperatorFactory.getAndMakeChild(
+        descClone, getSchema());
     return ret;
   }
 
@@ -1254,15 +1253,15 @@ public abstract class Operator<T extends
     }
     return null;
   }
-  
+
   public OpTraits getOpTraits() {
     if (conf != null) {
       return conf.getOpTraits();
     }
-    
+
     return null;
   }
-  
+
   public void setOpTraits(OpTraits metaInfo) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Setting traits ("+metaInfo+") on "+this);
@@ -1285,21 +1284,23 @@
     }
   }
 
-  public void setGroupKeyObjectInspector(ObjectInspector keyObjectInspector) {
-    this.groupKeyOI = keyObjectInspector;
-  }
-
-  public ObjectInspector getGroupKeyObjectInspector() {
-    return groupKeyOI;
-  }
-
   public static Operator createDummy() {
     return new DummyOperator();
   }
 
   private static class DummyOperator extends Operator {
     public DummyOperator() { super("dummy"); }
+    @Override
     public void processOp(Object row, int tag) {
     }
+    @Override
     public OperatorType getType() {
       return null;
     }
   }
+
+  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+    if ((parentOperators == null) || (parentOperators.size() == 0)) {
+      return null;
+    }
+    Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
+    return dummyOps;
+  }
 }
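The new getTagToOperatorTree() hook above has no local state of its own: it simply defers to the first parent operator, so a lookup issued anywhere in the operator tree walks upward until an operator that actually owns the tag-to-DummyStoreOperator map overrides it, or a root returns null. A minimal, self-contained sketch of that delegation pattern is below; TagLookupSketch, Node and OwningNode are illustrative stand-ins, not Hive classes.

// Illustrative sketch of the "defer to the first parent" lookup used by the new
// Operator.getTagToOperatorTree() above; all class names here are hypothetical.
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TagLookupSketch {

  static class Node {
    List<Node> parents = Collections.emptyList();

    // Default behaviour: no local answer, so ask the first parent (or give up at a root).
    Map<Integer, String> getTagMap() {
      if (parents == null || parents.isEmpty()) {
        return null;
      }
      return parents.get(0).getTagMap();
    }
  }

  static class OwningNode extends Node {
    final Map<Integer, String> tagMap = new HashMap<Integer, String>();

    // A node that owns the map overrides the hook and terminates the upward walk.
    @Override
    Map<Integer, String> getTagMap() {
      return tagMap;
    }
  }

  public static void main(String[] args) {
    OwningNode root = new OwningNode();
    root.tagMap.put(0, "dummy-store-for-tag-0");
    Node child = new Node();
    child.parents = Collections.<Node>singletonList(root);
    // The child has no map of its own; the request is answered by its parent.
    System.out.println(child.getTagMap());
  }
}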
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -114,10 +116,16 @@ public final class OperatorFactory {
         RCFileMergeOperator.class));
     opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
         OrcFileMergeOperator.class));
+    opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
+        CommonMergeJoinOperator.class));
   }
 
   static {
     vectorOpvec = new ArrayList<OpTuple>();
+    vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+        VectorAppMasterEventOperator.class));
+    vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+        VectorAppMasterEventOperator.class));
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
     vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
     vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Tue Oct 14 19:06:45 2014
@@ -46,6 +46,9 @@ public class OperatorUtils {
   public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
     Set<T> found = new HashSet<T>();
     for (Operator<?> start : starts) {
+      if (start == null) {
+        continue;
+      }
       findOperators(start, clazz, found);
     }
     return found;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Tue Oct 14 19:06:45 2014
@@ -64,6 +64,7 @@ public class OrcFileMergeOperator extend
 
   private void processKeyValuePairs(Object key, Object value)
       throws HiveException {
+    String filePath = "";
     try {
       OrcFileValueWrapper v;
       OrcFileKeyWrapper k;
@@ -72,6 +73,7 @@ public class OrcFileMergeOperator extend
       } else {
        k = (OrcFileKeyWrapper) key;
      }
+      filePath = k.getInputPath().toUri().getPath();
 
      fixTmpPath(k.getInputPath().getParent());
 
@@ -131,6 +133,16 @@ public class OrcFileMergeOperator extend
       this.exception = true;
       closeOp(true);
       throw new HiveException(e);
+    } finally {
+      if (fdis != null) {
+        try {
+          fdis.close();
+        } catch (IOException e) {
+          throw new HiveException(String.format("Unable to close file %s", filePath), e);
+        } finally {
+          fdis = null;
+        }
+      }
     }
   }
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Oct 14 19:06:45 2014
@@ -337,17 +337,20 @@ public class PTFOperator extends Operato
         handleOutputRows(tabFn.finishPartition());
       } else {
         if ( tabFn.canIterateOutput() ) {
-          outputPartRowsItr = tabFn.iterator(inputPart.iterator());
+          outputPartRowsItr = inputPart == null ? null :
+              tabFn.iterator(inputPart.iterator());
         } else {
-          outputPart = tabFn.execute(inputPart);
-          outputPartRowsItr = outputPart.iterator();
+          outputPart = inputPart == null ? null : tabFn.execute(inputPart);
+          outputPartRowsItr = outputPart == null ?
null : outputPart.iterator(); } if ( next != null ) { if (!next.isStreaming() && !isOutputIterator() ) { next.inputPart = outputPart; } else { - while(outputPartRowsItr.hasNext() ) { - next.processRow(outputPartRowsItr.next()); + if ( outputPartRowsItr != null ) { + while(outputPartRowsItr.hasNext() ) { + next.processRow(outputPartRowsItr.next()); + } } } } @@ -357,8 +360,10 @@ public class PTFOperator extends Operato next.finishPartition(); } else { if (!isStreaming() ) { - while(outputPartRowsItr.hasNext() ) { - forward(outputPartRowsItr.next(), outputObjInspector); + if ( outputPartRowsItr != null ) { + while(outputPartRowsItr.hasNext() ) { + forward(outputPartRowsItr.next(), outputObjInspector); + } } } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct 14 19:06:45 2014 @@ -50,7 +50,6 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; @@ -67,6 +66,9 @@ public class ReduceSinkOperator extends } private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName()); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final long serialVersionUID = 1L; private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); @@ -117,6 +119,8 @@ public class ReduceSinkOperator extends protected transient Object[] cachedValues; protected transient List<List<Integer>> distinctColIndices; protected transient Random random; + protected transient int bucketNumber; + /** * This two dimensional array holds key data and a corresponding Union object * which contains the tag identifying the aggregate expression for distinct columns. 
@@ -144,8 +148,14 @@ public class ReduceSinkOperator extends protected void initializeOp(Configuration hconf) throws HiveException { try { List<ExprNodeDesc> keys = conf.getKeyCols(); - LOG.debug("keys size is " + keys.size()); - for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString()); + + if (isDebugEnabled) { + LOG.debug("keys size is " + keys.size()); + for (ExprNodeDesc k : keys) { + LOG.debug("Key exprNodeDesc " + k.getExprString()); + } + } + keyEval = new ExprNodeEvaluator[keys.size()]; int i = 0; for (ExprNodeDesc e : keys) { @@ -184,7 +194,9 @@ public class ReduceSinkOperator extends tag = conf.getTag(); tagByte[0] = (byte) tag; skipTag = conf.getSkipTag(); - LOG.info("Using tag = " + tag); + if (isInfoEnabled) { + LOG.info("Using tag = " + tag); + } TableDesc keyTableDesc = conf.getKeySerializeInfo(); keySerializer = (Serializer) keyTableDesc.getDeserializerClass() @@ -284,7 +296,10 @@ public class ReduceSinkOperator extends bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); } - LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys()); + if (isInfoEnabled) { + LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + + conf.getNumDistributionKeys()); + } keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, distinctColIndices, conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); @@ -304,15 +319,14 @@ public class ReduceSinkOperator extends populateCachedDistributionKeys(row, 0); // replace bucketing columns with hashcode % numBuckets - int buckNum = -1; if (bucketEval != null) { - buckNum = computeBucketNumber(row, conf.getNumBuckets()); - cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum); + bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); + cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber)); } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE || conf.getWriteType() == AcidUtils.Operation.DELETE) { // In the non-partitioned case we still want to compute the bucket number for updates and // deletes. - buckNum = computeBucketNumber(row, conf.getNumBuckets()); + bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); } HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); @@ -328,7 +342,7 @@ public class ReduceSinkOperator extends if (autoParallel && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { - hashCode = computeHashCode(row, buckNum); + hashCode = computeHashCode(row); } firstKey.setHashCode(hashCode); @@ -377,7 +391,9 @@ public class ReduceSinkOperator extends // column directly. 
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField); buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField)); - LOG.debug("Acid choosing bucket number " + buckNum); + if (isTraceEnabled) { + LOG.trace("Acid choosing bucket number " + buckNum); + } } else { for (int i = 0; i < bucketEval.length; i++) { Object o = bucketEval[i].evaluate(row); @@ -422,7 +438,7 @@ public class ReduceSinkOperator extends return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); } - private int computeHashCode(Object row, int buckNum) throws HiveException { + private int computeHashCode(Object row) throws HiveException { // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { @@ -446,8 +462,10 @@ public class ReduceSinkOperator extends + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); } } - LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum)); - return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum; + if (isTraceEnabled) { + LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber)); + } + return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber; } private boolean partitionKeysAreNull(Object row) throws HiveException { @@ -493,10 +511,19 @@ public class ReduceSinkOperator extends } private BytesWritable makeValueWritable(Object row) throws Exception { + int length = valueEval.length; + + // in case of bucketed table, insert the bucket number as the last column in value + if (bucketEval != null) { + length -= 1; + cachedValues[length] = new Text(String.valueOf(bucketNumber)); + } + // Evaluate the value - for (int i = 0; i < valueEval.length; i++) { + for (int i = 0; i < length; i++) { cachedValues[i] = valueEval[i].evaluate(row); } + // Serialize the value return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Oct 14 19:06:45 2014 @@ -697,6 +697,7 @@ public class SMBMapJoinOperator extends // But if hive supports assigning bucket number for each partition, this can be vary public void setupContext(List<Path> paths) throws HiveException { int segmentLen = paths.size(); + FetchOperator.setFetchOperatorContext(jobConf, fetchWork.getPartDir()); FetchOperator[] segments = segmentsForSize(segmentLen); for (int i = 0 ; i < segmentLen; i++) { Path path = paths.get(i); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 14 19:06:45 2014 @@ -27,6 +27,7 @@ import org.antlr.runtime.CommonToken; import org.apache.commons.codec.binary.Base64; import 
org.apache.commons.lang.StringUtils; import org.apache.commons.lang.WordUtils; +import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -87,6 +88,7 @@ import org.apache.hadoop.hive.ql.plan.Fi import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -199,6 +201,8 @@ public final class Utilities { public static String HADOOP_LOCAL_FS = "file:///"; public static String MAP_PLAN_NAME = "map.xml"; public static String REDUCE_PLAN_NAME = "reduce.xml"; + public static String MERGE_PLAN_NAME = "merge.xml"; + public static final String INPUT_NAME = "iocontext.input.name"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; @@ -289,6 +293,39 @@ public final class Utilities { return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); } + public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir, + boolean useCache) { + for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) { + setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache); + String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + if (prefixes == null) { + prefixes = baseWork.getName(); + } else { + prefixes = prefixes + "," + baseWork.getName(); + } + conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes); + } + + // nothing to return + return null; + } + + public static BaseWork getMergeWork(JobConf jconf) { + if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null) + || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) { + return null; + } + return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX)); + } + + public static BaseWork getMergeWork(JobConf jconf, String prefix) { + if (prefix == null || prefix.isEmpty()) { + return null; + } + + return getBaseWork(jconf, prefix + MERGE_PLAN_NAME); + } + public static void cacheBaseWork(Configuration conf, String name, BaseWork work, Path hiveScratchDir) { try { @@ -367,6 +404,8 @@ public final class Utilities { throw new RuntimeException("unable to determine work from configuration ." 
+ MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } + } else if (name.contains(MERGE_PLAN_NAME)) { + gWork = deserializePlan(in, MapWork.class, conf); } gWorkMap.put(path, gWork); } else { @@ -390,6 +429,22 @@ public final class Utilities { } } + public static Map<String, Map<Integer, String>> getScratchColumnVectorTypes(Configuration hiveConf) { + BaseWork baseWork = getMapWork(hiveConf); + if (baseWork == null) { + baseWork = getReduceWork(hiveConf); + } + return baseWork.getScratchColumnVectorTypes(); + } + + public static Map<String, Map<String, Integer>> getScratchColumnMap(Configuration hiveConf) { + BaseWork baseWork = getMapWork(hiveConf); + if (baseWork == null) { + baseWork = getReduceWork(hiveConf); + } + return baseWork.getScratchColumnMap(); + } + public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { try { Graph stageGraph = plan.getQueryPlan().getStageGraph(); @@ -583,8 +638,14 @@ public final class Utilities { } public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { + String useName = conf.get(INPUT_NAME); + if (useName == null) { + useName = "mapreduce"; + } + conf.set(INPUT_NAME, useName); setMapWork(conf, w.getMapWork(), hiveScratchDir, true); if (w.getReduceWork() != null) { + conf.set(INPUT_NAME, useName); setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true); } } @@ -1821,7 +1882,7 @@ public final class Utilities { for (int i = 0; i < parts.length; ++i) { assert parts[i].isDir() : "dynamic partition " + parts[i].getPath() - + " is not a direcgtory"; + + " is not a directory"; FileStatus[] items = fs.listStatus(parts[i].getPath()); // remove empty directory since DP insert should not generate empty partitions. @@ -2259,13 +2320,15 @@ public final class Utilities { * configuration which receives configured properties */ public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) { - String bucketString = tbl.getProperties() - .getProperty(hive_metastoreConstants.BUCKET_COUNT); - // copy the bucket count - if (bucketString != null) { - job.set(hive_metastoreConstants.BUCKET_COUNT, bucketString); + Properties tblProperties = tbl.getProperties(); + for(String name: tblProperties.stringPropertyNames()) { + if (job.get(name) == null) { + String val = (String) tblProperties.get(name); + if (val != null) { + job.set(name, StringEscapeUtils.escapeJava(val)); + } + } } - Map<String, String> jobProperties = tbl.getJobProperties(); if (jobProperties == null) { return; Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Oct 14 19:06:45 2014 @@ -56,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Pa import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveKey; import 
org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; @@ -416,6 +418,13 @@ public class ExecDriver extends Task<Map Utilities.createTmpDirs(job, mWork); Utilities.createTmpDirs(job, rWork); + SessionState ss = SessionState.get(); + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") + && ss != null) { + TezSessionState session = ss.getTezSession(); + TezSessionPoolManager.getInstance().close(session, true); + } + // Finally SUBMIT the JOB! rj = jc.submitJob(job); // replace it back Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Oct 14 19:06:45 2014 @@ -78,10 +78,11 @@ public class ExecMapper extends MapReduc private MapredLocalWork localWork = null; private boolean isLogInfoEnabled = false; - private final ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = null; @Override public void configure(JobConf job) { + execContext = new ExecMapperContext(job); // Allocate the bean at the beginning - memoryMXBean = ManagementFactory.getMemoryMXBean(); l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); @@ -292,6 +293,7 @@ public class ExecMapper extends MapReduc this.rp = rp; } + @Override public void func(Operator op) { Map<Enum<?>, Long> opStats = op.getStats(); for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Tue Oct 14 19:06:45 2014 @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FetchOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.mapred.JobConf; @@ -60,8 +61,9 @@ public class ExecMapperContext { this.currentBigBucketFile = currentBigBucketFile; } - public ExecMapperContext() { - ioCxt = IOContext.get(); + public ExecMapperContext(JobConf jc) { + this.jc = jc; + ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME)); } public void clear() { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Oct 14 19:06:45 2014 @@ 
-66,6 +66,8 @@ import org.apache.hadoop.util.StringUtil public class ExecReducer extends MapReduceBase implements Reducer { private static final Log LOG = LogFactory.getLog("ExecReducer"); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final String PLAN_KEY = "__REDUCE_PLAN__"; // used to log memory usage periodically @@ -75,7 +77,6 @@ public class ExecReducer extends MapRedu private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE]; private final Object[] valueObject = new Object[Byte.MAX_VALUE]; private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); - private final boolean isLogInfoEnabled = LOG.isInfoEnabled(); // TODO: move to DynamicSerDe when it's ready private Deserializer inputKeyDeserializer; @@ -101,16 +102,18 @@ public class ExecReducer extends MapRedu ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; - LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); + if (isInfoEnabled) { + LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - try { - LOG.info("conf classpath = " - + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - LOG.info("thread classpath = " - + Arrays.asList(((URLClassLoader) Thread.currentThread() - .getContextClassLoader()).getURLs())); - } catch (Exception e) { - LOG.info("cannot get classpath: " + e.getMessage()); + try { + LOG.info("conf classpath = " + + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); + LOG.info("thread classpath = " + + Arrays.asList(((URLClassLoader) Thread.currentThread() + .getContextClassLoader()).getURLs())); + } catch (Exception e) { + LOG.info("cannot get classpath: " + e.getMessage()); + } } jc = job; @@ -147,7 +150,6 @@ public class ExecReducer extends MapRedu ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); ois.add(keyObjectInspector); ois.add(valueObjectInspector[tag]); - reducer.setGroupKeyObjectInspector(keyObjectInspector); rowObjectInspector[tag] = ObjectInspectorFactory .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); } @@ -202,7 +204,9 @@ public class ExecReducer extends MapRedu groupKey = new BytesWritable(); } else { // If a operator wants to do some work at the end of a group - LOG.trace("End Group"); + if (isTraceEnabled) { + LOG.trace("End Group"); + } reducer.endGroup(); } @@ -217,9 +221,11 @@ public class ExecReducer extends MapRedu } groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); - LOG.trace("Start Group"); - reducer.setGroupKeyObject(keyObject); + if (isTraceEnabled) { + LOG.trace("Start Group"); + } reducer.startGroup(); + reducer.setGroupKeyObject(keyObject); } // System.err.print(keyObject.toString()); while (values.hasNext()) { @@ -239,12 +245,14 @@ public class ExecReducer extends MapRedu row.clear(); row.add(keyObject); row.add(valueObject[tag]); - if (isLogInfoEnabled) { + if (isInfoEnabled) { cntr++; if (cntr == nextCntr) { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - LOG.info("ExecReducer: processing " + cntr - + " rows: used memory = " + used_memory); + if (isInfoEnabled) { + LOG.info("ExecReducer: processing " + cntr + + " rows: used memory = " + used_memory); + } nextCntr = getNextCntr(cntr); } } @@ -290,17 +298,19 @@ public class ExecReducer extends MapRedu public void close() { // No row was processed - if (oc == 
null) { + if (oc == null && isTraceEnabled) { LOG.trace("Close called without any rows processed"); } try { if (groupKey != null) { // If a operator wants to do some work at the end of a group - LOG.trace("End Group"); + if (isTraceEnabled) { + LOG.trace("End Group"); + } reducer.endGroup(); } - if (isLogInfoEnabled) { + if (isInfoEnabled) { LOG.info("ExecReducer: processed " + cntr + " rows: used memory = " + memoryMXBean.getHeapMemoryUsage().getUsed()); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Oct 14 19:06:45 2014 @@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas // not sure we need this exec context; but all the operators in the work // will pass this context throught - private ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = null; private Process executor; @@ -113,6 +113,7 @@ public class MapredLocalTask extends Tas public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { super.initialize(conf, queryPlan, driverContext); job = new JobConf(conf, ExecDriver.class); + execContext = new ExecMapperContext(job); //we don't use the HadoopJobExecHooks for local tasks this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null); } @@ -237,6 +238,18 @@ public class MapredLocalTask extends Tas variables.put(HADOOP_OPTS_KEY, hadoopOpts); } + //For Windows OS, we need to pass HIVE_HADOOP_CLASSPATH Java parameter while starting + //Hiveserver2 using "-hiveconf hive.hadoop.classpath=%HIVE_LIB%". This is to combine path(s). 
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)!= null) + { + if (variables.containsKey("HADOOP_CLASSPATH")) + { + variables.put("HADOOP_CLASSPATH", variables.get("HADOOP_CLASSPATH") + ";" + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)); + } else { + variables.put("HADOOP_CLASSPATH", HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)); + } + } + if(variables.containsKey(MapRedTask.HIVE_DEBUG_RECURSIVE)) { MapRedTask.configureDebugVariablesForChildJVM(variables); } @@ -301,6 +314,11 @@ public class MapredLocalTask extends Tas if (work == null) { return -1; } + + if (execContext == null) { + execContext = new ExecMapperContext(job); + } + memoryMXBean = ManagementFactory.getMemoryMXBean(); long startTime = System.currentTimeMillis(); console.printInfo(Utilities.now() Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Tue Oct 14 19:06:45 2014 @@ -31,6 +31,7 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.serializer.SerializationFactory; @@ -79,9 +80,14 @@ public class CustomPartitionVertex exten private List<InputDataInformationEvent> dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; - private boolean rootVertexInitialized = false; private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; + private VertexType vertexType; + private String mainWorkName; + private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create(); + + private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap = + new HashMap<String, Multimap<Integer, InputSplit>>(); public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); @@ -90,8 +96,18 @@ public class CustomPartitionVertex exten @Override public void initialize() { this.context = getContext(); - ByteBuffer byteBuf = context.getUserPayload().getPayload(); - this.numBuckets = byteBuf.getInt(); + ByteBuffer payload = context.getUserPayload().getPayload(); + CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(payload); + try { + vertexConf.readFields(dibb); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.numBuckets = vertexConf.getNumBuckets(); + this.mainWorkName = vertexConf.getInputName(); + this.vertexType = vertexConf.getVertexType(); } @Override @@ -113,17 +129,12 @@ public class CustomPartitionVertex exten public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { } - // One call per root Input - and for now only one is handled. 
+ // One call per root Input @Override public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) { + LOG.info("On root vertex initialized " + inputName); - // Ideally, since there's only 1 Input expected at the moment - - // ensure this method is called only once. Tez will call it once per Root - // Input. - Preconditions.checkState(rootVertexInitialized == false); - LOG.info("Root vertex not initialized"); - rootVertexInitialized = true; try { // This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -164,9 +175,6 @@ public class CustomPartitionVertex exten // No tasks should have been started yet. Checked by initial state // check. Preconditions.checkState(dataInformationEventSeen == false); - Preconditions - .checkState(context.getVertexNumTasks(context.getVertexName()) == -1, - "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"); InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to @@ -220,21 +228,55 @@ public class CustomPartitionVertex exten (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); Multimap<Integer, InputSplit> groupedSplit = HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots); + availableSlots, inputName); bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } - LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks"); - processAllEvents(inputName, bucketToGroupedSplitMap); + LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap); + if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) { + /* + * this is the small table side. In case of SMB join, we may need to send each split to the + * corresponding bucket-based task on the other side. In case a split needs to go to + * multiple downstream tasks, we need to clone the event and send it to the right + * destination. + */ + processAllSideEvents(inputName, bucketToGroupedSplitMap); + } else { + processAllEvents(inputName, bucketToGroupedSplitMap); + } } catch (Exception e) { throw new RuntimeException(e); } } + private void processAllSideEvents(String inputName, + Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { + // the bucket to task map should have been setup by the big table. 
+ if (bucketToTaskMap.isEmpty()) { + inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap); + return; + } + List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>(); + for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) { + Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey()); + for (Integer task : destTasks) { + for (InputSplit split : entry.getValue()) { + MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split); + InputDataInformationEvent diEvent = + InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit + .toByteString().asReadOnlyByteBuffer()); + diEvent.setTargetIndex(task); + taskEvents.add(diEvent); + } + } + } + + context.addRootInputEvents(inputName, taskEvents); + } + private void processAllEvents(String inputName, Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { - Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create(); List<InputSplit> finalSplits = Lists.newLinkedList(); for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) { int bucketNum = entry.getKey(); @@ -248,11 +290,13 @@ public class CustomPartitionVertex exten // Construct the EdgeManager descriptor to be used by all edges which need // the routing table. - EdgeManagerPluginDescriptor hiveEdgeManagerDesc = - EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); - UserPayload payload = getBytePayload(bucketToTaskMap); - hiveEdgeManagerDesc.setUserPayload(payload); - + EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null; + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.INITIALIZED_EDGES)) { + hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + UserPayload payload = getBytePayload(bucketToTaskMap); + hiveEdgeManagerDesc.setUserPayload(payload); + } Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. @@ -285,13 +329,21 @@ public class CustomPartitionVertex exten rootInputSpecUpdate.put( inputName, InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); - context.setVertexParallelism( - taskCount, - VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) { + context.setVertexParallelism( + taskCount, + VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits + .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + } // Set the actual events for the tasks. context.addRootInputEvents(inputName, taskEvents); + if (inputToGroupedSplitMap.isEmpty() == false) { + for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) { + processAllSideEvents(entry.getKey(), entry.getValue()); + } + inputToGroupedSplitMap.clear(); + } } UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException { @@ -315,7 +367,8 @@ public class CustomPartitionVertex exten if (!(inputSplit instanceof FileSplit)) { throw new UnsupportedOperationException( - "Cannot handle splits other than FileSplit for the moment"); + "Cannot handle splits other than FileSplit for the moment. 
Current input split type: " + + inputSplit.getClass().getSimpleName()); } return (FileSplit) inputSplit; } @@ -327,7 +380,6 @@ public class CustomPartitionVertex exten Map<String, List<FileSplit>> pathFileSplitsMap) { int bucketNum = 0; - int fsCount = 0; Multimap<Integer, InputSplit> bucketToInitialSplitMap = ArrayListMultimap.<Integer, InputSplit> create(); @@ -335,14 +387,20 @@ public class CustomPartitionVertex exten for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) { int bucketId = bucketNum % numBuckets; for (FileSplit fsplit : entry.getValue()) { - fsCount++; bucketToInitialSplitMap.put(bucketId, fsplit); } bucketNum++; } - LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: " - + pathFileSplitsMap.size()); + if (bucketNum < numBuckets) { + int loopedBucketId = 0; + for (; bucketNum < numBuckets; bucketNum++) { + for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) { + bucketToInitialSplitMap.put(bucketNum, fsplit); + } + loopedBucketId++; + } + } return bucketToInitialSplitMap; } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Oct 14 19:06:45 2014 @@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + +import javax.security.auth.login.LoginException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; @@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -90,12 +110,16 @@ import 
org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; @@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; -import javax.security.auth.login.LoginException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - /** * DagUtils. DagUtils is a collection of helper methods to convert * map and reduce work to tez vertices and edges. It handles configuration @@ -130,6 +139,11 @@ public class DagUtils { private static final Log LOG = LogFactory.getLog(DagUtils.class.getName()); private static final String TEZ_DIR = "_tez_scratch_dir"; private static DagUtils instance; + // The merge file being currently processed. + public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX = + "hive.tez.current.merge.file.prefix"; + // "A comma separated list of work names used as prefix. + public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes"; private void addCredentials(MapWork mapWork, DAG dag) { Set<String> paths = mapWork.getPathToAliases().keySet(); @@ -238,8 +252,8 @@ public class DagUtils { * endpoints. 
*/ @SuppressWarnings("rawtypes") - public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, - Vertex w, TezEdgeProperty edgeProp) + public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, + TezEdgeProperty edgeProp, VertexType vertexType) throws IOException { Class mergeInputClass; @@ -254,10 +268,14 @@ public class DagUtils { case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; int numBuckets = edgeProp.getNumBuckets(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, vertexType, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); desc.setUserPayload(UserPayload.create(userPayload)); w.setVertexManagerPlugin(desc); break; @@ -289,17 +307,21 @@ public class DagUtils { * @param w The second vertex (sink) * @return */ - public Edge createEdge(JobConf vConf, Vertex v, Vertex w, - TezEdgeProperty edgeProp) + public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp, + VertexType vertexType) throws IOException { switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, vertexType, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( CustomPartitionVertex.class.getName()); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); desc.setUserPayload(UserPayload.create(userPayload)); w.setVertexManagerPlugin(desc); break; @@ -405,7 +427,7 @@ public class DagUtils { * from yarn. Falls back to Map-reduce's map size if tez * container size isn't set. */ - private Resource getContainerResource(Configuration conf) { + public static Resource getContainerResource(Configuration conf) { int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); @@ -443,12 +465,61 @@ public class DagUtils { return MRHelpers.getJavaOptsForMRMapper(conf); } + private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr, + List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx, + VertexType vertexType) + throws Exception { + Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false); + if (mergeJoinWork.getMainWork() instanceof MapWork) { + List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList(); + MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); + CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); + Vertex mergeVx = + createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType); + + // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat + // here would cause pre-mature grouping which would be incorrect. 
+ Class inputFormatClass = HiveInputFormat.class; + conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); + // mapreduce.tez.input.initializer.serialize.event.payload should be set + // to false when using this plug-in to avoid getting a serialized event at run-time. + conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false); + for (int i = 0; i < mapWorkList.size(); i++) { + + mapWork = (MapWork) (mapWorkList.get(i)); + conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName()); + conf.set(Utilities.INPUT_NAME, mapWork.getName()); + LOG.info("Going through each work and adding MultiMRInput"); + mergeVx.addDataSource(mapWork.getName(), + MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build()); + } + + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf() + .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias()); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + byte[] userPayload = dob.getData(); + desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + mergeVx.setVertexManagerPlugin(desc); + return mergeVx; + } else { + Vertex mergeVx = + createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs, + mrScratchDir, ctx); + return mergeVx; + } + } + /* * Helper function to create Vertex from MapWork. */ private Vertex createVertex(JobConf conf, MapWork mapWork, LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception { + Path mrScratchDir, Context ctx, VertexType vertexType) + throws Exception { Path tezDir = getTezDir(mrScratchDir); @@ -470,15 +541,8 @@ public class DagUtils { Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); - boolean vertexHasCustomInput = false; - if (tezWork != null) { - for (BaseWork baseWork : tezWork.getParents(mapWork)) { - if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) { - vertexHasCustomInput = true; - } - } - } - + boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType); + LOG.info("Vertex has custom input? " + vertexHasCustomInput); if (vertexHasCustomInput) { groupSplitsInInputInitializer = false; // grouping happens in execution phase. The input payload should not enable grouping here, @@ -513,6 +577,8 @@ public class DagUtils { } } + // remember mapping of plan to input + conf.set(Utilities.INPUT_NAME, mapWork.getName()); if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION) && !mapWork.isUseOneNullRowInputFormat()) { @@ -593,6 +659,7 @@ public class DagUtils { Path mrScratchDir, Context ctx) throws Exception { // set up operator plan + conf.set(Utilities.INPUT_NAME, reduceWork.getName()); Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false); // create the directories FileSinkOperators need @@ -850,7 +917,7 @@ public class DagUtils { throws IOException { FileSystem destFS = dest.getFileSystem(conf); - if (src != null) { + if (src != null && checkPreExisting(src, dest, conf) == false) { // copy the src to the destination and create local resource. // do not overwrite. 
LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest); @@ -904,7 +971,7 @@ public class DagUtils { public JobConf createConfiguration(HiveConf hiveConf) throws IOException { hiveConf.setBoolean("mapred.mapper.new-api", false); - JobConf conf = new JobConf(hiveConf); + JobConf conf = new JobConf(new TezConfiguration(hiveConf)); conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName()); @@ -937,12 +1004,22 @@ public class DagUtils { return initializeVertexConf(conf, context, (MapWork)work); } else if (work instanceof ReduceWork) { return initializeVertexConf(conf, context, (ReduceWork)work); + } else if (work instanceof MergeJoinWork) { + return initializeVertexConf(conf, context, (MergeJoinWork) work); } else { assert false; return null; } } + private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) { + if (work.getMainWork() instanceof MapWork) { + return initializeVertexConf(conf, context, (MapWork) (work.getMainWork())); + } else { + return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork())); + } + } + /** * Create a vertex from a given work object. * @@ -958,18 +1035,21 @@ public class DagUtils { */ public Vertex createVertex(JobConf conf, BaseWork work, Path scratchDir, LocalResource appJarLr, - List<LocalResource> additionalLr, - FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception { + List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren, + TezWork tezWork, VertexType vertexType) throws Exception { Vertex v = null; // simply dispatch the call to the right method for the actual (sub-) type of // BaseWork. if (work instanceof MapWork) { - v = createVertex(conf, (MapWork) work, appJarLr, - additionalLr, fileSystem, scratchDir, ctx, tezWork); + v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx, + vertexType); } else if (work instanceof ReduceWork) { v = createVertex(conf, (ReduceWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx); + } else if (work instanceof MergeJoinWork) { + v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir, + ctx, vertexType); } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg()); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Tue Oct 14 19:06:45 2014 @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,6 +60,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.runtime.api.InputInitializerContext; 
import org.apache.tez.runtime.api.events.InputInitializerEvent; @@ -77,12 +79,13 @@ public class DynamicPartitionPruner { private final BytesWritable writable = new BytesWritable(); - private final BlockingQueue<InputInitializerEvent> queue = - new LinkedBlockingQueue<InputInitializerEvent>(); + private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(); + + private final Set<String> sourcesWaitingForEvents = new HashSet<String>(); private int sourceInfoCount = 0; - private InputInitializerContext context; + private final Object endOfEvents = new Object(); public DynamicPartitionPruner() { } @@ -91,8 +94,21 @@ public class DynamicPartitionPruner { throws SerDeException, IOException, InterruptedException, HiveException { - this.context = context; - this.initialize(work, jobConf); + synchronized(sourcesWaitingForEvents) { + initialize(work, jobConf); + + if (sourcesWaitingForEvents.isEmpty()) { + return; + } + + Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED); + for (String source : sourcesWaitingForEvents) { + // we need to get state transition updates for the vertices that will send + // events to us. once we have received all events and a vertex has succeeded, + // we can move to do the pruning. + context.registerForVertexStateUpdates(source, states); + } + } LOG.info("Waiting for events (" + sourceInfoCount + " items) ..."); // synchronous event processing loop. Won't return until all events have @@ -102,7 +118,7 @@ public class DynamicPartitionPruner { LOG.info("Ok to proceed."); } - public BlockingQueue<InputInitializerEvent> getQueue() { + public BlockingQueue<Object> getQueue() { return queue; } @@ -111,11 +127,14 @@ public class DynamicPartitionPruner { sourceInfoCount = 0; } - private void initialize(MapWork work, JobConf jobConf) throws SerDeException { + public void initialize(MapWork work, JobConf jobConf) throws SerDeException { this.clear(); Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>(); + Set<String> sources = work.getEventSourceTableDescMap().keySet(); + + sourcesWaitingForEvents.addAll(sources); - for (String s : work.getEventSourceTableDescMap().keySet()) { + for (String s : sources) { List<TableDesc> tables = work.getEventSourceTableDescMap().get(s); List<String> columnNames = work.getEventSourceColumnNameMap().get(s); List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s); @@ -277,46 +296,30 @@ public class DynamicPartitionPruner { private void processEvents() throws SerDeException, IOException, InterruptedException { int eventCount = 0; - int neededEvents = getExpectedNumberOfEvents(); - while (neededEvents > eventCount) { - InputInitializerEvent event = queue.take(); + while (true) { + Object element = queue.take(); + + if (element == endOfEvents) { + // we're done processing events + break; + } + + InputInitializerEvent event = (InputInitializerEvent) element; + LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName() + ", " + (event.getUserPayload().limit() - event.getUserPayload().position())); - processPayload(event.getUserPayload()); + processPayload(event.getUserPayload(), event.getSourceVertexName()); eventCount += 1; - neededEvents = getExpectedNumberOfEvents(); - LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount); } - } - - private int getExpectedNumberOfEvents() throws InterruptedException { - int neededEvents = 0; - - boolean notInitialized; - do { - neededEvents = 0; - notInitialized = false; - for (String s 
: sourceInfoMap.keySet()) { - int multiplier = sourceInfoMap.get(s).size(); - int taskNum = context.getVertexNumTasks(s); - LOG.info("Vertex " + s + " has " + taskNum + " events."); - if (taskNum < 0) { - notInitialized = true; - Thread.sleep(10); - continue; - } - neededEvents += (taskNum * multiplier); - } - } while (notInitialized); - - return neededEvents; + LOG.info("Received events: " + eventCount); } @SuppressWarnings("deprecation") - private String processPayload(ByteBuffer payload) throws SerDeException, IOException { + private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException, + IOException { + DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload)); - String sourceName = in.readUTF(); String columnName = in.readUTF(); boolean skip = in.readBoolean(); @@ -390,4 +393,26 @@ public class DynamicPartitionPruner { } } + public void addEvent(InputInitializerEvent event) { + synchronized(sourcesWaitingForEvents) { + if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) { + queue.offer(event); + } + } + } + + public void processVertex(String name) { + LOG.info("Vertex succeeded: " + name); + + synchronized(sourcesWaitingForEvents) { + sourcesWaitingForEvents.remove(name); + + if (sourcesWaitingForEvents.isEmpty()) { + // we've got what we need; mark the queue + queue.offer(endOfEvents); + } else { + LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events."); + } + } + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Tue Oct 14 19:06:45 2014 @@ -38,8 +38,9 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; @@ -152,8 +153,21 @@ public class HiveSplitGenerator extends public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, float waves, int availableSlots) throws Exception { + return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null); + } - MapWork work = Utilities.getMapWork(jobConf); + public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, + Configuration conf, InputSplit[] splits, float waves, int availableSlots, + String inputName) throws Exception { + + MapWork work = null; + if (inputName != null) { + work = (MapWork) Utilities.getMergeWork(jobConf, inputName); + // work can still be null if there is no merge work for this input + } + if (work == null) { + work = Utilities.getMapWork(jobConf); + } Multimap<Integer, InputSplit> bucketSplitMultiMap = 
ArrayListMultimap.<Integer, InputSplit> create(); @@ -230,9 +244,14 @@ public class HiveSplitGenerator extends } @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + pruner.processVertex(stateUpdate.getVertexName()); + } + + @Override public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { for (InputInitializerEvent e : events) { - pruner.getQueue().put(e); + pruner.addEvent(e); } } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Oct 14 19:06:45 2014 @@ -17,14 +17,20 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -36,15 +42,17 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; @@ -58,27 +66,61 @@ public class MapRecordProcessor extends private MapOperator mapOp; + private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>(); public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); - private final ExecMapperContext execContext = new ExecMapperContext(); + private MapRecordSource[] sources; + private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>(); + private int position = 0; + private boolean foundCachedMergeWork = false; + MRInputLegacy legacyMRInput = null; + private ExecMapperContext execContext = null; private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; + List<MapWork> mergeWorkList = 
null; + private static Map<Integer, DummyStoreOperator> connectOps = + new TreeMap<Integer, DummyStoreOperator>(); - public MapRecordProcessor(JobConf jconf) { + public MapRecordProcessor(JobConf jconf) throws Exception { ObjectCache cache = ObjectCacheFactory.getCache(jconf); + execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); // create map and fetch operators mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); if (mapWork == null) { mapWork = Utilities.getMapWork(jconf); cache.cache(MAP_PLAN_KEY, mapWork); - l4j.info("Plan: "+mapWork); + l4j.debug("Plan: " + mapWork); for (String s: mapWork.getAliases()) { - l4j.info("Alias: "+s); + l4j.debug("Alias: " + s); } } else { Utilities.setMapWork(jconf, mapWork); } + + String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + if (prefixes != null) { + mergeWorkList = new ArrayList<MapWork>(); + for (String prefix : prefixes.split(",")) { + MapWork mergeMapWork = (MapWork) cache.retrieve(prefix); + if (mergeMapWork != null) { + l4j.info("Found merge work in cache"); + foundCachedMergeWork = true; + mergeWorkList.add(mergeMapWork); + continue; + } + if (foundCachedMergeWork) { + throw new Exception( + "Should find all work in cache else operator pipeline will be in non-deterministic state"); + } + + if ((prefix != null) && (prefix.isEmpty() == false)) { + mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix); + mergeWorkList.add(mergeMapWork); + cache.cache(prefix, mergeMapWork); + } + } + } } @Override @@ -88,8 +130,8 @@ public class MapRecordProcessor extends super.init(jconf, processorContext, mrReporter, inputs, outputs); //Update JobConf using MRInput, info like filename comes via this - MRInputLegacy mrInput = TezProcessor.getMRInput(inputs); - Configuration updatedConf = mrInput.getConfigUpdates(); + legacyMRInput = getMRInput(inputs); + Configuration updatedConf = legacyMRInput.getConfigUpdates(); if (updatedConf != null) { for (Entry<String, String> entry : updatedConf) { jconf.set(entry.getKey(), entry.getValue()); @@ -99,20 +141,52 @@ public class MapRecordProcessor extends createOutputMap(); // Start all the Outputs. for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) { - l4j.info("Starting Output: " + outputEntry.getKey()); + l4j.debug("Starting Output: " + outputEntry.getKey()); outputEntry.getValue().start(); ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } try { + if (mapWork.getVectorMode()) { mapOp = new VectorMapOperator(); } else { mapOp = new MapOperator(); } + connectOps.clear(); + if (mergeWorkList != null) { + MapOperator mergeMapOp = null; + for (MapWork mergeMapWork : mergeWorkList) { + processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs + .get(mergeMapWork.getName())))); + if (mergeMapWork.getVectorMode()) { + mergeMapOp = new VectorMapOperator(); + } else { + mergeMapOp = new MapOperator(); + } + + mergeMapOpList.add(mergeMapOp); + // initialize the merge operators first. 
+ if (mergeMapOp != null) { + mergeMapOp.setConf(mergeMapWork); + l4j.info("Input name is " + mergeMapWork.getName()); + jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName()); + mergeMapOp.setChildren(jconf); + if (foundCachedMergeWork == false) { + DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp); + connectOps.put(mergeMapWork.getTag(), dummyOp); + } + mergeMapOp.setExecContext(new ExecMapperContext(jconf)); + mergeMapOp.initializeLocalWork(jconf); + } + } + } + // initialize map operator mapOp.setConf(mapWork); + l4j.info("Main input name is " + mapWork.getName()); + jconf.set(Utilities.INPUT_NAME, mapWork.getName()); mapOp.setChildren(jconf); l4j.info(mapOp.dump(0)); @@ -121,12 +195,21 @@ public class MapRecordProcessor extends ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); + + initializeMapRecordSources(); mapOp.initialize(jconf, null); + if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) { + for (MapOperator mergeMapOp : mergeMapOpList) { + jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName()); + mergeMapOp.initialize(jconf, null); + } + } // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the // dummy parent operators as well. List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps(); + jconf.set(Utilities.INPUT_NAME, mapWork.getName()); if (dummyOps != null) { for (Operator<? extends OperatorDesc> dummyOp : dummyOps){ dummyOp.setExecContext(execContext); @@ -151,54 +234,46 @@ public class MapRecordProcessor extends perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } - @Override - void run() throws IOException{ - - MRInputLegacy in = TezProcessor.getMRInput(inputs); - KeyValueReader reader = in.getReader(); + private void initializeMapRecordSources() throws Exception { + int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself + sources = new MapRecordSource[size]; + KeyValueReader reader = legacyMRInput.getReader(); + position = mapOp.getConf().getTag(); + sources[position] = new MapRecordSource(); + sources[position].init(jconf, mapOp, reader); + for (MapOperator mapOp : mergeMapOpList) { + int tag = mapOp.getConf().getTag(); + sources[tag] = new MapRecordSource(); + String inputName = mapOp.getConf().getName(); + MultiMRInput multiMRInput = multiMRInputMap.get(inputName); + Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders(); + l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName); + List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders); + reader = new KeyValueInputMerger(kvReaderList); + sources[tag].init(jconf, mapOp, reader); + } + ((TezContext) MapredContext.get()).setRecordSources(sources); + } - //process records until done - while(reader.next()){ - //ignore the key for maps - reader.getCurrentKey(); - Object value = reader.getCurrentValue(); - boolean needMore = processRow(value); - if(!needMore){ - break; + private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) { + for (Operator<? 
extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) { + if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { + return (DummyStoreOperator) childOp; + } else { + return getJoinParentOp(childOp); } } + return null; } + @Override + void run() throws Exception { - /** - * @param value value to process - * @return true if it is not done and can take more inputs - */ - private boolean processRow(Object value) { - // reset the execContext for each new row - execContext.resetRow(); - - try { - if (mapOp.getDone()) { - return false; //done - } else { - // Since there is no concept of a group, we don't invoke - // startGroup/endGroup for a mapper - mapOp.process((Writable)value); - if (isLogInfoEnabled) { - logProgress(); - } - } - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - l4j.fatal(StringUtils.stringifyException(e)); - throw new RuntimeException(e); + while (sources[position].pushRecord()) { + if (isLogInfoEnabled) { + logProgress(); } } - return true; //give me more } @Override @@ -214,6 +289,11 @@ public class MapRecordProcessor extends return; } mapOp.close(abort); + if (mergeMapOpList.isEmpty() == false) { + for (MapOperator mergeMapOp : mergeMapOpList) { + mergeMapOp.close(abort); + } + } // Need to close the dummyOps as well. The operator pipeline // is not considered "closed/done" unless all operators are @@ -242,4 +322,27 @@ public class MapRecordProcessor extends MapredContext.close(); } } + + public static Map<Integer, DummyStoreOperator> getConnectOps() { + return connectOps; + } + + private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception { + // there should be only one MRInput + MRInputLegacy theMRInput = null; + l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); + for (Entry<String, LogicalInput> inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { + if (theMRInput != null) { + throw new IllegalArgumentException("Only one MRInput is expected"); + } + // a better logic would be to find the alias + theMRInput = (MRInputLegacy) inp.getValue(); + } else if (inp.getValue() instanceof MultiMRInput) { + multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); + } + } + theMRInput.init(); + return theMRInput; + } }
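
[Editor's note, not part of the patch above.] The DynamicPartitionPruner changes replace the "count expected events" loop with a blocking queue that is terminated by a private end-of-events sentinel: events are only accepted while their source vertex is still pending, and the sentinel is enqueued once the last pending source reports SUCCEEDED, so the consumer loop can exit without ever knowing how many events to expect. The following is a minimal, self-contained sketch of that pattern only; the class and method names are hypothetical and it does not use any Hive or Tez types.

// Sketch of the sentinel-on-a-blocking-queue pattern (illustrative only).
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PendingEventQueueSketch {
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
  private final Object endOfEvents = new Object();

  public PendingEventQueueSketch(Set<String> sources) {
    sourcesWaitingForEvents.addAll(sources);
  }

  // Called for each incoming event; accepted only while its source is still pending.
  public void addEvent(String sourceVertex, Object event) {
    synchronized (sourcesWaitingForEvents) {
      if (sourcesWaitingForEvents.contains(sourceVertex)) {
        queue.offer(event);
      }
    }
  }

  // Called when a source vertex succeeds; the last one to finish enqueues the sentinel.
  public void sourceSucceeded(String sourceVertex) {
    synchronized (sourcesWaitingForEvents) {
      sourcesWaitingForEvents.remove(sourceVertex);
      if (sourcesWaitingForEvents.isEmpty()) {
        queue.offer(endOfEvents);
      }
    }
  }

  // Consumer loop: blocks until every event has been drained and the sentinel arrives.
  public int drain() throws InterruptedException {
    int count = 0;
    while (true) {
      Object element = queue.take();
      if (element == endOfEvents) {
        return count;
      }
      count++; // a real consumer would decode and apply the event payload here
    }
  }
}

A consumer thread simply calls drain() after registering for vertex-state updates; producers call addEvent(...) from the event handler and sourceSucceeded(...) from the state-update callback, mirroring addEvent/processVertex/processEvents in the patch.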
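
[Editor's note, not part of the patch above.] The MapRecordProcessor changes retrieve one merge-work plan per comma-separated prefix and insist that the plans come either entirely from the object cache or entirely from a fresh rebuild, so the operator pipeline is never assembled from a mix of the two. Below is a generic sketch of that all-or-nothing lookup under assumed types; Plan, PlanLoader, and the in-memory map standing in for the cache are hypothetical and not Hive APIs.

// Sketch of the all-or-nothing cached-plan lookup (illustrative only).
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergePlanLookupSketch {
  interface Plan {}
  interface PlanLoader { Plan load(String prefix); }

  private final Map<String, Plan> cache = new HashMap<String, Plan>();

  public List<Plan> retrieveAll(String commaSeparatedPrefixes, PlanLoader loader) {
    List<Plan> plans = new ArrayList<Plan>();
    boolean foundCached = false;
    for (String prefix : commaSeparatedPrefixes.split(",")) {
      Plan plan = cache.get(prefix);
      if (plan != null) {
        foundCached = true; // from here on, every plan must come from the cache
        plans.add(plan);
        continue;
      }
      if (foundCached) {
        // mixing cached and freshly loaded plans could leave the pipeline inconsistent
        throw new IllegalStateException("Expected all plans in cache");
      }
      if (!prefix.isEmpty()) {
        plan = loader.load(prefix); // rebuild from the serialized plan and cache it
        plans.add(plan);
        cache.put(prefix, plan);
      }
    }
    return plans;
  }
}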
