Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Sep 24 07:03:35 2014 @@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + +import javax.security.auth.login.LoginException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; @@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -90,12 +110,16 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; @@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; import 
org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; -import javax.security.auth.login.LoginException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - /** * DagUtils. DagUtils is a collection of helper methods to convert * map and reduce work to tez vertices and edges. It handles configuration @@ -130,6 +139,11 @@ public class DagUtils { private static final Log LOG = LogFactory.getLog(DagUtils.class.getName()); private static final String TEZ_DIR = "_tez_scratch_dir"; private static DagUtils instance; + // The merge file being currently processed. + public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX = + "hive.tez.current.merge.file.prefix"; + // "A comma separated list of work names used as prefix. + public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes"; private void addCredentials(MapWork mapWork, DAG dag) { Set<String> paths = mapWork.getPathToAliases().keySet(); @@ -238,8 +252,8 @@ public class DagUtils { * endpoints. */ @SuppressWarnings("rawtypes") - public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, - Vertex w, TezEdgeProperty edgeProp) + public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, + TezEdgeProperty edgeProp, VertexType vertexType) throws IOException { Class mergeInputClass; @@ -254,10 +268,14 @@ public class DagUtils { case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; int numBuckets = edgeProp.getNumBuckets(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, vertexType, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); desc.setUserPayload(UserPayload.create(userPayload)); w.setVertexManagerPlugin(desc); break; @@ -289,17 +307,21 @@ public class DagUtils { * @param w The second vertex (sink) * @return */ - public Edge createEdge(JobConf vConf, Vertex v, Vertex w, - TezEdgeProperty edgeProp) + public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp, + VertexType vertexType) throws IOException { switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, vertexType, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( CustomPartitionVertex.class.getName()); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); desc.setUserPayload(UserPayload.create(userPayload)); w.setVertexManagerPlugin(desc); break; @@ -443,12 +465,61 @@ public class DagUtils { return MRHelpers.getJavaOptsForMRMapper(conf); } + private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, 
LocalResource appJarLr, + List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx, + VertexType vertexType) + throws Exception { + Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false); + if (mergeJoinWork.getMainWork() instanceof MapWork) { + List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList(); + MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); + CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); + Vertex mergeVx = + createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType); + + // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat + // here would cause pre-mature grouping which would be incorrect. + Class inputFormatClass = HiveInputFormat.class; + conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); + // mapreduce.tez.input.initializer.serialize.event.payload should be set + // to false when using this plug-in to avoid getting a serialized event at run-time. + conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false); + for (int i = 0; i < mapWorkList.size(); i++) { + + mapWork = (MapWork) (mapWorkList.get(i)); + conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName()); + conf.set(Utilities.INPUT_NAME, mapWork.getName()); + LOG.info("Going through each work and adding MultiMRInput"); + mergeVx.addDataSource(mapWork.getName(), + MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build()); + } + + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf() + .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias()); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + byte[] userPayload = dob.getData(); + desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + mergeVx.setVertexManagerPlugin(desc); + return mergeVx; + } else { + Vertex mergeVx = + createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs, + mrScratchDir, ctx); + return mergeVx; + } + } + /* * Helper function to create Vertex from MapWork. */ private Vertex createVertex(JobConf conf, MapWork mapWork, LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception { + Path mrScratchDir, Context ctx, VertexType vertexType) + throws Exception { Path tezDir = getTezDir(mrScratchDir); @@ -470,15 +541,8 @@ public class DagUtils { Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); - boolean vertexHasCustomInput = false; - if (tezWork != null) { - for (BaseWork baseWork : tezWork.getParents(mapWork)) { - if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) { - vertexHasCustomInput = true; - } - } - } - + boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType); + LOG.info("Vertex has custom input? " + vertexHasCustomInput); if (vertexHasCustomInput) { groupSplitsInInputInitializer = false; // grouping happens in execution phase. 
The input payload should not enable grouping here, @@ -513,6 +577,8 @@ public class DagUtils { } } + // remember mapping of plan to input + conf.set(Utilities.INPUT_NAME, mapWork.getName()); if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION) && !mapWork.isUseOneNullRowInputFormat()) { @@ -593,6 +659,7 @@ public class DagUtils { Path mrScratchDir, Context ctx) throws Exception { // set up operator plan + conf.set(Utilities.INPUT_NAME, reduceWork.getName()); Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false); // create the directories FileSinkOperators need @@ -937,12 +1004,22 @@ public class DagUtils { return initializeVertexConf(conf, context, (MapWork)work); } else if (work instanceof ReduceWork) { return initializeVertexConf(conf, context, (ReduceWork)work); + } else if (work instanceof MergeJoinWork) { + return initializeVertexConf(conf, context, (MergeJoinWork) work); } else { assert false; return null; } } + private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) { + if (work.getMainWork() instanceof MapWork) { + return initializeVertexConf(conf, context, (MapWork) (work.getMainWork())); + } else { + return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork())); + } + } + /** * Create a vertex from a given work object. * @@ -958,18 +1035,21 @@ public class DagUtils { */ public Vertex createVertex(JobConf conf, BaseWork work, Path scratchDir, LocalResource appJarLr, - List<LocalResource> additionalLr, - FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception { + List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren, + TezWork tezWork, VertexType vertexType) throws Exception { Vertex v = null; // simply dispatch the call to the right method for the actual (sub-) type of // BaseWork. if (work instanceof MapWork) { - v = createVertex(conf, (MapWork) work, appJarLr, - additionalLr, fileSystem, scratchDir, ctx, tezWork); + v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx, + vertexType); } else if (work instanceof ReduceWork) { v = createVertex(conf, (ReduceWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx); + } else if (work instanceof MergeJoinWork) { + v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir, + ctx, vertexType); } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
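
The CUSTOM_EDGE and merge-join paths above stop packing a bare 4-byte bucket count into the vertex-manager payload and instead serialize a CustomVertexConfiguration (bucket count, VertexType, big-table alias / input name) through a DataOutputBuffer before wrapping it in a UserPayload. The class itself is added elsewhere in this commit, so the sketch below is only a minimal model of that round trip using a hypothetical Writable of the same shape; the class name, field names, and field ordering here are assumptions, not the committed layout.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

// Hypothetical stand-in for CustomVertexConfiguration; the real class ships
// with this commit but its source is not part of this diff section.
class VertexPayloadSketch implements Writable {
  private int numBuckets;
  private String vertexType;   // e.g. a TezWork.VertexType name
  private String inputName;    // big-table alias, "" for plain custom edges

  VertexPayloadSketch() { }

  VertexPayloadSketch(int numBuckets, String vertexType, String inputName) {
    this.numBuckets = numBuckets;
    this.vertexType = vertexType;
    this.inputName = inputName;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(numBuckets);
    out.writeUTF(vertexType);
    out.writeUTF(inputName);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    numBuckets = in.readInt();
    vertexType = in.readUTF();
    inputName = in.readUTF();
  }

  // Serialize the way DagUtils does: Writable -> DataOutputBuffer -> ByteBuffer,
  // which is then handed to UserPayload.create(...) on the VertexManagerPluginDescriptor.
  ByteBuffer toPayloadBytes() throws IOException {
    DataOutputBuffer dob = new DataOutputBuffer();
    write(dob);
    // dob.getData() can be longer than the written length; the reader below
    // only consumes the fields it knows about, so trailing slack is ignored.
    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }

  // Deserialize on the AM side, roughly what a vertex manager plugin would do
  // with the payload bytes it receives.
  static VertexPayloadSketch fromPayloadBytes(byte[] bytes) throws IOException {
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(bytes, bytes.length);
    VertexPayloadSketch conf = new VertexPayloadSketch();
    conf.readFields(dib);
    return conf;
  }
}

Compared with the old ByteBuffer.allocate(4).putInt(numBuckets) payload, this lets CustomPartitionVertex recover the vertex type and the merge-join big-table alias as well as the bucket count from a single payload; the exact decode path inside CustomPartitionVertex is not shown in this section, so the fromPayloadBytes() helper above is illustrative only.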
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Wed Sep 24 07:03:35 2014 @@ -152,8 +152,21 @@ public class HiveSplitGenerator extends public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, float waves, int availableSlots) throws Exception { + return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null); + } + + public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, + Configuration conf, InputSplit[] splits, float waves, int availableSlots, + String inputName) throws Exception { - MapWork work = Utilities.getMapWork(jobConf); + MapWork work = null; + if (inputName != null) { + work = (MapWork) Utilities.getMergeWork(jobConf, inputName); + // work can still be null if there is no merge work for this input + } + if (work == null) { + work = Utilities.getMapWork(jobConf); + } Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create(); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Sep 24 07:03:35 2014 @@ -17,14 +17,20 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -36,15 +42,17 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import 
org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; @@ -58,27 +66,61 @@ public class MapRecordProcessor extends private MapOperator mapOp; + private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>(); public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); - private final ExecMapperContext execContext = new ExecMapperContext(); + private MapRecordSource[] sources; + private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>(); + private int position = 0; + private boolean foundCachedMergeWork = false; + MRInputLegacy legacyMRInput = null; + private ExecMapperContext execContext = null; private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; + List<MapWork> mergeWorkList = null; + private static Map<Integer, DummyStoreOperator> connectOps = + new TreeMap<Integer, DummyStoreOperator>(); - public MapRecordProcessor(JobConf jconf) { + public MapRecordProcessor(JobConf jconf) throws Exception { ObjectCache cache = ObjectCacheFactory.getCache(jconf); + execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); // create map and fetch operators mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); if (mapWork == null) { mapWork = Utilities.getMapWork(jconf); cache.cache(MAP_PLAN_KEY, mapWork); - l4j.info("Plan: "+mapWork); + l4j.debug("Plan: " + mapWork); for (String s: mapWork.getAliases()) { - l4j.info("Alias: "+s); + l4j.debug("Alias: " + s); } } else { Utilities.setMapWork(jconf, mapWork); } + + String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + if (prefixes != null) { + mergeWorkList = new ArrayList<MapWork>(); + for (String prefix : prefixes.split(",")) { + MapWork mergeMapWork = (MapWork) cache.retrieve(prefix); + if (mergeMapWork != null) { + l4j.info("Found merge work in cache"); + foundCachedMergeWork = true; + mergeWorkList.add(mergeMapWork); + continue; + } + if (foundCachedMergeWork) { + throw new Exception( + "Should find all work in cache else operator pipeline will be in non-deterministic state"); + } + + if ((prefix != null) && (prefix.isEmpty() == false)) { + mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix); + mergeWorkList.add(mergeMapWork); + cache.cache(prefix, mergeMapWork); + } + } + } } @Override @@ -88,8 +130,8 @@ public class MapRecordProcessor extends super.init(jconf, processorContext, mrReporter, inputs, outputs); //Update JobConf using MRInput, info like filename comes via this - MRInputLegacy mrInput = TezProcessor.getMRInput(inputs); - Configuration updatedConf = mrInput.getConfigUpdates(); + legacyMRInput = getMRInput(inputs); + Configuration updatedConf = legacyMRInput.getConfigUpdates(); if (updatedConf != null) { for (Entry<String, String> entry : updatedConf) { jconf.set(entry.getKey(), entry.getValue()); @@ -99,20 +141,52 @@ public class MapRecordProcessor extends createOutputMap(); // Start all the Outputs. 
for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) { - l4j.info("Starting Output: " + outputEntry.getKey()); + l4j.debug("Starting Output: " + outputEntry.getKey()); outputEntry.getValue().start(); ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } try { + if (mapWork.getVectorMode()) { mapOp = new VectorMapOperator(); } else { mapOp = new MapOperator(); } + connectOps.clear(); + if (mergeWorkList != null) { + MapOperator mergeMapOp = null; + for (MapWork mergeMapWork : mergeWorkList) { + processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs + .get(mergeMapWork.getName())))); + if (mergeMapWork.getVectorMode()) { + mergeMapOp = new VectorMapOperator(); + } else { + mergeMapOp = new MapOperator(); + } + + mergeMapOpList.add(mergeMapOp); + // initialize the merge operators first. + if (mergeMapOp != null) { + mergeMapOp.setConf(mergeMapWork); + l4j.info("Input name is " + mergeMapWork.getName()); + jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName()); + mergeMapOp.setChildren(jconf); + if (foundCachedMergeWork == false) { + DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp); + connectOps.put(mergeMapWork.getTag(), dummyOp); + } + mergeMapOp.setExecContext(new ExecMapperContext(jconf)); + mergeMapOp.initializeLocalWork(jconf); + } + } + } + // initialize map operator mapOp.setConf(mapWork); + l4j.info("Main input name is " + mapWork.getName()); + jconf.set(Utilities.INPUT_NAME, mapWork.getName()); mapOp.setChildren(jconf); l4j.info(mapOp.dump(0)); @@ -121,12 +195,21 @@ public class MapRecordProcessor extends ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); + + initializeMapRecordSources(); mapOp.initialize(jconf, null); + if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) { + for (MapOperator mergeMapOp : mergeMapOpList) { + jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName()); + mergeMapOp.initialize(jconf, null); + } + } // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the // dummy parent operators as well. List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps(); + jconf.set(Utilities.INPUT_NAME, mapWork.getName()); if (dummyOps != null) { for (Operator<? 
extends OperatorDesc> dummyOp : dummyOps){ dummyOp.setExecContext(execContext); @@ -151,54 +234,46 @@ public class MapRecordProcessor extends perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } - @Override - void run() throws IOException{ - - MRInputLegacy in = TezProcessor.getMRInput(inputs); - KeyValueReader reader = in.getReader(); + private void initializeMapRecordSources() throws Exception { + int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself + sources = new MapRecordSource[size]; + KeyValueReader reader = legacyMRInput.getReader(); + position = mapOp.getConf().getTag(); + sources[position] = new MapRecordSource(); + sources[position].init(jconf, mapOp, reader); + for (MapOperator mapOp : mergeMapOpList) { + int tag = mapOp.getConf().getTag(); + sources[tag] = new MapRecordSource(); + String inputName = mapOp.getConf().getName(); + MultiMRInput multiMRInput = multiMRInputMap.get(inputName); + Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders(); + l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName); + List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders); + reader = new KeyValueInputMerger(kvReaderList); + sources[tag].init(jconf, mapOp, reader); + } + ((TezContext) MapredContext.get()).setRecordSources(sources); + } - //process records until done - while(reader.next()){ - //ignore the key for maps - reader.getCurrentKey(); - Object value = reader.getCurrentValue(); - boolean needMore = processRow(value); - if(!needMore){ - break; + private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) { + for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) { + if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { + return (DummyStoreOperator) childOp; + } else { + return getJoinParentOp(childOp); } } + return null; } + @Override + void run() throws Exception { - /** - * @param value value to process - * @return true if it is not done and can take more inputs - */ - private boolean processRow(Object value) { - // reset the execContext for each new row - execContext.resetRow(); - - try { - if (mapOp.getDone()) { - return false; //done - } else { - // Since there is no concept of a group, we don't invoke - // startGroup/endGroup for a mapper - mapOp.process((Writable)value); - if (isLogInfoEnabled) { - logProgress(); - } - } - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - l4j.fatal(StringUtils.stringifyException(e)); - throw new RuntimeException(e); + while (sources[position].pushRecord()) { + if (isLogInfoEnabled) { + logProgress(); } } - return true; //give me more } @Override @@ -214,6 +289,11 @@ public class MapRecordProcessor extends return; } mapOp.close(abort); + if (mergeMapOpList.isEmpty() == false) { + for (MapOperator mergeMapOp : mergeMapOpList) { + mergeMapOp.close(abort); + } + } // Need to close the dummyOps as well. 
The operator pipeline // is not considered "closed/done" unless all operators are @@ -242,4 +322,27 @@ public class MapRecordProcessor extends MapredContext.close(); } } + + public static Map<Integer, DummyStoreOperator> getConnectOps() { + return connectOps; + } + + private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception { + // there should be only one MRInput + MRInputLegacy theMRInput = null; + l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); + for (Entry<String, LogicalInput> inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { + if (theMRInput != null) { + throw new IllegalArgumentException("Only one MRInput is expected"); + } + // a better logic would be to find the alias + theMRInput = (MRInputLegacy) inp.getValue(); + } else if (inp.getValue() instanceof MultiMRInput) { + multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); + } + } + theMRInput.init(); + return theMRInput; + } } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1627235&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Wed Sep 24 07:03:35 2014 @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.runtime.library.api.KeyValueReader; + +/** + * Process input from tez LogicalInput and write output - for a map plan Just pump the records + * through the query plan. 
+ */ + +public class MapRecordSource implements RecordSource { + + public static final Log LOG = LogFactory.getLog(MapRecordSource.class); + private ExecMapperContext execContext = null; + private MapOperator mapOp = null; + private KeyValueReader reader = null; + private final boolean grouped = false; + + void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException { + execContext = new ExecMapperContext(jconf); + this.mapOp = mapOp; + this.reader = reader; + } + + @Override + public final boolean isGrouped() { + return grouped; + } + + @Override + public boolean pushRecord() throws HiveException { + execContext.resetRow(); + + try { + if (reader.next()) { + Object value; + try { + value = reader.getCurrentValue(); + } catch (IOException e) { + throw new HiveException(e); + } + return processRow(value); + } + } catch (IOException e) { + throw new HiveException(e); + } + return false; + } + + private boolean processRow(Object value) { + try { + if (mapOp.getDone()) { + return false; // done + } else { + // Since there is no concept of a group, we don't invoke + // startGroup/endGroup for a mapper + mapOp.process((Writable) value); + } + } catch (Throwable e) { + if (e instanceof OutOfMemoryError) { + // Don't create a new object if we are already out of memory + throw (OutOfMemoryError) e; + } else { + LOG.fatal(StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + return true; // give me more + } + +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Wed Sep 24 07:03:35 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; import org.apache.hadoop.hive.ql.exec.Operator; @@ -40,7 +41,9 @@ import org.apache.tez.runtime.api.Logica import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; +import java.io.IOException; import java.util.Map; +import java.util.Map.Entry; /** * Record processor for fast merging of files. @@ -51,11 +54,12 @@ public class MergeFileRecordProcessor ex .getLog(MergeFileRecordProcessor.class); protected Operator<? 
extends OperatorDesc> mergeOp; - private final ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = null; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MergeFileWork mfWork; + MRInputLegacy mrInput = null; private boolean abort = false; - private Object[] row = new Object[2]; + private final Object[] row = new Object[2]; @Override void init(JobConf jconf, ProcessorContext processorContext, @@ -63,16 +67,16 @@ public class MergeFileRecordProcessor ex Map<String, LogicalOutput> outputs) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, processorContext, mrReporter, inputs, outputs); + execContext = new ExecMapperContext(jconf); //Update JobConf using MRInput, info like filename comes via this - MRInputLegacy mrInput = TezProcessor.getMRInput(inputs); + mrInput = getMRInput(inputs); Configuration updatedConf = mrInput.getConfigUpdates(); if (updatedConf != null) { for (Map.Entry<String, String> entry : updatedConf) { jconf.set(entry.getKey(), entry.getValue()); } } - createOutputMap(); // Start all the Outputs. for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) { @@ -127,8 +131,7 @@ public class MergeFileRecordProcessor ex @Override void run() throws Exception { - MRInputLegacy in = TezProcessor.getMRInput(inputs); - KeyValueReader reader = in.getReader(); + KeyValueReader reader = mrInput.getReader(); //process records until done while (reader.next()) { @@ -205,4 +208,23 @@ public class MergeFileRecordProcessor ex return true; //give me more } + private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception { + // there should be only one MRInput + MRInputLegacy theMRInput = null; + for (Entry<String, LogicalInput> inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { + if (theMRInput != null) { + throw new IllegalArgumentException("Only one MRInput is expected"); + } + // a better logic would be to find the alias + theMRInput = (MRInputLegacy) inp.getValue(); + } else { + throw new IOException("Expecting only one input of type MRInputLegacy. 
Found type: " + + inp.getClass().getCanonicalName()); + } + } + theMRInput.init(); + + return theMRInput; + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Wed Sep 24 07:03:35 2014 @@ -39,12 +39,6 @@ public class MergeFileTezProcessor exten public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { rproc = new MergeFileRecordProcessor(); - MRInputLegacy mrInput = getMRInput(inputs); - try { - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); - } initializeAndRunProcessor(inputs, outputs); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Sep 24 07:03:35 2014 @@ -115,8 +115,7 @@ public abstract class RecordProcessor { */ protected void logCloseInfo() { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " - + used_memory); + l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory); } /** @@ -126,8 +125,7 @@ public abstract class RecordProcessor { numRows++; if (numRows == nextUpdateCntr) { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processing " + numRows - + " rows: used memory = " + used_memory); + l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory); nextUpdateCntr = getNextUpdateRecordCounter(numRows); } } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java?rev=1627235&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java Wed Sep 24 07:03:35 2014 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public interface RecordSource { + public boolean pushRecord() throws HiveException; + public boolean isGrouped(); +} \ No newline at end of file Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Sep 24 07:03:35 2014 @@ -17,9 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -35,31 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Op import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; -import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; @@ -76,39 +56,16 @@ public class ReduceRecordProcessor exte private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__"; public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); - private final ExecMapperContext execContext 
= new ExecMapperContext(); - private boolean abort = false; - private Deserializer inputKeyDeserializer; - - // Input value serde needs to be an array to support different SerDe - // for different tags - private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; - TableDesc keyTableDesc; - TableDesc[] valueTableDesc; + private ReduceWork redWork; - ObjectInspector[] rowObjectInspector; private Operator<?> reducer; - private boolean isTagged = false; - - private Object keyObject = null; - private BytesWritable groupKey; - - private ReduceWork redWork; - private boolean vectorized = false; + private ReduceRecordSource[] sources; - List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); + private final byte position = 0; - private DataOutputBuffer buffer; - private VectorizedRowBatch[] batches; - // number of columns pertaining to keys in a vectorized row batch - private int keysColumnOffset; - private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE; - private StructObjectInspector keyStructInspector; - private StructObjectInspector[] valueStructInspectors; - /* this is only used in the error code path */ - private List<VectorExpressionWriter>[] valueStringWriters; + private boolean abort; @Override void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, @@ -118,10 +75,6 @@ public class ReduceRecordProcessor exte ObjectCache cache = ObjectCacheFactory.getCache(jconf); - rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; - ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; - ObjectInspector keyObjectInspector; - redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); if (redWork == null) { redWork = Utilities.getReduceWork(jconf); @@ -131,95 +84,35 @@ public class ReduceRecordProcessor exte } reducer = redWork.getReducer(); - reducer.setParentOperators(null); // clear out any parents as reducer is the - // root - isTagged = redWork.getNeedsTagging(); - vectorized = redWork.getVectorMode(); + reducer.getParentOperators().clear(); + reducer.setParentOperators(null); // clear out any parents as reducer is the root - try { - keyTableDesc = redWork.getKeyDesc(); - inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc - .getDeserializerClass(), null); - SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); - keyObjectInspector = inputKeyDeserializer.getObjectInspector(); - reducer.setGroupKeyObjectInspector(keyObjectInspector); - valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()]; - - if(vectorized) { - final int maxTags = redWork.getTagToValueDesc().size(); - keyStructInspector = (StructObjectInspector)keyObjectInspector; - batches = new VectorizedRowBatch[maxTags]; - valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = new List[maxTags]; - keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); - buffer = new DataOutputBuffer(); - } + int numTags = redWork.getTagToValueDesc().size(); - for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { - // We should initialize the SerDe with the TypeInfo when available. 
- valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag); - inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance( - valueTableDesc[tag].getDeserializerClass(), null); - SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null, - valueTableDesc[tag].getProperties(), null); - valueObjectInspector[tag] = inputValueDeserializer[tag] - .getObjectInspector(); - - ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); - - if(vectorized) { - /* vectorization only works with struct object inspectors */ - valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag]; - - batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, - valueStructInspectors[tag]); - final int totalColumns = keysColumnOffset + - valueStructInspectors[tag].getAllStructFieldRefs().size(); - valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns); - valueStringWriters[tag].addAll(Arrays - .asList(VectorExpressionWriterFactory - .genVectorStructExpressionWritables(keyStructInspector))); - valueStringWriters[tag].addAll(Arrays - .asList(VectorExpressionWriterFactory - .genVectorStructExpressionWritables(valueStructInspectors[tag]))); - - /* - * The row object inspector used by ReduceWork needs to be a **standard** - * struct object inspector, not just any struct object inspector. - */ - ArrayList<String> colNames = new ArrayList<String>(); - List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs(); - for (StructField field: fields) { - colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); - ois.add(field.getFieldObjectInspector()); - } - fields = valueStructInspectors[tag].getAllStructFieldRefs(); - for (StructField field: fields) { - colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); - ois.add(field.getFieldObjectInspector()); - } - rowObjectInspector[tag] = ObjectInspectorFactory - .getStandardStructObjectInspector(colNames, ois); - } else { - ois.add(keyObjectInspector); - ois.add(valueObjectInspector[tag]); - rowObjectInspector[tag] = ObjectInspectorFactory - .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); - } + ObjectInspector[] ois = new ObjectInspector[numTags]; + sources = new ReduceRecordSource[numTags]; - } - } catch (Exception e) { - throw new RuntimeException(e); + for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { + TableDesc keyTableDesc = redWork.getKeyDesc(); + TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag); + KeyValuesReader reader = + (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader(); + + sources[tag] = new ReduceRecordSource(); + sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc, + reader, tag == position, (byte) tag); + ois[tag] = sources[tag].getObjectInspector(); } MapredContext.init(false, new JobConf(jconf)); ((TezContext) MapredContext.get()).setInputs(inputs); ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); + ((TezContext) MapredContext.get()).setRecordSources(sources); // initialize reduce operator tree try { l4j.info(reducer.dump(0)); - reducer.initialize(jconf, rowObjectInspector); + reducer.initialize(jconf, ois); // Initialization isn't finished until all parents of all operators // are initialized. 
For broadcast joins that means initializing the @@ -227,7 +120,6 @@ public class ReduceRecordProcessor exte List<HashTableDummyOperator> dummyOps = redWork.getDummyOps(); if (dummyOps != null) { for (Operator<? extends OperatorDesc> dummyOp : dummyOps){ - dummyOp.setExecContext(execContext); dummyOp.initialize(jconf, null); } } @@ -271,28 +163,12 @@ public class ReduceRecordProcessor exte ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } - KeyValuesReader kvsReader; - try { - if(shuffleInputs.size() == 1){ - //no merging of inputs required - kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); - }else { - //get a sort merged input - kvsReader = new InputMerger(shuffleInputs); - } - } catch (Exception e) { - throw new IOException(e); - } - - while(kvsReader.next()){ - Object key = kvsReader.getCurrentKey(); - Iterable<Object> values = kvsReader.getCurrentValues(); - boolean needMore = processRows(key, values); - if(!needMore){ - break; + // run the operator pipeline + while (sources[position].pushRecord()) { + if (isLogInfoEnabled) { + logProgress(); } } - } /** @@ -302,212 +178,22 @@ public class ReduceRecordProcessor exte */ private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) { //the reduce plan inputs have tags, add all inputs that have tags - Map<Integer, String> tag2input = redWork.getTagToInput(); + Map<Integer, String> tagToinput = redWork.getTagToInput(); ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>(); - for(String inpStr : tag2input.values()){ + for(String inpStr : tagToinput.values()){ + if (inputs.get(inpStr) == null) { + throw new AssertionError("Cound not find input: " + inpStr); + } shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } - /** - * @param key - * @param values - * @return true if it is not done and can take more inputs - */ - private boolean processRows(Object key, Iterable<Object> values) { - if(reducer.getDone()){ - //done - no more records needed - return false; - } - - // reset the execContext for each new row - execContext.resetRow(); - - try { - BytesWritable keyWritable = (BytesWritable) key; - byte tag = 0; - - if (isTagged) { - // remove the tag from key coming out of reducer - // and store it in separate variable. 
- int size = keyWritable.getLength() - 1; - tag = keyWritable.getBytes()[size]; - keyWritable.setSize(size); - } - - //Set the key, check if this is a new group or same group - if (!keyWritable.equals(this.groupKey)) { - // If a operator wants to do some work at the beginning of a group - if (groupKey == null) { // the first group - this.groupKey = new BytesWritable(); - } else { - // If a operator wants to do some work at the end of a group - if(isLogTraceEnabled) { - l4j.trace("End Group"); - } - reducer.endGroup(); - } - - try { - this.keyObject = inputKeyDeserializer.deserialize(keyWritable); - } catch (Exception e) { - throw new HiveException( - "Hive Runtime Error: Unable to deserialize reduce input key from " - + Utilities.formatBinaryString(keyWritable.getBytes(), 0, - keyWritable.getLength()) + " with properties " - + keyTableDesc.getProperties(), e); - } - groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength()); - if (isLogTraceEnabled) { - l4j.trace("Start Group"); - } - reducer.setGroupKeyObject(keyObject); - reducer.startGroup(); - } - /* this.keyObject passed via reference */ - if(vectorized) { - return processVectors(values, tag); - } else { - return processKeyValues(values, tag); - } - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - l4j.fatal(StringUtils.stringifyException(e)); - throw new RuntimeException(e); - } - } - } - - private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException { - try { - return inputValueDeserializer[tag].deserialize(valueWritable); - } catch (SerDeException e) { - throw new HiveException( - "Hive Runtime Error: Unable to deserialize reduce input value (tag=" - + tag - + ") from " - + Utilities.formatBinaryString(valueWritable.getBytes(), 0, - valueWritable.getLength()) + " with properties " - + valueTableDesc[tag].getProperties(), e); - } - } - - /** - * @param values - * @return true if it is not done and can take more inputs - */ - private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveException { - - for (Object value : values) { - BytesWritable valueWritable = (BytesWritable) value; - - row.clear(); - row.add(this.keyObject); - row.add(deserializeValue(valueWritable, tag)); - - try { - reducer.processOp(row, tag); - } catch (Exception e) { - String rowString = null; - try { - rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]); - } catch (Exception e2) { - rowString = "[Error getting row data with exception " - + StringUtils.stringifyException(e2) + " ]"; - } - throw new HiveException("Hive Runtime Error while processing row (tag=" - + tag + ") " + rowString, e); - } - if (isLogInfoEnabled) { - logProgress(); - } - } - return true; //give me more - } - - /** - * @param values - * @return true if it is not done and can take more inputs - */ - private boolean processVectors(Iterable<Object> values, byte tag) throws HiveException { - VectorizedRowBatch batch = batches[tag]; - batch.reset(); - buffer.reset(); - - /* deserialize key into columns */ - VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector, - 0, 0, batch, buffer); - for(int i = 0; i < keysColumnOffset; i++) { - VectorizedBatchUtil.setRepeatingColumn(batch, i); - } - - int rowIdx = 0; - try { - for (Object value : values) { - /* deserialize value into columns */ - BytesWritable valueWritable = (BytesWritable) value; - Object valueObj = 
deserializeValue(valueWritable, tag); - - VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], - rowIdx, keysColumnOffset, batch, buffer); - rowIdx++; - if (rowIdx >= BATCH_SIZE) { - VectorizedBatchUtil.setBatchSize(batch, rowIdx); - reducer.processOp(batch, tag); - rowIdx = 0; - buffer.reset(); - if (isLogInfoEnabled) { - logProgress(); - } - } - } - if (rowIdx > 0) { - VectorizedBatchUtil.setBatchSize(batch, rowIdx); - reducer.processOp(batch, tag); - buffer.reset(); - } - if (isLogInfoEnabled) { - logProgress(); - } - } catch (Exception e) { - String rowString = null; - try { - /* batch.toString depends on this */ - batch.setValueWriters(valueStringWriters[tag] - .toArray(new VectorExpressionWriter[0])); - rowString = batch.toString(); - } catch (Exception e2) { - rowString = "[Error getting row data with exception " - + StringUtils.stringifyException(e2) + " ]"; - } - throw new HiveException("Hive Runtime Error while processing vector batch (tag=" - + tag + ") " + rowString, e); - } - return true; // give me more - } - @Override void close(){ - // check if there are IOExceptions - if (!abort) { - abort = execContext.getIoCxt().getIOExceptions(); - } - try { - if (groupKey != null) { - // If a operator wants to do some work at the end of a group - if(isLogTraceEnabled) { - l4j.trace("End Group"); - } - reducer.endGroup(); - } - if (isLogInfoEnabled) { - logCloseInfo(); + for (ReduceRecordSource rs: sources) { + abort = abort && rs.close(); } reducer.close(abort); Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1627235&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Sep 24 07:03:35 2014 @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * Process input from tez LogicalInput and write output - for a map plan + * Just pump the records through the query plan. 
+ */
+@SuppressWarnings("deprecation")
+public class ReduceRecordSource implements RecordSource {
+
+  public static final Log l4j = LogFactory.getLog(ReduceRecordSource.class);
+
+  private static final String CLASS_NAME = ReduceRecordSource.class.getName();
+
+  private byte tag;
+
+  private boolean abort = false;
+
+  private Deserializer inputKeyDeserializer;
+
+  // Input value SerDe for this source's tag; each ReduceRecordSource handles
+  // a single tag, so a single deserializer is sufficient here.
+  private SerDe inputValueDeserializer;
+
+  TableDesc keyTableDesc;
+  TableDesc valueTableDesc;
+
+  ObjectInspector rowObjectInspector;
+  private Operator<?> reducer;
+
+  private Object keyObject = null;
+  private BytesWritable groupKey;
+
+  private boolean vectorized = false;
+
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+  private DataOutputBuffer buffer;
+  private VectorizedRowBatch batch;
+
+  // number of columns pertaining to keys in a vectorized row batch
+  private int keysColumnOffset;
+  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+
+  private StructObjectInspector keyStructInspector;
+  private StructObjectInspector valueStructInspectors;
+
+  /* this is only used in the error code path */
+  private List<VectorExpressionWriter> valueStringWriters;
+
+  private KeyValuesReader reader;
+
+  private boolean handleGroupKey;
+
+  private ObjectInspector valueObjectInspector;
+
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
+  private Iterable<Object> valueWritables;
+
+  private final boolean grouped = true;
+
+  void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
+      TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag)
+      throws Exception {
+
+    ObjectInspector keyObjectInspector;
+
+    this.reducer = reducer;
+    this.vectorized = vectorized;
+    this.keyTableDesc = keyTableDesc;
+    this.reader = reader;
+    this.handleGroupKey = handleGroupKey;
+    this.tag = tag;
+
+    try {
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      reducer.setGroupKeyObjectInspector(keyObjectInspector);
+
+      if(vectorized) {
+        keyStructInspector = (StructObjectInspector) keyObjectInspector;
+        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+        buffer = new DataOutputBuffer();
+      }
+
+      // We should initialize the SerDe with the TypeInfo when available.
+      this.valueTableDesc = valueTableDesc;
+      inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+          valueTableDesc.getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+          valueTableDesc.getProperties(), null);
+      valueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+      ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+      if(vectorized) {
+        /* vectorization only works with struct object inspectors */
+        valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+
+        batch = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+            valueStructInspectors);
+
+        final int totalColumns = keysColumnOffset
+            + valueStructInspectors.getAllStructFieldRefs().size();
+        valueStringWriters = new ArrayList<VectorExpressionWriter>(totalColumns);
+        valueStringWriters.addAll(Arrays
+            .asList(VectorExpressionWriterFactory
+                .genVectorStructExpressionWritables(keyStructInspector)));
+        valueStringWriters.addAll(Arrays
+            .asList(VectorExpressionWriterFactory
+                .genVectorStructExpressionWritables(valueStructInspectors)));
+
+        /*
+         * The row object inspector used by ReduceWork needs to be a **standard**
+         * struct object inspector, not just any struct object inspector.
+         */
+        ArrayList<String> colNames = new ArrayList<String>();
+        List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+        for (StructField field: fields) {
+          colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+          ois.add(field.getFieldObjectInspector());
+        }
+        fields = valueStructInspectors.getAllStructFieldRefs();
+        for (StructField field: fields) {
+          colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+          ois.add(field.getFieldObjectInspector());
+        }
+        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+      } else {
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        rowObjectInspector =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                ois);
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Reduce operator initialization failed", e);
+      }
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+  }
+
+  @Override
+  public final boolean isGrouped() {
+    return grouped;
+  }
+
+  @Override
+  public boolean pushRecord() throws HiveException {
+    BytesWritable keyWritable;
+
+    try {
+      if (!reader.next()) {
+        return false;
+      } else {
+        keyWritable = (BytesWritable) reader.getCurrentKey();
+        valueWritables = reader.getCurrentValues();
+      }
+
+      //Set the key, check if this is a new group or same group
+      try {
+        keyObject = inputKeyDeserializer.deserialize(keyWritable);
+      } catch (Exception e) {
+        throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input key from "
+            + Utilities.formatBinaryString(keyWritable.getBytes(), 0, keyWritable.getLength())
+            + " with properties " + keyTableDesc.getProperties(), e);
+      }
+
+      if (handleGroupKey && !keyWritable.equals(this.groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          this.groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          reducer.endGroup();
+        }
+
+        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+        reducer.setGroupKeyObject(keyObject);
+        reducer.startGroup();
+      }
+
+      /* this.keyObject passed via reference */
+      if(vectorized) {
+        processVectors(valueWritables, tag);
+      } else {
+        processKeyValues(valueWritables, tag);
+      }
+      return true;
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private Object deserializeValue(BytesWritable valueWritable, byte tag)
+      throws HiveException {
+
+    try {
+      return inputValueDeserializer.deserialize(valueWritable);
+    } catch (SerDeException e) {
+      throw new HiveException(
+          "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+              + tag
+              + ") from "
+              + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+              + " with properties " + valueTableDesc.getProperties(), e);
+    }
+  }
+
+  /**
+   * Processes the values of the current key one row at a time.
+   * @param values the values associated with the current reduce key
+   */
+  private void processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
+    List<Object> passDownKey = null;
+    for (Object value : values) {
+      BytesWritable valueWritable = (BytesWritable) value;
+
+      row.clear();
+      if (passDownKey == null) {
+        row.add(this.keyObject);
+      } else {
+        row.add(passDownKey.get(0));
+      }
+      if ((passDownKey == null) && (reducer instanceof CommonMergeJoinOperator)) {
+        passDownKey =
+            (List<Object>) ObjectInspectorUtils.copyToStandardObject(row,
+                reducer.getInputObjInspectors()[tag], ObjectInspectorCopyOption.WRITABLE);
+        row.remove(0);
+        row.add(0, passDownKey.get(0));
+      }
+
+      row.add(deserializeValue(valueWritable, tag));
+
+      try {
+        reducer.processOp(row, tag);
+      } catch (Exception e) {
+        String rowString = null;
+        try {
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception "
+              + StringUtils.stringifyException(e2) + " ]";
+        }
+        throw new HiveException("Hive Runtime Error while processing row (tag="
+            + tag + ") " + rowString, e);
+      }
+    }
+  }
+
+  /**
+   * Processes the values of the current key as vectorized row batches.
+   * @param values the values associated with the current reduce key
+   */
+  private void processVectors(Iterable<Object> values, byte tag) throws HiveException {
+    batch.reset();
+
+    /* deserialize key into columns */
+    VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
+        0, 0, batch, buffer);
+    for(int i = 0; i < keysColumnOffset; i++) {
+      VectorizedBatchUtil.setRepeatingColumn(batch, i);
+    }
+
+    int rowIdx = 0;
+    try {
+      for (Object value : values) {
+        /* deserialize value into columns */
+        BytesWritable valueWritable = (BytesWritable) value;
+        Object valueObj = deserializeValue(valueWritable, tag);
+
+        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors,
+            rowIdx, keysColumnOffset, batch, buffer);
+        rowIdx++;
+        if (rowIdx >= BATCH_SIZE) {
+          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+          reducer.processOp(batch, tag);
+          rowIdx = 0;
+        }
+      }
+      if (rowIdx > 0) {
+        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        reducer.processOp(batch, tag);
+      }
+    } catch (Exception e) {
+      String rowString = null;
+      try {
+        /* batch.toString depends on this */
+        batch.setValueWriters(valueStringWriters
+            .toArray(new VectorExpressionWriter[0]));
+        rowString = batch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+            + StringUtils.stringifyException(e2) + " ]";
+      }
+      throw new HiveException("Hive Runtime Error while processing vector batch (tag="
+          + tag + ") " + rowString, e);
+    }
+  }
+
+  boolean close() throws Exception {
+    try {
+      if (handleGroupKey && groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        reducer.endGroup();
+      }
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: "
+            + e.getMessage(), e);
+      }
+    }
+    return abort;
+  }
+
+  public ObjectInspector getObjectInspector() {
+    return rowObjectInspector;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Wed Sep 24 07:03:35 2014
@@ -37,6 +37,8 @@ public class TezContext extends MapredCo
 
   private ProcessorContext processorContext;
 
+  private RecordSource[] sources;
+
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
   }
@@ -70,4 +72,12 @@ public class TezContext extends MapredCo
   public ProcessorContext getTezProcessorContext() {
     return processorContext;
   }
+
+  public RecordSource[] getRecordSources() {
+    return sources;
+  }
+
+  public void setRecordSources(RecordSource[] sources) {
+    this.sources = sources;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,6 +34,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -34,11 +43,6 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
 * Does what ExecMapper and ExecReducer does for hive in MR framework.
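
Editorial note on the new plumbing above: TezContext now carries the RecordSource instances for a task, and each ReduceRecordSource pushes rows into the operator tree through pushRecord(). A minimal, hypothetical sketch (not code from this commit, and assuming the RecordSource interface added here is visible to the caller) of the loop a record processor could use to drain such a source:

    // Illustrative sketch only: drive a RecordSource until its reader is exhausted.
    // pushRecord() forwards one record (or one whole value group, when isGrouped()
    // returns true) into the operator pipeline and returns false at end of input.
    import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class RecordSourceDriverSketch {
      static void drain(RecordSource source) throws HiveException {
        while (source.pushRecord()) {
          // each iteration has already pushed data to the downstream operators
        }
      }
    }
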
@@ -90,7 +94,8 @@ public class TezProcessor extends Abstra
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
     Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
-    setupMRLegacyConfigs(getContext());
+    this.processorContext = getContext();
+    setupMRLegacyConfigs(processorContext);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
@@ -130,12 +135,6 @@ public class TezProcessor extends Abstra
 
     if (isMap) {
       rproc = new MapRecordProcessor(jobConf);
-      MRInputLegacy mrInput = getMRInput(inputs);
-      try {
-        mrInput.init();
-      } catch (IOException e) {
-        throw new RuntimeException("Failed while initializing MRInput", e);
-      }
     } else {
       rproc = new ReduceRecordProcessor();
     }
@@ -148,18 +147,6 @@ public class TezProcessor extends Abstra
       throws Exception {
     Throwable originalThrowable = null;
     try {
-      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-      // Start the actual Inputs. After MRInput initialization.
-      for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
-        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-          LOG.info("Input: " + inputEntry.getKey() + " is not cached");
-          inputEntry.getValue().start();
-        } else {
-          LOG.info("Input: " + inputEntry.getKey() +
-              " is already cached. Skipping start");
-        }
-      }
-
       // Outputs will be started later by the individual Processors.
 
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
@@ -214,19 +201,4 @@ public class TezProcessor extends Abstra
       writer.write(key, value);
     }
   }
-
-  static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
-    //there should be only one MRInput
-    MRInputLegacy theMRInput = null;
-    for(LogicalInput inp : inputs.values()){
-      if(inp instanceof MRInputLegacy){
-        if(theMRInput != null){
-          throw new IllegalArgumentException("Only one MRInput is expected");
-        }
-        //a better logic would be to find the alias
-        theMRInput = (MRInputLegacy)inp;
-      }
-    }
-    return theMRInput;
-  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Sep 24 07:03:35 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -313,15 +314,16 @@ public class TezTask extends Task<TezWor
         for (BaseWork v: children) {
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, parentConf,
-              workToVertex.get(v), work.getEdgeProperty(w, v));
+              workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v));
 
           dag.addEdge(e);
         }
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
-        Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
-            additionalLr, fs, ctx, !isFinal, work);
+        Vertex wx =
+            utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
+                work, work.getVertexType(w));
         dag.addVertex(wx);
         utils.addCredentials(w, dag);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -335,7 +337,7 @@ public class TezTask extends Task<TezWor
 
           TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
 
-          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
           dag.addEdge(e);
         }
       }
@@ -386,6 +388,9 @@ public class TezTask extends Task<TezWor
     try {
      List<BaseWork> ws = work.getAllWork();
      for (BaseWork w: ws) {
+        if (w instanceof MergeJoinWork) {
+          w = ((MergeJoinWork) w).getMainWork();
+        }
        for (Operator<?> op: w.getAllOperators()) {
          op.jobClose(conf, rc == 0);
        }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * A KeyValueReader implementation that returns a sorted stream of key-value
+ * pairs by doing a sorted merge of the key-values in the LogicalInputs.
+ * Tags are in the last byte of the key, so no special handling for tags is required.
+ * Uses a priority queue to pick the KeyValueReader of the input that is next in
+ * sort order.
+ */
+public class KeyValueInputMerger extends KeyValueReader {
+
+  public static final Log l4j = LogFactory.getLog(KeyValueInputMerger.class);
+  private PriorityQueue<KeyValueReader> pQueue = null;
+  private KeyValueReader nextKVReader = null;
+
+  public KeyValueInputMerger(List<KeyValueReader> multiMRInputs) throws Exception {
+    //get KeyValueReaders from the LogicalInputs and add them to the priority queue
+    int initialCapacity = multiMRInputs.size();
+    pQueue = new PriorityQueue<KeyValueReader>(initialCapacity, new KVReaderComparator());
+    l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size());
+    for (KeyValueReader input : multiMRInputs) {
+      addToQueue(input);
+    }
+  }
+
+  /**
+   * Add a KeyValueReader to the queue if it has more key-values to read.
+   *
+   * @param kvReader
+   * @throws IOException
+   */
+  private void addToQueue(KeyValueReader kvReader) throws IOException {
+    if (kvReader.next()) {
+      pQueue.add(kvReader);
+    }
+  }
+
+  /**
+   * @return true if the merger advanced to another key-value pair
+   * @throws IOException
+   */
+  @Override
+  public boolean next() throws IOException {
+    //add the previous nextKVReader back to queue
+    if(nextKVReader != null){
+      addToQueue(nextKVReader);
+    }
+
+    //get the new nextKVReader with lowest key
+    nextKVReader = pQueue.poll();
+    return nextKVReader != null;
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return nextKVReader.getCurrentKey();
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException {
+    return nextKVReader.getCurrentValue();
+  }
+
+  /**
+   * Comparator that orders KeyValueReaders by their current value, which holds
+   * the sort key of the record each reader is positioned on.
+   */
+  class KVReaderComparator implements Comparator<KeyValueReader> {
+
+    @Override
+    public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) {
+      try {
+        BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue();
+        BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue();
+        return key1.compareTo(key2);
+      } catch (IOException e) {
+        l4j.error("Caught exception while reading shuffle input", e);
+        //die!
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
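
Editorial note: as a quick orientation for reviewers, here is a small, hypothetical usage sketch (not part of this commit) showing how the merger above can be drained through the standard KeyValueReader contract once the sorted readers, e.g. one per bucket file, have been collected:

    // Illustrative sketch only: merge several sorted KeyValueReaders and read
    // the records back in global sort order.
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
    import org.apache.tez.runtime.library.api.KeyValueReader;

    public class KeyValueInputMergerSketch {
      static void drain(List<KeyValueReader> sortedReaders) throws Exception {
        KeyValueInputMerger merger = new KeyValueInputMerger(sortedReaders);
        while (merger.next()) {
          Object key = merger.getCurrentKey();     // key from the reader that is next in sort order
          Object value = merger.getCurrentValue(); // corresponding row
          // hand (key, value) to the downstream operator pipeline here
        }
      }
    }
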
