Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Tue Oct 14 19:06:45 2014 @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,8 +44,6 @@ import org.apache.tez.runtime.api.Logica import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; -import java.util.Map; - /** * Record processor for fast merging of files. */ @@ -51,11 +53,12 @@ public class MergeFileRecordProcessor ex .getLog(MergeFileRecordProcessor.class); protected Operator<? extends OperatorDesc> mergeOp; - private final ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = null; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MergeFileWork mfWork; + MRInputLegacy mrInput = null; private boolean abort = false; - private Object[] row = new Object[2]; + private final Object[] row = new Object[2]; @Override void init(JobConf jconf, ProcessorContext processorContext, @@ -63,16 +66,16 @@ public class MergeFileRecordProcessor ex Map<String, LogicalOutput> outputs) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, processorContext, mrReporter, inputs, outputs); + execContext = new ExecMapperContext(jconf); //Update JobConf using MRInput, info like filename comes via this - MRInputLegacy mrInput = TezProcessor.getMRInput(inputs); + mrInput = getMRInput(inputs); Configuration updatedConf = mrInput.getConfigUpdates(); if (updatedConf != null) { for (Map.Entry<String, String> entry : updatedConf) { jconf.set(entry.getKey(), entry.getValue()); } } - createOutputMap(); // Start all the Outputs. 
for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) { @@ -89,17 +92,17 @@ public class MergeFileRecordProcessor ex MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); if (mapWork == null) { mapWork = Utilities.getMapWork(jconf); - if (mapWork instanceof MergeFileWork) { - mfWork = (MergeFileWork) mapWork; - } else { - throw new RuntimeException("MapWork should be an instance of" + - " MergeFileWork."); - } cache.cache(MAP_PLAN_KEY, mapWork); } else { Utilities.setMapWork(jconf, mapWork); } + if (mapWork instanceof MergeFileWork) { + mfWork = (MergeFileWork) mapWork; + } else { + throw new RuntimeException("MapWork should be an instance of MergeFileWork."); + } + String alias = mfWork.getAliasToWork().keySet().iterator().next(); mergeOp = mfWork.getAliasToWork().get(alias); LOG.info(mergeOp.dump(0)); @@ -127,8 +130,7 @@ public class MergeFileRecordProcessor ex @Override void run() throws Exception { - MRInputLegacy in = TezProcessor.getMRInput(inputs); - KeyValueReader reader = in.getReader(); + KeyValueReader reader = mrInput.getReader(); //process records until done while (reader.next()) { @@ -205,4 +207,23 @@ public class MergeFileRecordProcessor ex return true; //give me more } + private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception { + // there should be only one MRInput + MRInputLegacy theMRInput = null; + for (Entry<String, LogicalInput> inp : inputs.entrySet()) { + if (inp.getValue() instanceof MRInputLegacy) { + if (theMRInput != null) { + throw new IllegalArgumentException("Only one MRInput is expected"); + } + // a better logic would be to find the alias + theMRInput = (MRInputLegacy) inp.getValue(); + } else { + throw new IOException("Expecting only one input of type MRInputLegacy. Found type: " + + inp.getClass().getCanonicalName()); + } + } + theMRInput.init(); + + return theMRInput; + } }
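The refactoring above moves MRInput discovery out of TezProcessor and into the record processor itself: the new private getMRInput scans the vertex's LogicalInput map, insists on exactly one input of the map-source type, initializes it, and hands it back for the run loop. A minimal, self-contained sketch of that lookup-and-initialize pattern follows; Input and MapSourceInput are hypothetical stand-ins for Tez's LogicalInput and MRInputLegacy, not the real API.

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for Tez's LogicalInput / MRInputLegacy types, not the real API.
interface Input {}

class MapSourceInput implements Input {
  private boolean initialized;
  void init() { initialized = true; }            // plays the role of MRInputLegacy.init()
  boolean isInitialized() { return initialized; }
}

public class SingleInputLookup {

  /**
   * Finds the single map-source input among a vertex's inputs, initializes it and returns it.
   * A second map-source input, or any input of another type, is rejected, mirroring the
   * checks in the getMRInput method added above.
   */
  static MapSourceInput getMapSourceInput(Map<String, Input> inputs) throws IOException {
    MapSourceInput found = null;
    for (Map.Entry<String, Input> entry : inputs.entrySet()) {
      if (entry.getValue() instanceof MapSourceInput) {
        if (found != null) {
          throw new IllegalArgumentException("Only one map-source input is expected");
        }
        found = (MapSourceInput) entry.getValue();
      } else {
        throw new IOException("Expecting only one input of the map-source type. Found: "
            + entry.getValue().getClass().getCanonicalName());
      }
    }
    if (found == null) {
      throw new IOException("No map-source input found among " + inputs.keySet());
    }
    found.init();
    return found;
  }

  public static void main(String[] args) throws IOException {
    Map<String, Input> inputs = new LinkedHashMap<>();
    inputs.put("map_1", new MapSourceInput());
    System.out.println("initialized: " + getMapSourceInput(inputs).isInitialized());
  }
}

In the diff above, the returned MRInputLegacy additionally supplies configuration updates via getConfigUpdates() in init() before its reader is obtained in run().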
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Tue Oct 14 19:06:45 2014 @@ -39,12 +39,6 @@ public class MergeFileTezProcessor exten public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { rproc = new MergeFileRecordProcessor(); - MRInputLegacy mrInput = getMRInput(inputs); - try { - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); - } initializeAndRunProcessor(inputs, outputs); } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Tue Oct 14 19:06:45 2014 @@ -115,8 +115,7 @@ public abstract class RecordProcessor { */ protected void logCloseInfo() { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " - + used_memory); + l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory); } /** @@ -126,8 +125,7 @@ public abstract class RecordProcessor { numRows++; if (numRows == nextUpdateCntr) { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processing " + numRows - + " rows: used memory = " + used_memory); + l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory); nextUpdateCntr = getNextUpdateRecordCounter(numRows); } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Oct 14 19:06:45 2014 @@ -17,9 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -35,31 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Op import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; -import 
org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; @@ -76,39 +56,16 @@ public class ReduceRecordProcessor exte private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__"; public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); - private final ExecMapperContext execContext = new ExecMapperContext(); - private boolean abort = false; - private Deserializer inputKeyDeserializer; - - // Input value serde needs to be an array to support different SerDe - // for different tags - private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; - TableDesc keyTableDesc; - TableDesc[] valueTableDesc; + private ReduceWork redWork; - ObjectInspector[] rowObjectInspector; private Operator<?> reducer; - private boolean isTagged = false; - - private Object keyObject = null; - private BytesWritable groupKey; - - private ReduceWork redWork; - private boolean vectorized = false; + private ReduceRecordSource[] sources; - List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); + private final byte position = 0; - private DataOutputBuffer buffer; - private VectorizedRowBatch[] batches; - // number of columns pertaining to keys in a vectorized row batch - private int keysColumnOffset; - private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE; - private StructObjectInspector keyStructInspector; - private StructObjectInspector[] valueStructInspectors; - /* this is only used in the error code path */ - private List<VectorExpressionWriter>[] valueStringWriters; + private boolean abort; @Override void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, @@ -118,10 +75,6 @@ public class ReduceRecordProcessor exte ObjectCache cache = ObjectCacheFactory.getCache(jconf); - rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; - ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; - ObjectInspector keyObjectInspector; - redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); if (redWork == null) { redWork = Utilities.getReduceWork(jconf); @@ -131,95 +84,36 @@ public class ReduceRecordProcessor 
exte } reducer = redWork.getReducer(); - reducer.setParentOperators(null); // clear out any parents as reducer is the - // root - isTagged = redWork.getNeedsTagging(); - vectorized = redWork.getVectorMode(); + reducer.getParentOperators().clear(); + reducer.setParentOperators(null); // clear out any parents as reducer is the root - try { - keyTableDesc = redWork.getKeyDesc(); - inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc - .getDeserializerClass(), null); - SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); - keyObjectInspector = inputKeyDeserializer.getObjectInspector(); - reducer.setGroupKeyObjectInspector(keyObjectInspector); - valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()]; - - if(vectorized) { - final int maxTags = redWork.getTagToValueDesc().size(); - keyStructInspector = (StructObjectInspector)keyObjectInspector; - batches = new VectorizedRowBatch[maxTags]; - valueStructInspectors = new StructObjectInspector[maxTags]; - valueStringWriters = new List[maxTags]; - keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); - buffer = new DataOutputBuffer(); - } + int numTags = redWork.getTagToValueDesc().size(); - for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { - // We should initialize the SerDe with the TypeInfo when available. - valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag); - inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance( - valueTableDesc[tag].getDeserializerClass(), null); - SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null, - valueTableDesc[tag].getProperties(), null); - valueObjectInspector[tag] = inputValueDeserializer[tag] - .getObjectInspector(); - - ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); - - if(vectorized) { - /* vectorization only works with struct object inspectors */ - valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag]; - - batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, - valueStructInspectors[tag]); - final int totalColumns = keysColumnOffset + - valueStructInspectors[tag].getAllStructFieldRefs().size(); - valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns); - valueStringWriters[tag].addAll(Arrays - .asList(VectorExpressionWriterFactory - .genVectorStructExpressionWritables(keyStructInspector))); - valueStringWriters[tag].addAll(Arrays - .asList(VectorExpressionWriterFactory - .genVectorStructExpressionWritables(valueStructInspectors[tag]))); - - /* - * The row object inspector used by ReduceWork needs to be a **standard** - * struct object inspector, not just any struct object inspector. - */ - ArrayList<String> colNames = new ArrayList<String>(); - List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs(); - for (StructField field: fields) { - colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); - ois.add(field.getFieldObjectInspector()); - } - fields = valueStructInspectors[tag].getAllStructFieldRefs(); - for (StructField field: fields) { - colNames.add(Utilities.ReduceField.VALUE.toString() + "." 
+ field.getFieldName()); - ois.add(field.getFieldObjectInspector()); - } - rowObjectInspector[tag] = ObjectInspectorFactory - .getStandardStructObjectInspector(colNames, ois); - } else { - ois.add(keyObjectInspector); - ois.add(valueObjectInspector[tag]); - rowObjectInspector[tag] = ObjectInspectorFactory - .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); - } + ObjectInspector[] ois = new ObjectInspector[numTags]; + sources = new ReduceRecordSource[numTags]; - } - } catch (Exception e) { - throw new RuntimeException(e); + for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { + TableDesc keyTableDesc = redWork.getKeyDesc(); + TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag); + KeyValuesReader reader = + (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader(); + + sources[tag] = new ReduceRecordSource(); + sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc, + reader, tag == position, (byte) tag, + redWork.getScratchColumnVectorTypes()); + ois[tag] = sources[tag].getObjectInspector(); } MapredContext.init(false, new JobConf(jconf)); ((TezContext) MapredContext.get()).setInputs(inputs); ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); + ((TezContext) MapredContext.get()).setRecordSources(sources); // initialize reduce operator tree try { l4j.info(reducer.dump(0)); - reducer.initialize(jconf, rowObjectInspector); + reducer.initialize(jconf, ois); // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the @@ -227,7 +121,6 @@ public class ReduceRecordProcessor exte List<HashTableDummyOperator> dummyOps = redWork.getDummyOps(); if (dummyOps != null) { for (Operator<? 
extends OperatorDesc> dummyOp : dummyOps){ - dummyOp.setExecContext(execContext); dummyOp.initialize(jconf, null); } } @@ -271,28 +164,12 @@ public class ReduceRecordProcessor exte ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } - KeyValuesReader kvsReader; - try { - if(shuffleInputs.size() == 1){ - //no merging of inputs required - kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); - }else { - //get a sort merged input - kvsReader = new InputMerger(shuffleInputs); - } - } catch (Exception e) { - throw new IOException(e); - } - - while(kvsReader.next()){ - Object key = kvsReader.getCurrentKey(); - Iterable<Object> values = kvsReader.getCurrentValues(); - boolean needMore = processRows(key, values); - if(!needMore){ - break; + // run the operator pipeline + while (sources[position].pushRecord()) { + if (isLogInfoEnabled) { + logProgress(); } } - } /** @@ -302,209 +179,22 @@ public class ReduceRecordProcessor exte */ private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) { //the reduce plan inputs have tags, add all inputs that have tags - Map<Integer, String> tag2input = redWork.getTagToInput(); + Map<Integer, String> tagToinput = redWork.getTagToInput(); ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>(); - for(String inpStr : tag2input.values()){ + for(String inpStr : tagToinput.values()){ + if (inputs.get(inpStr) == null) { + throw new AssertionError("Cound not find input: " + inpStr); + } shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } - /** - * @param key - * @param values - * @return true if it is not done and can take more inputs - */ - private boolean processRows(Object key, Iterable<Object> values) { - if(reducer.getDone()){ - //done - no more records needed - return false; - } - - // reset the execContext for each new row - execContext.resetRow(); - - try { - BytesWritable keyWritable = (BytesWritable) key; - byte tag = 0; - - if (isTagged) { - // remove the tag from key coming out of reducer - // and store it in separate variable. 
- int size = keyWritable.getLength() - 1; - tag = keyWritable.getBytes()[size]; - keyWritable.setSize(size); - } - - //Set the key, check if this is a new group or same group - if (!keyWritable.equals(this.groupKey)) { - // If a operator wants to do some work at the beginning of a group - if (groupKey == null) { // the first group - this.groupKey = new BytesWritable(); - } else { - // If a operator wants to do some work at the end of a group - if(isLogTraceEnabled) { - l4j.trace("End Group"); - } - reducer.endGroup(); - } - - try { - this.keyObject = inputKeyDeserializer.deserialize(keyWritable); - } catch (Exception e) { - throw new HiveException( - "Hive Runtime Error: Unable to deserialize reduce input key from " - + Utilities.formatBinaryString(keyWritable.getBytes(), 0, - keyWritable.getLength()) + " with properties " - + keyTableDesc.getProperties(), e); - } - groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength()); - if (isLogTraceEnabled) { - l4j.trace("Start Group"); - } - reducer.setGroupKeyObject(keyObject); - reducer.startGroup(); - } - /* this.keyObject passed via reference */ - if(vectorized) { - return processVectors(values, tag); - } else { - return processKeyValues(values, tag); - } - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - l4j.fatal(StringUtils.stringifyException(e)); - throw new RuntimeException(e); - } - } - } - - private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException { - try { - return inputValueDeserializer[tag].deserialize(valueWritable); - } catch (SerDeException e) { - throw new HiveException( - "Hive Runtime Error: Unable to deserialize reduce input value (tag=" - + tag - + ") from " - + Utilities.formatBinaryString(valueWritable.getBytes(), 0, - valueWritable.getLength()) + " with properties " - + valueTableDesc[tag].getProperties(), e); - } - } - - /** - * @param values - * @return true if it is not done and can take more inputs - */ - private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveException { - - for (Object value : values) { - BytesWritable valueWritable = (BytesWritable) value; - - row.clear(); - row.add(this.keyObject); - row.add(deserializeValue(valueWritable, tag)); - - try { - reducer.processOp(row, tag); - } catch (Exception e) { - String rowString = null; - try { - rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]); - } catch (Exception e2) { - rowString = "[Error getting row data with exception " - + StringUtils.stringifyException(e2) + " ]"; - } - throw new HiveException("Hive Runtime Error while processing row (tag=" - + tag + ") " + rowString, e); - } - if (isLogInfoEnabled) { - logProgress(); - } - } - return true; //give me more - } - - /** - * @param values - * @return true if it is not done and can take more inputs - */ - private boolean processVectors(Iterable<Object> values, byte tag) throws HiveException { - VectorizedRowBatch batch = batches[tag]; - batch.reset(); - - /* deserialize key into columns */ - VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector, - 0, 0, batch, buffer); - for(int i = 0; i < keysColumnOffset; i++) { - VectorizedBatchUtil.setRepeatingColumn(batch, i); - } - - int rowIdx = 0; - try { - for (Object value : values) { - /* deserialize value into columns */ - BytesWritable valueWritable = (BytesWritable) value; - Object valueObj = deserializeValue(valueWritable, tag); - - 
VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], - rowIdx, keysColumnOffset, batch, buffer); - rowIdx++; - if (rowIdx >= BATCH_SIZE) { - VectorizedBatchUtil.setBatchSize(batch, rowIdx); - reducer.processOp(batch, tag); - rowIdx = 0; - if (isLogInfoEnabled) { - logProgress(); - } - } - } - if (rowIdx > 0) { - VectorizedBatchUtil.setBatchSize(batch, rowIdx); - reducer.processOp(batch, tag); - } - if (isLogInfoEnabled) { - logProgress(); - } - } catch (Exception e) { - String rowString = null; - try { - /* batch.toString depends on this */ - batch.setValueWriters(valueStringWriters[tag] - .toArray(new VectorExpressionWriter[0])); - rowString = batch.toString(); - } catch (Exception e2) { - rowString = "[Error getting row data with exception " - + StringUtils.stringifyException(e2) + " ]"; - } - throw new HiveException("Hive Runtime Error while processing vector batch (tag=" - + tag + ") " + rowString, e); - } - return true; // give me more - } - @Override void close(){ - // check if there are IOExceptions - if (!abort) { - abort = execContext.getIoCxt().getIOExceptions(); - } - try { - if (groupKey != null) { - // If a operator wants to do some work at the end of a group - if(isLogTraceEnabled) { - l4j.trace("End Group"); - } - reducer.endGroup(); - } - if (isLogInfoEnabled) { - logCloseInfo(); + for (ReduceRecordSource rs: sources) { + abort = abort && rs.close(); } reducer.close(abort); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Tue Oct 14 19:06:45 2014 @@ -37,6 +37,8 @@ public class TezContext extends MapredCo private ProcessorContext processorContext; + private RecordSource[] sources; + public TezContext(boolean isMap, JobConf jobConf) { super(isMap, jobConf); } @@ -70,4 +72,12 @@ public class TezContext extends MapredCo public ProcessorContext getTezProcessorContext() { return processorContext; } + + public RecordSource[] getRecordSources() { + return sources; + } + + public void setRecordSources(RecordSource[] sources) { + this.sources = sources; + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Tue Oct 14 19:06:45 2014 @@ -68,17 +68,12 @@ public class TezJobMonitor { @Override public void run() { for (DAGClient c: shutdownList) { - try { - System.err.println("Trying to shutdown DAG"); - c.tryKillDAG(); - } catch (Exception e) { - // ignore - } + TezJobMonitor.killRunningJobs(); } try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - TezSessionPoolManager.getInstance().close(s); + TezSessionPoolManager.getInstance().close(s, false); } } catch (Exception e) { // 
ignore @@ -113,6 +108,7 @@ public class TezJobMonitor { String lastReport = null; Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>(); Heartbeater heartbeater = new Heartbeater(txnMgr, conf); + long startTime = 0; shutdownList.add(dagClient); @@ -141,11 +137,11 @@ public class TezJobMonitor { case RUNNING: if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); - console.printInfo("Status: Running (application id: " - +dagClient.getExecutionContext()+")\n"); + console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); for (String s: progressMap.keySet()) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } + startTime = System.currentTimeMillis(); running = true; } @@ -153,7 +149,8 @@ public class TezJobMonitor { break; case SUCCEEDED: lastReport = printStatus(progressMap, lastReport, console); - console.printInfo("Status: Finished successfully"); + double duration = (System.currentTimeMillis() - startTime)/1000.0; + console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration)); running = false; done = true; break; @@ -210,6 +207,21 @@ public class TezJobMonitor { return rc; } + /** + * killRunningJobs tries to terminate execution of all + * currently running tez queries. No guarantees, best effort only. + */ + public static void killRunningJobs() { + for (DAGClient c: shutdownList) { + try { + System.err.println("Trying to shutdown DAG"); + c.tryKillDAG(); + } catch (Exception e) { + // ignore + } + } + } + private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) { StringBuffer reportBuffer = new StringBuffer(); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Tue Oct 14 19:06:45 2014 @@ -17,6 +17,14 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; +import java.text.NumberFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -26,6 +34,7 @@ import org.apache.hadoop.mapred.OutputCo import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -34,11 +43,6 @@ import org.apache.tez.runtime.api.Logica import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; -import java.io.IOException; -import java.text.NumberFormat; -import java.util.List; -import java.util.Map; - /** * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework. 
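In the TezJobMonitor change above, the process shutdown hook now delegates to the new static killRunningJobs(), which walks the list of registered DAG clients and makes a best-effort attempt to kill each one, deliberately swallowing failures. A rough, self-contained sketch of that best-effort shutdown-hook pattern is below; Killable is a hypothetical stand-in for Tez's DAGClient, and runningClients for the monitor's shutdownList.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class BestEffortShutdown {

  // Hypothetical stand-in for Tez's DAGClient; only the kill operation matters here.
  interface Killable {
    void tryKill() throws Exception;
  }

  // Clients registered as jobs are submitted, like TezJobMonitor's shutdownList.
  private static final List<Killable> runningClients = new CopyOnWriteArrayList<>();

  static void register(Killable client) {
    runningClients.add(client);
  }

  /** Best effort only: try to kill every registered job and ignore individual failures. */
  static void killRunningJobs() {
    for (Killable client : runningClients) {
      try {
        System.err.println("Trying to shut down running job");
        client.tryKill();
      } catch (Exception e) {
        // ignore - a failed kill request must not abort the rest of the shutdown
      }
    }
  }

  public static void main(String[] args) {
    Runtime.getRuntime().addShutdownHook(new Thread(BestEffortShutdown::killRunningJobs));
    register(() -> System.err.println("kill requested"));
  }
}

Keeping the kill loop exception-tolerant matches the intent stated in the added javadoc: termination of running queries is attempted, but with no guarantees.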
@@ -90,7 +94,8 @@ public class TezProcessor extends Abstra perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); this.jobConf = new JobConf(conf); - setupMRLegacyConfigs(getContext()); + this.processorContext = getContext(); + setupMRLegacyConfigs(processorContext); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @@ -130,14 +135,6 @@ public class TezProcessor extends Abstra if (isMap) { rproc = new MapRecordProcessor(jobConf); - MRInputLegacy mrInput = getMRInput(inputs); - try { - // TODO: This might create oldInputFormat in MRInput. - // We are assuming we don't need to wrap it for Llap. - mrInput.init(); - } catch (IOException e) { - throw new RuntimeException("Failed while initializing MRInput", e); - } } else { rproc = new ReduceRecordProcessor(); } @@ -150,6 +147,7 @@ public class TezProcessor extends Abstra throws Exception { Throwable originalThrowable = null; try { + // Outputs will be started later by the individual Processors. TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf); // Start the actual Inputs. After MRInput initialization. for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) { @@ -157,13 +155,10 @@ public class TezProcessor extends Abstra LOG.info("Input: " + inputEntry.getKey() + " is not cached"); inputEntry.getValue().start(); } else { - LOG.info("Input: " + inputEntry.getKey() + - " is already cached. Skipping start"); + LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start"); } } - // Outputs will be started later by the individual Processors. - MRTaskReporter mrReporter = new MRTaskReporter(getContext()); rproc.init(jobConf, getContext(), mrReporter, inputs, outputs); rproc.run(); @@ -216,19 +211,4 @@ public class TezProcessor extends Abstra writer.write(key, value); } } - - static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) { - //there should be only one MRInput - MRInputLegacy theMRInput = null; - for(LogicalInput inp : inputs.values()){ - if(inp instanceof MRInputLegacy){ - if(theMRInput != null){ - throw new IllegalArgumentException("Only one MRInput is expected"); - } - //a better logic would be to find the alias - theMRInput = (MRInputLegacy)inp; - } - } - return theMRInput; - } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Tue Oct 14 19:06:45 2014 @@ -168,10 +168,10 @@ public class TezSessionPoolManager { // session in the SessionState } - public void close(TezSessionState tezSessionState) throws Exception { + public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session default? 
" + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { - tezSessionState.close(false); + tezSessionState.close(keepTmpDir); } } @@ -262,19 +262,24 @@ public class TezSessionPoolManager { } if (session != null) { - close(session); + close(session, false); } return getSession(conf, doOpen, forceCreate); } - public void closeAndOpen(TezSessionState sessionState, HiveConf conf) + public void closeAndOpen(TezSessionState sessionState, HiveConf conf, boolean keepTmpDir) throws Exception { + closeAndOpen(sessionState, conf, null, keepTmpDir); + } + + public void closeAndOpen(TezSessionState sessionState, HiveConf conf, + String[] additionalFiles, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } - close(sessionState); - sessionState.open(conf); + close(sessionState, keepTmpDir); + sessionState.open(conf, additionalFiles); } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Tue Oct 14 19:06:45 2014 @@ -207,11 +207,6 @@ public class TezSessionState { } catch(InterruptedException ie) { //ignore } - // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session - // id is used for tez to reuse the current session rather than start a new one. 
- conf.set("mapreduce.framework.name", "yarn-tez"); - conf.set("mapreduce.tez.session.tokill-application-id", - session.getAppMasterApplicationId().toString()); openSessions.add(this); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Oct 14 19:06:45 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; @@ -55,6 +56,7 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -124,14 +126,11 @@ public class TezTask extends Task<TezWor // create the tez tmp dir scratchDir = utils.createTezDir(scratchDir, conf); - if (!session.isOpen()) { - // can happen if the user sets the tez flag after the session was - // established - LOG.info("Tez session hasn't been created yet. 
Opening session"); - session.open(conf, inputOutputJars); - } else { - session.refreshLocalResourcesFromConf(conf); - } + Map<String,LocalResource> inputOutputLocalResources = + getExtraLocalResources(jobConf, scratchDir, inputOutputJars); + + // Ensure the session is open and has the necessary local resources + updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources); List<LocalResource> additionalLr = session.getLocalizedResources(); @@ -153,8 +152,12 @@ public class TezTask extends Task<TezWor // next we translate the TezWork to a Tez DAG DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx); + // Add the extra resources to the dag + addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources); + // submit will send the job to the cluster and start executing - client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr); + client = submit(jobConf, dag, scratchDir, appJarLr, session, + additionalLr, inputOutputJars, inputOutputLocalResources); // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); @@ -195,6 +198,63 @@ public class TezTask extends Task<TezWor return rc; } + /** + * Converted the list of jars into local resources + */ + Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir, + String[] inputOutputJars) throws Exception { + final Map<String,LocalResource> resources = new HashMap<String,LocalResource>(); + final List<LocalResource> localResources = utils.localizeTempFiles( + scratchDir.toString(), jobConf, inputOutputJars); + if (null != localResources) { + for (LocalResource lr : localResources) { + resources.put(utils.getBaseName(lr), lr); + } + } + return resources; + } + + /** + * Ensures that the Tez Session is open and the AM has all necessary jars configured. + */ + void updateSession(TezSessionState session, + JobConf jobConf, Path scratchDir, String[] inputOutputJars, + Map<String,LocalResource> extraResources) throws Exception { + final boolean missingLocalResources = !session + .hasResources(inputOutputJars); + + if (!session.isOpen()) { + // can happen if the user sets the tez flag after the session was + // established + LOG.info("Tez session hasn't been created yet. Opening session"); + session.open(conf, inputOutputJars); + } else { + LOG.info("Session is already open"); + + // Ensure the open session has the necessary resources (StorageHandler) + if (missingLocalResources) { + LOG.info("Tez session missing resources," + + " adding additional necessary resources"); + session.getSession().addAppMasterLocalFiles(extraResources); + } + + session.refreshLocalResourcesFromConf(conf); + } + } + + /** + * Adds any necessary resources that must be localized in each vertex to the DAG. 
+ */ + void addExtraResourcesToDag(TezSessionState session, DAG dag, + String[] inputOutputJars, + Map<String,LocalResource> inputOutputLocalResources) throws Exception { + if (!session.hasResources(inputOutputJars)) { + if (null != inputOutputLocalResources) { + dag.addTaskLocalFiles(inputOutputLocalResources); + } + } + } + DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx) throws Exception { @@ -254,15 +314,16 @@ public class TezTask extends Task<TezWor for (BaseWork v: children) { // finally we can create the grouped edge GroupInputEdge e = utils.createEdge(group, parentConf, - workToVertex.get(v), work.getEdgeProperty(w, v)); + workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v)); dag.addEdge(e); } } else { // Regular vertices JobConf wxConf = utils.initializeVertexConf(conf, ctx, w); - Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, - additionalLr, fs, ctx, !isFinal, work); + Vertex wx = + utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, + work, work.getVertexType(w)); dag.addVertex(wx); utils.addCredentials(w, dag); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName()); @@ -276,7 +337,7 @@ public class TezTask extends Task<TezWor TezEdgeProperty edgeProp = work.getEdgeProperty(w, v); - e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp); + e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v)); dag.addEdge(e); } } @@ -287,7 +348,8 @@ public class TezTask extends Task<TezWor DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr, TezSessionState sessionState, - List<LocalResource> additionalLr) + List<LocalResource> additionalLr, String[] inputOutputJars, + Map<String,LocalResource> inputOutputLocalResources) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); @@ -308,7 +370,7 @@ public class TezTask extends Task<TezWor console.printInfo("Tez session was closed. 
Reopening..."); // close the old one, but keep the tmp files around - TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars, true); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); @@ -326,6 +388,9 @@ public class TezTask extends Task<TezWor try { List<BaseWork> ws = work.getAllWork(); for (BaseWork w: ws) { + if (w instanceof MergeJoinWork) { + w = ((MergeJoinWork) w).getMainWork(); + } for (Operator<?> op: w.getAllOperators()) { op.jobClose(conf, rc == 0); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Tue Oct 14 19:06:45 2014 @@ -40,7 +40,7 @@ public class TezMergedLogicalInput exten @Override public Reader getReader() throws Exception { - return new InputMerger(getInputs()); + return new KeyValuesInputMerger(getInputs()); } @Override Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java Tue Oct 14 19:06:45 2014 @@ -24,7 +24,9 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hive.common.type.Decimal128; +import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; @@ -404,8 +406,7 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else { Text bw = (Text) val; byte[] bytes = bw.getBytes(); assignBytes(bytes, 0, bw.getLength(), destIndex); @@ -413,6 +414,35 @@ public class VectorColumnAssignFactory { } }.init(outputBatch, (BytesColumnVector) destCol); break; + case VARCHAR: + outVCA = new VectorBytesColumnAssign() { + @Override + public void assignObjectValue(Object val, int destIndex) throws HiveException { + if (val == null) { + assignNull(destIndex); + } else { + HiveVarchar hiveVarchar = (HiveVarchar) val; + byte[] bytes = hiveVarchar.getValue().getBytes(); + assignBytes(bytes, 0, bytes.length, destIndex); + } + } + }.init(outputBatch, (BytesColumnVector) destCol); + break; + case CHAR: + outVCA = new VectorBytesColumnAssign() { + @Override + public void assignObjectValue(Object val, int destIndex) throws HiveException { + if (val == null) { + 
assignNull(destIndex); + } else { + // We store CHAR type stripped of pads. + HiveChar hiveChar = (HiveChar) val; + byte[] bytes = hiveChar.getStrippedValue().getBytes(); + assignBytes(bytes, 0, bytes.length, destIndex); + } + } + }.init(outputBatch, (BytesColumnVector) destCol); + break; default: throw new HiveException("Incompatible Bytes vector column and primitive category " + category); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java Tue Oct 14 19:06:45 2014 @@ -67,6 +67,7 @@ public class VectorExpressionDescriptor DATE (0x040), TIMESTAMP (0x080), DATETIME_FAMILY (DATE.value | TIMESTAMP.value), + INT_TIMESTAMP_FAMILY (INT_FAMILY.value | TIMESTAMP.value), INT_DATETIME_FAMILY (INT_FAMILY.value | DATETIME_FAMILY.value), STRING_DATETIME_FAMILY (STRING_FAMILY.value | DATETIME_FAMILY.value), ALL_FAMILY (0xFFF); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java Tue Oct 14 19:06:45 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -45,7 +46,8 @@ public class VectorExtractOperator exten private int keyColCount; private int valueColCount; - private transient int [] projectedColumns = null; + private transient VectorizedRowBatch outputBatch; + private transient int remainingColCount; public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException { @@ -57,26 +59,25 @@ public class VectorExtractOperator exten super(); } - private StructObjectInspector makeStandardStructObjectInspector(StructObjectInspector structObjectInspector) { - List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0]; + List<? extends StructField> fields = structInputObjInspector.getAllStructFieldRefs(); ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); ArrayList<String> colNames = new ArrayList<String>(); - for (StructField field: fields) { - colNames.add(field.getFieldName()); + for (int i = keyColCount; i < fields.size(); i++) { + StructField field = fields.get(i); + String fieldName = field.getFieldName(); + + // Remove "VALUE." prefix. 
+ int dotIndex = fieldName.indexOf("."); + colNames.add(fieldName.substring(dotIndex + 1)); ois.add(field.getFieldObjectInspector()); } - return ObjectInspectorFactory + outputObjInspector = ObjectInspectorFactory .getStandardStructObjectInspector(colNames, ois); - } - - @Override - protected void initializeOp(Configuration hconf) throws HiveException { - outputObjInspector = inputObjInspectors[0]; - LOG.info("VectorExtractOperator class of outputObjInspector is " + outputObjInspector.getClass().getName()); - projectedColumns = new int [valueColCount]; - for (int i = 0; i < valueColCount; i++) { - projectedColumns[i] = keyColCount + i; - } + remainingColCount = fields.size() - keyColCount; + outputBatch = new VectorizedRowBatch(remainingColCount); initializeChildren(hconf); } @@ -86,20 +87,16 @@ public class VectorExtractOperator exten } @Override - // Evaluate vectorized batches of rows and forward them. + // Remove the key columns and forward the values (and scratch columns). public void processOp(Object row, int tag) throws HiveException { - VectorizedRowBatch vrg = (VectorizedRowBatch) row; + VectorizedRowBatch inputBatch = (VectorizedRowBatch) row; + + // Copy references to the input columns array starting after the keys... + for (int i = 0; i < remainingColCount; i++) { + outputBatch.cols[i] = inputBatch.cols[keyColCount + i]; + } + outputBatch.size = inputBatch.size; - // Project away the key columns... - int[] originalProjections = vrg.projectedColumns; - int originalProjectionSize = vrg.projectionSize; - vrg.projectionSize = valueColCount; - vrg.projectedColumns = this.projectedColumns; - - forward(vrg, outputObjInspector); - - // Revert the projected columns back, because vrg will be re-used. - vrg.projectionSize = originalProjectionSize; - vrg.projectedColumns = originalProjections; + forward(outputBatch, outputObjInspector); } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Oct 14 19:06:45 2014 @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -27,16 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; -import 
org.apache.hadoop.io.Writable; /** * File Sink operator implementation. @@ -69,113 +58,10 @@ public class VectorFileSinkOperator exte @Override public void processOp(Object data, int tag) throws HiveException { - VectorizedRowBatch vrg = (VectorizedRowBatch)data; - - Writable [] records = null; - boolean vectorizedSerde = false; - try { - if (serializer instanceof VectorizedSerde) { - recordValue = ((VectorizedSerde) serializer).serializeVector(vrg, - inputObjInspectors[0]); - records = (Writable[]) ((ObjectWritable) recordValue).get(); - vectorizedSerde = true; - } - } catch (SerDeException e1) { - throw new HiveException(e1); - } - for (int i = 0; i < vrg.size; i++) { - Writable row = null; - if (vectorizedSerde) { - row = records[i]; - } else { - if (vrg.valueWriters == null) { - vrg.setValueWriters(this.valueWriters); - } - try { - row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]); - } catch (SerDeException ex) { - throw new HiveException(ex); - } - } - /* Create list bucketing sub-directory only if stored-as-directories is on. */ - String lbDirName = null; - lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row); - - FSPaths fpaths; - - if (!bDynParts && !filesCreated) { - if (lbDirName != null) { - FSPaths fsp2 = lookupListBucketingPaths(lbDirName); - } else { - createBucketFiles(fsp); - } - } - - try { - updateProgress(); - - // if DP is enabled, get the final output writers and prepare the real output row - assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct"; - - if (bDynParts) { - // copy the DP column values from the input row to dpVals - dpVals.clear(); - dpWritables.clear(); - ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts, - (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); - // get a set of RecordWriter based on the DP column values - // pass the null value along to the escaping process to determine what the dir should be - for (Object o : dpWritables) { - if (o == null || o.toString().length() == 0) { - dpVals.add(dpCtx.getDefaultPartitionName()); - } else { - dpVals.add(o.toString()); - } - } - fpaths = getDynOutPaths(dpVals, lbDirName); - - } else { - if (lbDirName != null) { - fpaths = lookupListBucketingPaths(lbDirName); - } else { - fpaths = fsp; - } - } - - rowOutWriters = fpaths.getOutWriters(); - // check if all record writers implement statistics. 
if atleast one RW - // doesn't implement stats interface we will fallback to conventional way - // of gathering stats - isCollectRWStats = areAllTrue(statsFromRecordWriter); - if (conf.isGatherStats() && !isCollectRWStats) { - if (statsCollectRawDataSize) { - SerDeStats stats = serializer.getSerDeStats(); - if (stats != null) { - fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); - } - } - fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1); - } - - - if (row_count != null) { - row_count.set(row_count.get() + 1); - } - - if (!multiFileSpray) { - rowOutWriters[0].write(row); - } else { - int keyHashCode = 0; - key.setHashCode(keyHashCode); - int bucketNum = prtner.getBucket(key, null, totalFiles); - int idx = bucketMap.get(bucketNum); - rowOutWriters[idx].write(row); - } - } catch (IOException e) { - throw new HiveException(e); - } + Object[] row = getRowObject(vrg, i); + super.processOp(row, tag); } } @@ -187,7 +73,7 @@ public class VectorFileSinkOperator exte } for (int i = 0; i < vrg.projectionSize; i++) { ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]]; - singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, batchIndex); + singleRow[i] = valueWriters[i].writeValue(vectorColumn, batchIndex); } return singleRow; } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Oct 14 19:06:45 2014 @@ -653,6 +653,21 @@ public class VectorGroupByOperator exten /** * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the * same key. On endGroup (or close), the intermediate values are flushed. + * + * We build the output rows one-at-a-time in the output vectorized row batch (outputBatch) + * in 2 steps: + * + * 1) Just after startGroup, we copy the group key to the next position in the output batch, + * but don't increment the size in the batch (yet). This is done with the copyGroupKey + * method of VectorGroupKeyHelper. The next position is outputBatch.size + * + * We know the same key is used for the whole batch (i.e. repeating) since that is how + * vectorized reduce-shuffle feeds the batches to us. + * + * 2) Later at endGroup after reduce-shuffle has fed us all the input batches for the group, + * we fill in the aggregation columns in outputBatch at outputBatch.size. Our method + * writeGroupRow does this and finally increments outputBatch.size. 
+ * */ private class ProcessingModeGroupBatches extends ProcessingModeBase { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java Tue Oct 14 19:06:45 2014 @@ -42,19 +42,38 @@ public class VectorGroupKeyHelper extend finishAdding(); } + /* + * This helper method copies the group keys from one vectorized row batch to another, + * but does not increment the outputBatch.size (i.e. the next output position). + * + * It was designed for VectorGroupByOperator's sorted reduce group batch processing mode + * to copy the group keys at startGroup. + */ public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch, DataOutputBuffer buffer) throws HiveException { - // Grab the key at index 0. We don't care about selected or repeating since all keys in the input batch are the same. for(int i = 0; i< longIndices.length; ++i) { int keyIndex = longIndices[i]; LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex]; LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex]; + + // This vectorized code pattern says: + // If the input batch has no nulls at all (noNulls is true) OR + // the input row is NOT NULL, copy the value. + // + // Otherwise, we have a NULL input value. The standard way to mark a NULL in the + // output batch is: turn off noNulls indicating there is at least one NULL in the batch + // and mark that row as NULL. + // + // When a vectorized row batch is reset, noNulls is set to true and the isNull array + // is zeroed. + // + // We grab the key at index 0. We don't care about selected or repeating since all keys + // in the input batch are suppose to be the same. 
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java Tue Oct 14 19:06:45 2014
@@ -42,19 +42,38 @@ public class VectorGroupKeyHelper extend
     finishAdding();
   }
+  /*
+   * This helper method copies the group keys from one vectorized row batch to another,
+   * but does not increment the outputBatch.size (i.e. the next output position).
+   *
+   * It was designed for VectorGroupByOperator's sorted reduce group batch processing mode
+   * to copy the group keys at startGroup.
+   */
   public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch,
           DataOutputBuffer buffer) throws HiveException {
-    // Grab the key at index 0. We don't care about selected or repeating since all keys in the input batch are the same.
     for(int i = 0; i< longIndices.length; ++i) {
       int keyIndex = longIndices[i];
       LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex];
       LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex];
+
+      // This vectorized code pattern says:
+      //   If the input batch has no nulls at all (noNulls is true) OR
+      //   the input row is NOT NULL, copy the value.
+      //
+      //   Otherwise, we have a NULL input value. The standard way to mark a NULL in the
+      //   output batch is: turn off noNulls indicating there is at least one NULL in the batch
+      //   and mark that row as NULL.
+      //
+      //   When a vectorized row batch is reset, noNulls is set to true and the isNull array
+      //   is zeroed.
+      //
+      //   We grab the key at index 0. We don't care about selected or repeating since all keys
+      //   in the input batch are suppose to be the same.
+      //
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
         outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
-      } else if (inputColumnVector.noNulls ){
-        outputColumnVector.noNulls = false;
-        outputColumnVector.isNull[outputBatch.size] = true;
       } else {
+        outputColumnVector.noNulls = false;
         outputColumnVector.isNull[outputBatch.size] = true;
       }
     }
@@ -64,10 +83,8 @@ public class VectorGroupKeyHelper extend
       DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[keyIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
         outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
-      } else if (inputColumnVector.noNulls ){
-        outputColumnVector.noNulls = false;
-        outputColumnVector.isNull[outputBatch.size] = true;
       } else {
+        outputColumnVector.noNulls = false;
         outputColumnVector.isNull[outputBatch.size] = true;
       }
     }
@@ -85,10 +102,8 @@ public class VectorGroupKeyHelper extend
         throw new IllegalStateException("bad write", ioe);
       }
       outputColumnVector.setRef(outputBatch.size, buffer.getData(), start, length);
-      } else if (inputColumnVector.noNulls ){
-        outputColumnVector.noNulls = false;
-        outputColumnVector.isNull[outputBatch.size] = true;
       } else {
+        outputColumnVector.noNulls = false;
        outputColumnVector.isNull[outputBatch.size] = true;
      }
    }
@@ -98,10 +113,8 @@ public class VectorGroupKeyHelper extend
      DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[keyIndex];
      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
-      } else if (inputColumnVector.noNulls ){
-        outputColumnVector.noNulls = false;
-        outputColumnVector.isNull[outputBatch.size] = true;
      } else {
+        outputColumnVector.noNulls = false;
        outputColumnVector.isNull[outputBatch.size] = true;
      }
    }
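Stripped of the surrounding loop, the NULL-handling pattern the new comment documents is identical for every column type; for a long key it reduces to the fragment below, taken almost verbatim from the hunk above and shown in isolation for readability (the variable names in/out are local to this sketch).

// Copy slot 0 of the input key column (every row in the batch carries the same
// group key) into the next free output slot, propagating NULLs correctly.
LongColumnVector in  = (LongColumnVector) inputBatch.cols[keyIndex];
LongColumnVector out = (LongColumnVector) outputBatch.cols[keyIndex];
if (in.noNulls || !in.isNull[0]) {
  // Either the whole batch is NULL-free, or this particular value is not NULL.
  out.vector[outputBatch.size] = in.vector[0];
} else {
  // NULL input: record that the output batch now contains at least one NULL
  // and mark this row's slot as NULL.
  out.noNulls = false;
  out.isNull[outputBatch.size] = true;
}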
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Tue Oct 14 19:06:45 2014
@@ -36,6 +36,12 @@ import org.apache.hadoop.hive.serde2.obj
  */
 public class VectorHashKeyWrapper extends KeyWrapper {
+  private static final int[] EMPTY_INT_ARRAY = new int[0];
+  private static final long[] EMPTY_LONG_ARRAY = new long[0];
+  private static final double[] EMPTY_DOUBLE_ARRAY = new double[0];
+  private static final byte[][] EMPTY_BYTES_ARRAY = new byte[0][];
+  private static final Decimal128[] EMPTY_DECIMAL_ARRAY = new Decimal128[0];
+
   private long[] longValues;
   private double[] doubleValues;
@@ -50,15 +56,21 @@ public class VectorHashKeyWrapper extend
   public VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount,
           int byteValuesCount, int decimalValuesCount) {
-    longValues = new long[longValuesCount];
-    doubleValues = new double[doubleValuesCount];
-    decimalValues = new Decimal128[decimalValuesCount];
+    longValues = longValuesCount > 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY;
+    doubleValues = doubleValuesCount > 0 ? new double[doubleValuesCount] : EMPTY_DOUBLE_ARRAY;
+    decimalValues = decimalValuesCount > 0 ?
+        new Decimal128[decimalValuesCount] : EMPTY_DECIMAL_ARRAY;
     for(int i = 0; i < decimalValuesCount; ++i) {
       decimalValues[i] = new Decimal128();
     }
-    byteValues = new byte[byteValuesCount][];
-    byteStarts = new int[byteValuesCount];
-    byteLengths = new int[byteValuesCount];
+    if (byteValuesCount > 0) {
+      byteValues = new byte[byteValuesCount][];
+      byteStarts = new int[byteValuesCount];
+      byteLengths = new int[byteValuesCount];
+    } else {
+      byteValues = EMPTY_BYTES_ARRAY;
+      byteStarts = EMPTY_INT_ARRAY;
+      byteLengths = EMPTY_INT_ARRAY;
+    }
     isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount];
     hashcode = 0;
   }
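The VectorHashKeyWrapper change above is an instance of a common allocation-avoidance idiom: when a wrapper frequently has zero keys of a given type, share one immutable zero-length array rather than allocating a fresh array per instance. In isolation the idiom looks like the hypothetical class below (KeySketch is not part of this commit; it only illustrates the pattern).

// Zero-length arrays are effectively immutable, so a single static instance can
// safely back every wrapper that has no long keys, avoiding per-object garbage
// on hot allocation paths such as hash-aggregation key creation.
public class KeySketch {
  private static final long[] EMPTY_LONG_ARRAY = new long[0];

  private final long[] longValues;

  public KeySketch(int longValuesCount) {
    longValues = longValuesCount > 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY;
  }
}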
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Tue Oct 14 19:06:45 2014
@@ -207,8 +207,7 @@ public class VectorMapJoinOperator exten
     Object[] values = (Object[]) row;
     VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
     if (null == vcas) {
-      Map<String, Map<String, Integer>> allColumnMaps = Utilities.
-          getMapRedWork(hconf).getMapWork().getScratchColumnMap();
+      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getScratchColumnMap(hconf);
       Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
       vcas = VectorColumnAssignFactory.buildAssigners(
           outputBatch, outputOI, columnMap, conf.getOutputColumnNames());
