Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Sep 5 20:16:08 2014 @@ -18,45 +18,29 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.debug.Utils; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject; -import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.SerDeException; -import 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.library.api.KeyValueReader; @@ -73,6 +57,7 @@ public class HashTableLoader implements private Configuration hconf; private MapJoinDesc desc; private MapJoinKey lastKey = null; + private int rowCount = 0; @Override public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) { @@ -125,6 +110,7 @@ public class HashTableLoader implements : new HashMapWrapper(hconf, keyCount); while (kvReader.next()) { + rowCount++; lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(), valCtx, (Writable)kvReader.getCurrentValue()); }
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Fri Sep 5 20:16:08 2014 @@ -25,15 +25,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.TezProcessorContext; +import org.apache.tez.runtime.api.ProcessorContext; import java.net.URL; import java.net.JarURLConnection; -import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Map; @@ -48,7 +48,7 @@ import javax.crypto.Mac; * * @see Config for configuring the HivePreWarmProcessor */ -public class HivePreWarmProcessor implements LogicalIOProcessor { +public class HivePreWarmProcessor extends AbstractLogicalIOProcessor { private static boolean prewarmed = false; @@ -56,10 +56,13 @@ public class HivePreWarmProcessor implem private Configuration conf; + public HivePreWarmProcessor(ProcessorContext context) { + super(context); + } + @Override - public void initialize(TezProcessorContext processorContext) - throws Exception { - byte[] userPayload = processorContext.getUserPayload(); + public void initialize() throws Exception { + UserPayload userPayload = 
getContext().getUserPayload(); this.conf = TezUtils.createConfFromUserPayload(userPayload); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Sep 5 20:16:08 2014 @@ -35,19 +35,23 @@ import org.apache.hadoop.mapred.FileSpli import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; -import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.TezRootInputInitializer; -import org.apache.tez.runtime.api.TezRootInputInitializerContext; -import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent; -import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; +import org.apache.tez.runtime.api.InputInitializer; +import 
org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputInitializerEvent; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; @@ -59,20 +63,30 @@ import com.google.common.collect.Multima * making sure that splits from different partitions are only grouped if they * are of the same schema, format and serde */ -public class HiveSplitGenerator implements TezRootInputInitializer { +@SuppressWarnings("deprecation") +public class HiveSplitGenerator extends InputInitializer { private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class); - private final SplitGrouper grouper = new SplitGrouper(); + private static final SplitGrouper grouper = new SplitGrouper(); + private final DynamicPartitionPruner pruner = new DynamicPartitionPruner(); + private InputInitializerContext context; + + public HiveSplitGenerator(InputInitializerContext initializerContext) { + super(initializerContext); + } @Override - public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception { + public List<Event> initialize() throws Exception { + InputInitializerContext rootInputContext = getContext(); + + context = rootInputContext; MRInputUserPayloadProto userPayloadProto = - MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload()); + MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload()); Configuration conf = - MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes()); + TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes()); boolean sendSerializedEvents = conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true); @@ -81,42 +95,66 @@ public class HiveSplitGenerator implemen JobConf jobConf = 
new JobConf(conf); ShimLoader.getHadoopShims().getMergedCredentials(jobConf); + MapWork work = Utilities.getMapWork(jobConf); + + // perform dynamic partition pruning + pruner.prune(work, jobConf, context); + InputSplitInfoMem inputSplitInfo = null; - String realInputFormatName = userPayloadProto.getInputFormatName(); - if (realInputFormatName != null && !realInputFormatName.isEmpty()) { - inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName); + String realInputFormatName = conf.get("mapred.input.format.class"); + boolean groupingEnabled = userPayloadProto.getGroupingEnabled(); + if (groupingEnabled) { + // Need to instantiate the realInputFormat + InputFormat<?, ?> inputFormat = + (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName), + jobConf); + + int totalResource = rootInputContext.getTotalAvailableResource().getMemory(); + int taskResource = rootInputContext.getVertexTaskResource().getMemory(); + int availableSlots = totalResource / taskResource; + + // Create the un-grouped splits + float waves = + conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES, + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); + + InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); + LOG.info("Number of input splits: " + splits.length + ". " + availableSlots + + " available slots, " + waves + " waves. 
Input format is: " + realInputFormatName); + + Multimap<Integer, InputSplit> groupedSplits = + generateGroupedSplits(jobConf, conf, splits, waves, availableSlots); + // And finally return them in a flat array + InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]); + LOG.info("Number of grouped splits: " + flatSplits.length); + + List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits); + + Utilities.clearWork(jobConf); + + inputSplitInfo = + new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf); } else { - inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf); + // no need for grouping and the target #of tasks. + // This code path should never be triggered at the moment. If grouping is disabled, + // DAGUtils uses MRInputAMSplitGenerator. + // If this is used in the future - make sure to disable grouping in the payload, if it isn't already disabled + throw new RuntimeException( + "HiveInputFormat does not support non-grouped splits, InputFormatName is: " + + realInputFormatName); + // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0); } return createEventList(sendSerializedEvents, inputSplitInfo); } - private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context, - JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception { - int totalResource = context.getTotalAvailableResource().getMemory(); - int taskResource = context.getVertexTaskResource().getMemory(); - int availableSlots = totalResource / taskResource; - - float waves = - conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES, - TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT); + public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, + Configuration conf, InputSplit[] splits, float waves, int availableSlots) + throws Exception { MapWork work = Utilities.getMapWork(jobConf); - LOG.info("Grouping splits for " + 
work.getName() + ". " + availableSlots + " available slots, " - + waves + " waves. Input format is: " + realInputFormatName); - - // Need to instantiate the realInputFormat - InputFormat<?, ?> inputFormat = - (InputFormat<?, ?>) ReflectionUtils - .newInstance(Class.forName(realInputFormatName), jobConf); - - // Create the un-grouped splits - InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); - LOG.info("Number of input splits: " + splits.length); - Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create(); @@ -159,41 +197,42 @@ public class HiveSplitGenerator implemen Multimap<Integer, InputSplit> groupedSplits = grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves); - // And finally return them in a flat array - InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]); - LOG.info("Number of grouped splits: " + flatSplits.length); - - List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits); - - Utilities.clearWork(jobConf); - - return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf); + return groupedSplits; } private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) { List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1); - RootInputConfigureVertexTasksEvent configureVertexEvent = - new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(), - inputSplitInfo.getTaskLocationHints()); + InputConfigureVertexTasksEvent configureVertexEvent = + InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(), + VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()), + InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); events.add(configureVertexEvent); if (sendSerializedEvents) { MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto(); int count = 0; for (MRSplitProto mrSplit : 
splitsProto.getSplitsList()) { - RootInputDataInformationEvent diEvent = - new RootInputDataInformationEvent(count++, mrSplit.toByteArray()); + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( + count++, mrSplit.toByteString().asReadOnlyByteBuffer()); events.add(diEvent); } } else { int count = 0; for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) { - RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split); + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload( + count++, split); events.add(diEvent); } } return events; } + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + for (InputInitializerEvent e : events) { + pruner.getQueue().put(e); + } + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Sep 5 20:16:08 2014 @@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.TezProcessorContext; +import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; /** @@ -64,8 +64,25 @@ public class MapRecordProcessor extends protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; + public MapRecordProcessor(JobConf jconf) { + ObjectCache cache = 
ObjectCacheFactory.getCache(jconf); + execContext.setJc(jconf); + // create map and fetch operators + mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); + if (mapWork == null) { + mapWork = Utilities.getMapWork(jconf); + cache.cache(MAP_PLAN_KEY, mapWork); + l4j.info("Plan: "+mapWork); + for (String s: mapWork.getAliases()) { + l4j.info("Alias: "+s); + } + } else { + Utilities.setMapWork(jconf, mapWork); + } + } + @Override - void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter, + void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, processorContext, mrReporter, inputs, outputs); @@ -87,22 +104,7 @@ public class MapRecordProcessor extends ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } - ObjectCache cache = ObjectCacheFactory.getCache(jconf); try { - - execContext.setJc(jconf); - // create map and fetch operators - mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); - if (mapWork == null) { - mapWork = Utilities.getMapWork(jconf); - cache.cache(MAP_PLAN_KEY, mapWork); - l4j.info("Plan: "+mapWork); - for (String s: mapWork.getAliases()) { - l4j.info("Alias: "+s); - } - } else { - Utilities.setMapWork(jconf, mapWork); - } if (mapWork.getVectorMode()) { mapOp = new VectorMapOperator(); } else { @@ -115,7 +117,8 @@ public class MapRecordProcessor extends l4j.info(mapOp.dump(0)); MapredContext.init(true, new JobConf(jconf)); - ((TezContext)MapredContext.get()).setInputs(inputs); + ((TezContext) MapredContext.get()).setInputs(inputs); + ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); mapOp.setExecContext(execContext); mapOp.initializeLocalWork(jconf); mapOp.initialize(jconf, null); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Fri Sep 5 20:16:08 2014 @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.tez.runtime.api.ProcessorContext; + /** * Subclass that is used to indicate if this is a map or reduce process */ public class MapTezProcessor extends TezProcessor { - public MapTezProcessor(){ - super(true); + + public MapTezProcessor(ProcessorContext context) { + super(context); + this.isMap = true; } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Fri Sep 5 20:16:08 2014 @@ -20,24 +20,40 @@ package org.apache.hadoop.hive.ql.exec.t import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory; +import org.apache.tez.runtime.api.ObjectRegistry; +import com.google.common.base.Preconditions; /** * ObjectCache. Tez implementation based on the tez object registry. 
* */ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { - + private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName()); - private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry(); + + // ObjectRegistry is available via the Input/Output/ProcessorContext. + // This is setup as part of the Tez Processor construction, so that it is available whenever an + // instance of the ObjectCache is created. The assumption is that Tez will initialize the Processor + // before anything else. + private volatile static ObjectRegistry staticRegistry; + + private final ObjectRegistry registry; + + public ObjectCache() { + Preconditions.checkNotNull(staticRegistry, + "Object registry not setup yet. This should have been setup by the TezProcessor"); + registry = staticRegistry; + } + public static void setupObjectRegistry(ObjectRegistry objectRegistry) { + staticRegistry = objectRegistry; + } + @Override public void cache(String key, Object value) { LOG.info("Adding " + key + " to cache with value " + value); - registry.add(ObjectLifeCycle.VERTEX, key, value); + registry.cacheForVertex(key, value); } @Override Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Fri Sep 5 20:16:08 2014 @@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.TezProcessorContext; +import 
org.apache.tez.runtime.api.ProcessorContext; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -47,7 +47,7 @@ public abstract class RecordProcessor { protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; protected Map<String, OutputCollector> outMap; - protected TezProcessorContext processorContext; + protected ProcessorContext processorContext; public static final Log l4j = LogFactory.getLog(RecordProcessor.class); @@ -72,7 +72,7 @@ public abstract class RecordProcessor { * @param outputs map of Output names to {@link LogicalOutput}s * @throws Exception */ - void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter, + void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { this.jconf = jconf; this.reporter = mrReporter; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Fri Sep 5 20:16:08 2014 @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import 
org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.TezProcessorContext; +import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValuesReader; /** @@ -113,7 +111,7 @@ public class ReduceRecordProcessor exte private List<VectorExpressionWriter>[] valueStringWriters; @Override - void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter, + void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, processorContext, mrReporter, inputs, outputs); @@ -140,7 +138,7 @@ public class ReduceRecordProcessor exte try { keyTableDesc = redWork.getKeyDesc(); - inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); keyObjectInspector = inputKeyDeserializer.getObjectInspector(); @@ -152,7 +150,7 @@ public class ReduceRecordProcessor exte keyStructInspector = (StructObjectInspector)keyObjectInspector; batches = new VectorizedRowBatch[maxTags]; valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = (List<VectorExpressionWriter>[])new List[maxTags]; + valueStringWriters = new List[maxTags]; keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); buffer = new DataOutputBuffer(); } @@ -215,7 +213,8 @@ public class ReduceRecordProcessor exte } MapredContext.init(false, new JobConf(jconf)); - 
((TezContext)MapredContext.get()).setInputs(inputs); + ((TezContext) MapredContext.get()).setInputs(inputs); + ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); // initialize reduce operator tree try { @@ -306,7 +305,7 @@ public class ReduceRecordProcessor exte Map<Integer, String> tag2input = redWork.getTagToInput(); ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>(); for(String inpStr : tag2input.values()){ - shuffleInputs.add((LogicalInput)inputs.get(inpStr)); + shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Fri Sep 5 20:16:08 2014 @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.tez.runtime.api.ProcessorContext; + /** * Subclass that is used to indicate if this is a map or reduce process */ public class ReduceTezProcessor extends TezProcessor { - public ReduceTezProcessor(){ - super(false); + + public ReduceTezProcessor(ProcessorContext context) { + super(context); + this.isMap = false; } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original) +++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Fri Sep 5 20:16:08 2014 @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.FileSpli import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.TezGroupedSplit; import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper; -import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.dag.api.TaskLocationHint; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; @@ -141,13 +141,13 @@ public class SplitGrouper { String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null; if (rack == null) { if (split.getLocations() != null) { - locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays.asList(split + locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split .getLocations())), null)); } else { - locationHints.add(new TaskLocationHint(null, null)); + locationHints.add(TaskLocationHint.createTaskLocationHint(null, null)); } } else { - locationHints.add(new TaskLocationHint(null, Collections.singleton(rack))); + locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack))); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Fri Sep 5 20:16:08 2014 @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.Ma import org.apache.hadoop.mapred.JobConf; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; +import 
org.apache.tez.runtime.api.ProcessorContext; /** * TezContext contains additional context only available with Tez @@ -31,9 +32,11 @@ public class TezContext extends MapredCo // all the inputs for the tez processor private Map<String, LogicalInput> inputs; - + private Map<String, LogicalOutput> outputs; + private ProcessorContext processorContext; + public TezContext(boolean isMap, JobConf jobConf) { super(isMap, jobConf); } @@ -41,7 +44,7 @@ public class TezContext extends MapredCo public void setInputs(Map<String, LogicalInput> inputs) { this.inputs = inputs; } - + public void setOutputs(Map<String, LogicalOutput> outputs) { this.outputs = outputs; } @@ -52,11 +55,19 @@ public class TezContext extends MapredCo } return inputs.get(name); } - + public LogicalOutput getOutput(String name) { if (outputs == null) { return null; } return outputs.get(name); } + + public void setTezProcessorContext(ProcessorContext processorContext) { + this.processorContext = processorContext; + } + + public ProcessorContext getTezProcessorContext() { + return processorContext; + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Fri Sep 5 20:16:08 2014 @@ -142,7 +142,7 @@ public class TezJobMonitor { if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); console.printInfo("Status: Running (application id: " - +dagClient.getApplicationId()+")\n"); + +dagClient.getExecutionContext()+")\n"); for (String s: progressMap.keySet()) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Sep 5 20:16:08 2014 @@ -33,23 +33,23 @@ import org.apache.hadoop.util.StringUtil import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.TezProcessorContext; +import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; /** * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework. 
*/ -public class TezProcessor implements LogicalIOProcessor { +public class TezProcessor extends AbstractLogicalIOProcessor { private static final Log LOG = LogFactory.getLog(TezProcessor.class); - private boolean isMap = false; + protected boolean isMap = false; RecordProcessor rproc = null; @@ -58,8 +58,6 @@ public class TezProcessor implements Log private static final String CLASS_NAME = TezProcessor.class.getName(); private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); - private TezProcessorContext processorContext; - protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); static { @@ -69,8 +67,9 @@ public class TezProcessor implements Log jobIdFormat.setMinimumIntegerDigits(4); } - public TezProcessor(boolean isMap) { - this.isMap = isMap; + public TezProcessor(ProcessorContext context) { + super(context); + ObjectCache.setupObjectRegistry(context.getObjectRegistry()); } @Override @@ -86,19 +85,15 @@ public class TezProcessor implements Log } @Override - public void initialize(TezProcessorContext processorContext) - throws IOException { + public void initialize() throws IOException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); - this.processorContext = processorContext; - //get the jobconf - byte[] userPayload = processorContext.getUserPayload(); - Configuration conf = TezUtils.createConfFromUserPayload(userPayload); + Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); this.jobConf = new JobConf(conf); - setupMRLegacyConfigs(processorContext); + setupMRLegacyConfigs(getContext()); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } - private void setupMRLegacyConfigs(TezProcessorContext processorContext) { + private void setupMRLegacyConfigs(ProcessorContext processorContext) { // Hive "insert overwrite local directory" uses task id as dir name // Setting the id 
in jobconf helps to have the similar dir name as MR StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_"); @@ -133,10 +128,10 @@ public class TezProcessor implements Log // in case of broadcast-join read the broadcast edge inputs // (possibly asynchronously) - LOG.info("Running task: " + processorContext.getUniqueIdentifier()); + LOG.info("Running task: " + getContext().getUniqueIdentifier()); if (isMap) { - rproc = new MapRecordProcessor(); + rproc = new MapRecordProcessor(jobConf); MRInputLegacy mrInput = getMRInput(inputs); try { mrInput.init(); @@ -160,8 +155,8 @@ public class TezProcessor implements Log // Outputs will be started later by the individual Processors. - MRTaskReporter mrReporter = new MRTaskReporter(processorContext); - rproc.init(jobConf, processorContext, mrReporter, inputs, outputs); + MRTaskReporter mrReporter = new MRTaskReporter(getContext()); + rproc.init(jobConf, getContext(), mrReporter, inputs, outputs); rproc.run(); //done - output does not need to be committed as hive does not use outputcommitter @@ -207,6 +202,7 @@ public class TezProcessor implements Log this.writer = (KeyValueWriter) output.getWriter(); } + @Override public void collect(Object key, Object value) throws IOException { writer.write(key, value); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Sep 5 20:16:08 2014 @@ -47,10 +47,8 @@ import org.apache.hadoop.hive.ql.session import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.client.AMConfiguration; -import org.apache.tez.client.PreWarmContext; -import org.apache.tez.client.TezSession; -import org.apache.tez.client.TezSessionConfiguration; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -67,7 +65,7 @@ public class TezSessionState { private HiveConf conf; private Path tezScratchDir; private LocalResource appJarLr; - private TezSession session; + private TezClient session; private String sessionId; private DagUtils utils; private String queueName; @@ -150,11 +148,6 @@ public class TezSessionState { refreshLocalResourcesFromConf(conf); - // generate basic tez config - TezConfiguration tezConfig = new TezConfiguration(conf); - - tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); - // unless already installed on all the cluster nodes, we'll have to // localize hive-exec.jar as well. appJarLr = createJarLocalResource(utils.getExecJarPathLocal()); @@ -168,15 +161,23 @@ public class TezSessionState { // Create environment for AM. 
Map<String, String> amEnv = new HashMap<String, String>(); - MRHelpers.updateEnvironmentForMRAM(conf, amEnv); + MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); - AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null); + // and finally we're ready to create and start the session + // generate basic tez config + TezConfiguration tezConfig = new TezConfiguration(conf); + tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); - // configuration for the session - TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig); + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { + int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); + n = Math.max(tezConfig.getInt( + TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, + TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT), n); + tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n); + } - // and finally we're ready to create and start the session - session = new TezSession("HIVE-" + sessionId, sessionConfig); + session = TezClient.create("HIVE-" + sessionId, tezConfig, true, + commonLocalResources, null); LOG.info("Opening new Tez Session (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); @@ -187,20 +188,30 @@ public class TezSessionState { int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); LOG.info("Prewarming " + n + " containers (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); - PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources); + PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n, + commonLocalResources); try { - session.preWarm(context); - } catch (InterruptedException ie) { - if (LOG.isDebugEnabled()) { - LOG.debug("Hive Prewarm threw an exception ", ie); + session.preWarm(prewarmVertex); + } catch (IOException ie) { + if 
(ie.getMessage().contains("Interrupted while waiting")) { + if (LOG.isDebugEnabled()) { + LOG.debug("Hive Prewarm threw an exception ", ie); + } + } else { + throw ie; } } } - + try { + session.waitTillReady(); + } catch(InterruptedException ie) { + //ignore + } // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session // id is used for tez to reuse the current session rather than start a new one. conf.set("mapreduce.framework.name", "yarn-tez"); - conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString()); + conf.set("mapreduce.tez.session.tokill-application-id", + session.getAppMasterApplicationId().toString()); openSessions.add(this); } @@ -277,7 +288,7 @@ public class TezSessionState { return sessionId; } - public TezSession getSession() { + public TezClient getSession() { return session; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Sep 5 20:16:08 2014 @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -67,7 +66,7 @@ import org.apache.tez.dag.api.client.Sta * using the Tez APIs directly. 
* */ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class TezTask extends Task<TezWork> { private static final String CLASS_NAME = TezTask.class.getName(); @@ -135,7 +134,7 @@ public class TezTask extends Task<TezWor } List<LocalResource> additionalLr = session.getLocalizedResources(); - + // log which resources we're adding (apart from the hive exec) if (LOG.isDebugEnabled()) { if (additionalLr == null || additionalLr.size() == 0) { @@ -166,7 +165,7 @@ public class TezTask extends Task<TezWor counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); TezSessionPoolManager.getInstance().returnSession(session); - if (LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled() && counters != null) { for (CounterGroup group: counters) { LOG.info(group.getDisplayName() +":"); for (TezCounter counter: group) { @@ -212,7 +211,7 @@ public class TezTask extends Task<TezWor FileSystem fs = scratchDir.getFileSystem(conf); // the name of the dag is what is displayed in the AM/Job UI - DAG dag = new DAG(work.getName()); + DAG dag = DAG.create(work.getName()); for (BaseWork w: ws) { @@ -247,16 +246,14 @@ public class TezTask extends Task<TezWor } VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray); + // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner. + // Pick any one source vertex to figure out the Edge configuration. 
+ JobConf parentConf = workToConf.get(unionWorkItems.get(0)); + // now hook up the children for (BaseWork v: children) { - // need to pairwise patch up the configuration of the vertices - for (BaseWork part: unionWorkItems) { - utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part), - workToConf.get(v), workToVertex.get(v)); - } - // finally we can create the grouped edge - GroupInputEdge e = utils.createEdge(group, workToConf.get(v), + GroupInputEdge e = utils.createEdge(group, parentConf, workToVertex.get(v), work.getEdgeProperty(w, v)); dag.addEdge(e); @@ -279,7 +276,7 @@ public class TezTask extends Task<TezWor TezEdgeProperty edgeProp = work.getEdgeProperty(w, v); - e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp); + e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp); dag.addEdge(e); } } @@ -305,7 +302,8 @@ public class TezTask extends Task<TezWor try { // ready to start execution on the cluster - dagClient = sessionState.getSession().submitDAG(dag, resourceMap); + sessionState.getSession().addAppMasterLocalFiles(resourceMap); + dagClient = sessionState.getSession().submitDAG(dag); } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. 
Reopening..."); @@ -313,7 +311,7 @@ public class TezTask extends Task<TezWor TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); console.printInfo("Session re-established."); - dagClient = sessionState.getSession().submitDAG(dag, resourceMap); + dagClient = sessionState.getSession().submitDAG(dag); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java Fri Sep 5 20:16:08 2014 @@ -37,7 +37,7 @@ import org.apache.tez.runtime.library.ap * Uses a priority queue to pick the KeyValuesReader of the input that is next in * sort order. 
*/ -public class InputMerger implements KeyValuesReader { +public class InputMerger extends KeyValuesReader { public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); private PriorityQueue<KeyValuesReader> pQueue = null; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Fri Sep 5 20:16:08 2014 @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.exec.tez.tools; import java.util.IdentityHashMap; +import java.util.List; import java.util.Map; import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.api.MergedInputContext; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.Reader; @@ -31,7 +33,11 @@ import org.apache.tez.runtime.api.Reader public class TezMergedLogicalInput extends MergedLogicalInput { private Map<Input, Boolean> readyInputs = new IdentityHashMap<Input, Boolean>(); - + + public TezMergedLogicalInput(MergedInputContext context, List<Input> inputs) { + super(context, inputs); + } + @Override public Reader getReader() throws Exception { return new InputMerger(getInputs()); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (original) +++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Fri Sep 5 20:16:08 2014 @@ -18,14 +18,13 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.lib.HashPartitioner; /** Partition keys by their {@link Object#hashCode()}. */ public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> implements HivePartitioner<K2, V2> { /** Use {@link Object#hashCode()} to partition. */ + @Override public int getBucket(K2 key, V2 value, int numBuckets) { return (key.hashCode() & Integer.MAX_VALUE) % numBuckets; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Fri Sep 5 20:16:08 2014 @@ -13,6 +13,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.common.FileUtils; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Fri Sep 5 20:16:08 2014 @@ -941,12 +941,12 @@ public final class ConstantPropagateProc return null; } - List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>(); - for (ExprNodeDesc expr : pred.getChildren()) { - ExprNodeDesc constant = foldExpr(expr, constants, cppCtx, op, 0, false); - newChildren.add(constant); + ExprNodeDesc constant = foldExpr(pred, constants, cppCtx, op, 0, false); + if (constant instanceof ExprNodeGenericFuncDesc) { + conf.setFilterExpr((ExprNodeGenericFuncDesc) constant); + } else { + conf.setFilterExpr(null); } - pred.setChildren(newChildren); return null; } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Fri Sep 5 20:16:08 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -27,6 +28,7 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -39,6 +41,7 @@ import org.apache.hadoop.hive.ql.lib.Nod import 
org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -363,6 +366,19 @@ public class ConvertJoinMapJoin implemen Operator<? extends OperatorDesc> parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition); if (parentBigTableOp instanceof ReduceSinkOperator) { + for (Operator<?> p : parentBigTableOp.getParentOperators()) { + // we might have generated a dynamic partition operator chain. Since + // we're removing the reduce sink we need do remove that too. + Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>(); + for (Operator<?> c : p.getChildOperators()) { + if (hasDynamicPartitionBroadcast(c)) { + dynamicPartitionOperators.add(c); + } + } + for (Operator<?> c : dynamicPartitionOperators) { + p.removeChild(c); + } + } mapJoinOp.getParentOperators().remove(bigTablePosition); if (!(mapJoinOp.getParentOperators().contains( parentBigTableOp.getParentOperators().get(0)))) { @@ -380,4 +396,16 @@ public class ConvertJoinMapJoin implemen return mapJoinOp; } + + private boolean hasDynamicPartitionBroadcast(Operator<?> op) { + if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { + return true; + } + for (Operator<?> c : op.getChildOperators()) { + if (hasDynamicPartitionBroadcast(c)) { + return true; + } + } + return false; + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Sep 5 20:16:08 2014 @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.P import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate; +import org.apache.hadoop.hive.ql.ppd.SyntheticJoinPredicate; /** * Implementation of the optimizer. @@ -55,6 +56,7 @@ public class Optimizer { transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { transformations.add(new PredicateTransitivePropagate()); + transformations.add(new SyntheticJoinPredicate()); transformations.add(new PredicatePushDown()); transformations.add(new PartitionPruner()); transformations.add(new PartitionConditionRemover()); @@ -125,8 +127,8 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) { transformations.add(new StatsOptimizer()); } - if (pctx.getContext().getExplain() || - HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + if (pctx.getContext().getExplain() + && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { transformations.add(new AnnotateWithStatistics()); transformations.add(new AnnotateWithOpTraits()); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java (original) +++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java Fri Sep 5 20:16:08 2014 @@ -186,8 +186,7 @@ public abstract class PrunerExpressionOp return ((ExprNodeNullDesc) nd).clone(); } - assert (false); - return null; + return new ExprNodeConstantDesc(((ExprNodeDesc)nd).getTypeInfo(), null); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Fri Sep 5 20:16:08 2014 @@ -26,8 +26,6 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -42,7 +40,6 @@ import org.apache.hadoop.hive.ql.parse.G import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ColStatistics; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -134,7 +131,8 @@ public class ReduceSinkMapJoinProc imple String prefix = Utilities.ReduceField.KEY.toString(); for (String keyCol : keyCols) { ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." 
+ keyCol); - ColStatistics cs = StatsUtils.getColStatisticsFromExpression(null, stats, realCol); + ColStatistics cs = + StatsUtils.getColStatisticsFromExpression(context.conf, stats, realCol); if (cs == null || cs.getCountDistint() <= 0) { maxKeyCount = Long.MAX_VALUE; break; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Fri Sep 5 20:16:08 2014 @@ -396,8 +396,7 @@ public final class PcrExprProcFactory { return new NodeInfoWrapper(WalkState.CONSTANT, null, (ExprNodeDesc) nd); } - assert (false); - return null; + return new NodeInfoWrapper(WalkState.UNKNOWN, null, (ExprNodeDesc)nd); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Fri Sep 5 20:16:08 2014 @@ -18,13 +18,18 @@ package org.apache.hadoop.hive.ql.optimizer.stats.annotation; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; 
+import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -67,12 +72,8 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.serde.serdeConstants; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class StatsRulesProcFactory { @@ -657,7 +658,8 @@ public class StatsRulesProcFactory { if (parentStats != null) { // worst case, in the absence of column statistics assume half the rows are emitted - if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) { + if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator + || gop.getChildOperators().get(0) instanceof AppMasterEventOperator) { // map side stats = parentStats.clone(); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Fri Sep 5 20:16:08 2014 @@ -22,16 +22,14 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; /** - * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks + * FileSinkProcessor is a simple rule to remember seen file sinks for later + * processing. * */ public class FileSinkProcessor implements NodeProcessor { @@ -39,12 +37,6 @@ public class FileSinkProcessor implement static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName()); @Override - /* - * (non-Javadoc) - * we should ideally not modify the tree we traverse. - * However, since we need to walk the tree at any time when we modify the - * operator, we might as well do it here. - */ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Sep 5 20:16:08 2014 @@ -26,29 +26,28 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; 
import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; -import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; /** @@ -134,6 +133,15 @@ public class GenTezProcContext implement // remember which reducesinks we've already connected public final Set<ReduceSinkOperator> connectedReduceSinks; + // remember the event operators we've seen + public final Set<AppMasterEventOperator> eventOperatorSet; + + // remember the event operators we've abandoned. + public final Set<AppMasterEventOperator> abandonedEventOperatorSet; + + // remember the connections between ts and event + public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List<Task<MoveWork>> moveTask, List<Task<? 
extends Serializable>> rootTasks, @@ -165,6 +173,9 @@ public class GenTezProcContext implement this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>(); this.fileSinkSet = new LinkedHashSet<FileSinkOperator>(); this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>(); + this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); + this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>(); + this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>(); rootTasks.add(currentTask); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Fri Sep 5 20:16:08 2014 @@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse; import java.util.ArrayList; import java.util.Deque; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.LinkedList; -import java.util.Map; +import java.util.List; import java.util.Set; -import org.apache.hadoop.fs.Path; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; +import org.apache.hadoop.hive.ql.exec.Operator; import 
org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; + /** * GenTezUtils is a collection of shared helper methods to produce * TezWork @@ -119,12 +124,12 @@ public class GenTezUtils { int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); // min we allow tez to pick - int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() + int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() * minPartitionFactor)); minPartition = (minPartition > maxReducers) ? maxReducers : minPartition; // max we allow tez to pick - int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); + int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); maxPartition = (maxPartition > maxReducers) ? 
maxReducers : maxPartition; reduceWork.setMinReduceTasks(minPartition); @@ -210,18 +215,20 @@ public class GenTezUtils { BaseWork work) throws SemanticException { - Set<Operator<?>> roots = work.getAllRootOperators(); + List<Operator<?>> roots = new ArrayList<Operator<?>>(); + roots.addAll(work.getAllRootOperators()); if (work.getDummyOps() != null) { roots.addAll(work.getDummyOps()); } + roots.addAll(context.eventOperatorSet); // need to clone the plan. - Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots); + List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots); // we're cloning the operator plan but we're retaining the original work. That means // that root operators have to be replaced with the cloned ops. The replacement map // tells you what that mapping is. - Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>(); + BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create(); // there's some special handling for dummyOps required. Mapjoins won't be properly // initialized if their dummy parents aren't initialized. Since we cloned the plan @@ -231,11 +238,35 @@ public class GenTezUtils { Iterator<Operator<?>> it = newRoots.iterator(); for (Operator<?> orig: roots) { Operator<?> newRoot = it.next(); + + replacementMap.put(orig, newRoot); + if (newRoot instanceof HashTableDummyOperator) { - dummyOps.add((HashTableDummyOperator)newRoot); + // dummy ops need to be updated to the cloned ones. + dummyOps.add((HashTableDummyOperator) newRoot); + it.remove(); + } else if (newRoot instanceof AppMasterEventOperator) { + // event operators point to table scan operators. When cloning these we + // need to restore the original scan. + if (newRoot.getConf() instanceof DynamicPruningEventDesc) { + TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan(); + if (ts == null) { + throw new AssertionError("No table scan associated with dynamic event pruning. 
" + orig); + } + ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts); + } it.remove(); } else { - replacementMap.put(orig,newRoot); + if (newRoot instanceof TableScanOperator) { + if (context.tsToEventMap.containsKey(orig)) { + // we need to update event operators with the cloned table scan + for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) { + ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot); + } + } + } + context.rootToWorkMap.remove(orig); + context.rootToWorkMap.put(newRoot, work); } } @@ -272,6 +303,15 @@ public class GenTezUtils { desc.setLinkedFileSinkDesc(linked); } + if (current instanceof AppMasterEventOperator) { + // remember for additional processing later + context.eventOperatorSet.add((AppMasterEventOperator) current); + + // mark the original as abandoned. Don't need it anymore. + context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse() + .get(current)); + } + if (current instanceof UnionOperator) { Operator<?> parent = null; int count = 0; @@ -337,4 +377,87 @@ public class GenTezUtils { } } } + + /** + * processAppMasterEvent sets up the event descriptor and the MapWork. 
+ * + * @param procCtx + * @param event + */ + public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) { + + if (procCtx.abandonedEventOperatorSet.contains(event)) { + // don't need this anymore + return; + } + + DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf(); + TableScanOperator ts = eventDesc.getTableScan(); + + MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts); + if (work == null) { + throw new AssertionError("No work found for tablescan " + ts); + } + + BaseWork enclosingWork = getEnclosingWork(event, procCtx); + if (enclosingWork == null) { + throw new AssertionError("Cannot find work for operator" + event); + } + String sourceName = enclosingWork.getName(); + + // store the vertex name in the operator pipeline + eventDesc.setVertexName(work.getName()); + eventDesc.setInputName(work.getAliases().get(0)); + + // store table descriptor in map-work + if (!work.getEventSourceTableDescMap().containsKey(sourceName)) { + work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>()); + } + List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName); + tables.add(event.getConf().getTable()); + + // store column name in map-work + if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) { + work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>()); + } + List<String> columns = work.getEventSourceColumnNameMap().get(sourceName); + columns.add(eventDesc.getTargetColumnName()); + + // store partition key expr in map-work + if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) { + work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>()); + } + List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName); + keys.add(eventDesc.getPartKey()); + + } + + /** + * getEnclosingWork finds the BaseWork any given operator belongs to. 
+ */ + public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) { + List<Operator<?>> ops = new ArrayList<Operator<?>>(); + findRoots(op, ops); + for (Operator<?> r : ops) { + BaseWork work = procCtx.rootToWorkMap.get(r); + if (work != null) { + return work; + } + } + return null; + } + + /* + * findRoots returns all root operators (in ops) that result in operator op + */ + private void findRoots(Operator<?> op, List<Operator<?>> ops) { + List<Operator<?>> parents = op.getParentOperators(); + if (parents == null || parents.isEmpty()) { + ops.add(op); + return; + } + for (Operator<?> p : parents) { + findRoots(p, ops); + } + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Fri Sep 5 20:16:08 2014 @@ -23,13 +23,18 @@ import java.util.HashSet; import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + /** * OptimizeTezProcContext. 
OptimizeTezProcContext maintains information * about the current operator plan as we walk the operator tree @@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl public final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>(); + public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap = + HashMultimap.create(); + // rootOperators are all the table scan operators in sequence // of traversal - public final Deque<Operator<? extends OperatorDesc>> rootOperators; + public Deque<Operator<? extends OperatorDesc>> rootOperators; - @SuppressWarnings("unchecked") - public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, - Set<ReadEntity> inputs, Set<WriteEntity> outputs, - Deque<Operator<?>> rootOperators) { + public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs, + Set<WriteEntity> outputs) { this.conf = conf; this.parseContext = parseContext; this.inputs = inputs; this.outputs = outputs; - this.rootOperators = rootOperators; + } + + public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) { + this.rootOperators = roots; } }
