Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Tue Oct 14 19:06:45 2014 @@ -18,99 +18,30 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.io.IOException; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.PTFTopNHash; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.TopNHash; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -// import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; public class VectorReduceSinkOperator extends ReduceSinkOperator { - private static final Log LOG = LogFactory.getLog( - VectorReduceSinkOperator.class.getName()); - private static final long serialVersionUID = 1L; - /** - * The evaluators for the key columns. Key columns decide the sort order on - * the reducer side. Key columns are passed to the reducer in the "key". - */ - private VectorExpression[] keyEval; - - /** - * The key value writers. These know how to write the necessary writable type - * based on key column metadata, from the primitive vector type. - */ - private transient VectorExpressionWriter[] keyWriters; - - /** - * The evaluators for the value columns. Value columns are passed to reducer - * in the "value". - */ - private VectorExpression[] valueEval; - - /** - * The output value writers. These know how to write the necessary writable type - * based on value column metadata, from the primitive vector type. - */ - private transient VectorExpressionWriter[] valueWriters; - - /** - * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in - * Hive language). Partition columns decide the reducer that the current row - * goes to. Partition columns are not passed to reducer. - */ - private VectorExpression[] partitionEval; - - /** - * Evaluators for bucketing columns. This is used to compute bucket number. 
- */ - private VectorExpression[] bucketEval; - private int buckColIdxInKey; - - /** - * The partition value writers. These know how to write the necessary writable type - * based on partition column metadata, from the primitive vector type. - */ - private transient VectorExpressionWriter[] partitionWriters; - private transient VectorExpressionWriter[] bucketWriters = null; - - private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + // Writer for producing row from input batch. + private VectorExpressionWriter[] rowWriters; + + protected transient Object[] singleRow; public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException { this(); ReduceSinkDesc desc = (ReduceSinkDesc) conf; this.conf = desc; - keyEval = vContext.getVectorExpressions(desc.getKeyCols()); - valueEval = vContext.getVectorExpressions(desc.getValueCols()); - partitionEval = vContext.getVectorExpressions(desc.getPartitionCols()); - bucketEval = null; - if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) { - bucketEval = vContext.getVectorExpressions(desc.getBucketCols()); - buckColIdxInKey = desc.getPartitionCols().size(); - } } public VectorReduceSinkOperator() { @@ -119,399 +50,49 @@ public class VectorReduceSinkOperator ex @Override protected void initializeOp(Configuration hconf) throws HiveException { - try { - numDistributionKeys = conf.getNumDistributionKeys(); - distinctColIndices = conf.getDistinctColumnIndices(); - numDistinctExprs = distinctColIndices.size(); - - TableDesc keyTableDesc = conf.getKeySerializeInfo(); - keySerializer = (Serializer) keyTableDesc.getDeserializerClass() - .newInstance(); - keySerializer.initialize(null, keyTableDesc.getProperties()); - keyIsText = keySerializer.getSerializedClass().equals(Text.class); - - /* - * Compute and assign the key writers and the key object inspector - */ - VectorExpressionWriterFactory.processVectorExpressions( - conf.getKeyCols(), - conf.getOutputKeyColumnNames(), - new VectorExpressionWriterFactory.SingleOIDClosure() { - @Override - public void assign(VectorExpressionWriter[] writers, - ObjectInspector objectInspector) { - keyWriters = writers; - keyObjectInspector = objectInspector; - } - }); - - String colNames = ""; - for(String colName : conf.getOutputKeyColumnNames()) { - colNames = String.format("%s %s", colNames, colName); - } - - if (isDebugEnabled) { - LOG.debug(String.format("keyObjectInspector [%s]%s => %s", - keyObjectInspector.getClass(), - keyObjectInspector, - colNames)); - } - - partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols()); - if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) { - bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols()); - } - - TableDesc valueTableDesc = conf.getValueSerializeInfo(); - valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() - .newInstance(); - valueSerializer.initialize(null, valueTableDesc.getProperties()); - - /* - * Compute and assign the value writers and the value object inspector - */ - VectorExpressionWriterFactory.processVectorExpressions( - conf.getValueCols(), - conf.getOutputValueColumnNames(), - new VectorExpressionWriterFactory.SingleOIDClosure() { - @Override - public void assign(VectorExpressionWriter[] writers, - ObjectInspector objectInspector) { - valueWriters = writers; - valueObjectInspector = objectInspector; + // We need a input object inspector that is for the row we will extract out of the + // 
vectorized row batch, not for example, an original inspector for an ORC table, etc. + VectorExpressionWriterFactory.processVectorInspector( + (StructObjectInspector) inputObjInspectors[0], + new VectorExpressionWriterFactory.SingleOIDClosure() { + @Override + public void assign(VectorExpressionWriter[] writers, + ObjectInspector objectInspector) { + rowWriters = writers; + inputObjInspectors[0] = objectInspector; } - }); - - if (isDebugEnabled) { - colNames = ""; - for(String colName : conf.getOutputValueColumnNames()) { - colNames = String.format("%s %s", colNames, colName); - } - } - - if (isDebugEnabled) { - LOG.debug(String.format("valueObjectInspector [%s]%s => %s", - valueObjectInspector.getClass(), - valueObjectInspector, - colNames)); - } + }); + singleRow = new Object[rowWriters.length]; - int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; - int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : - numDistributionKeys; - cachedKeys = new Object[numKeys][keyLen]; - cachedValues = new Object[valueEval.length]; - - int tag = conf.getTag(); - tagByte[0] = (byte) tag; - LOG.info("Using tag = " + tag); - - int limit = conf.getTopN(); - float memUsage = conf.getTopNMemoryUsage(); - if (limit >= 0 && memUsage > 0) { - reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash; - reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); - } - - autoParallel = conf.isAutoParallel(); - - } catch(Exception e) { - throw new HiveException(e); - } + // Call ReduceSinkOperator with new input inspector. + super.initializeOp(hconf); } @Override - public void processOp(Object row, int tag) throws HiveException { - VectorizedRowBatch vrg = (VectorizedRowBatch) row; - - if (isDebugEnabled) { - LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts", - vrg.size, - valueEval.length, - keyEval.length, - partitionEval.length)); - } - - try { - // Evaluate the keys - for (int i = 0; i < keyEval.length; i++) { - keyEval[i].evaluate(vrg); - } - - // Determine which rows we need to emit based on topN optimization - int startResult = reducerHash.startVectorizedBatch(vrg.size); - if (startResult == TopNHash.EXCLUDE) { - return; // TopN wants us to exclude all rows. - } - // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected. - for (int i = 0; i < partitionEval.length; i++) { - partitionEval[i].evaluate(vrg); - } - if (bucketEval != null) { - for (int i = 0; i < bucketEval.length; i++) { - bucketEval[i].evaluate(vrg); - } - } - // run the vector evaluations - for (int i = 0; i < valueEval.length; i++) { - valueEval[i].evaluate(vrg); - } - - boolean useTopN = startResult != TopNHash.FORWARD; - // Go thru the batch once. If we are not using TopN, we will forward all things and be done. - // If we are using topN, we will make the first key for each row and store/forward it. - // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case. - for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { - int rowIndex = batchIndex; - if (vrg.selectedInUse) { - rowIndex = vrg.selected[batchIndex]; - } - // First, make distrib key components for this row and determine distKeyLength. 
- populatedCachedDistributionKeys(vrg, rowIndex, 0); - - // replace bucketing columns with hashcode % numBuckets - int buckNum = -1; - if (bucketEval != null) { - buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets()); - cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum); - } - HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); - int distKeyLength = firstKey.getDistKeyLength(); - // Add first distinct expression, if any. - if (numDistinctExprs > 0) { - populateCachedDistinctKeys(vrg, rowIndex, 0); - firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength); - } - - final int hashCode; - - // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] - if (autoParallel && partitionEval.length > 0) { - hashCode = computeMurmurHash(firstKey); - } else { - hashCode = computeHashCode(vrg, rowIndex, buckNum); - } - - firstKey.setHashCode(hashCode); - - if (useTopN) { - /* - * in case of TopN for windowing, we need to distinguish between - * rows with null partition keys and rows with value 0 for partition keys. - */ - boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex); - reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex); - } else { - // No TopN, just forward the first key and all others. - BytesWritable value = makeValueWritable(vrg, rowIndex); - collect(firstKey, value); - forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0); - } - } - - if (!useTopN) return; // All done. - - // If we use topN, we have called tryStore on every key now. We can process the results. - for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { - int result = reducerHash.getVectorizedBatchResult(batchIndex); - if (result == TopNHash.EXCLUDE) continue; - int rowIndex = batchIndex; - if (vrg.selectedInUse) { - rowIndex = vrg.selected[batchIndex]; - } - // Compute value and hashcode - we'd either store or forward them. - BytesWritable value = makeValueWritable(vrg, rowIndex); - int distKeyLength = -1; - int hashCode; - if (result == TopNHash.FORWARD) { - HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex); - distKeyLength = firstKey.getDistKeyLength(); - hashCode = firstKey.hashCode(); - collect(firstKey, value); - } else { - hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex); - reducerHash.storeValue(result, hashCode, value, true); - distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex); - } - // Now forward other the rows if there's multi-distinct (but see TODO in forward...). - // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1. - if (numDistinctExprs > 1) { - populatedCachedDistributionKeys(vrg, rowIndex, 1); - forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1); - } - } - } catch (SerDeException e) { - throw new HiveException(e); - } catch (IOException e) { - throw new HiveException(e); - } - } - - /** - * This function creates and forwards all the additional KVs for the multi-distinct case, - * after the first (0th) KV pertaining to the row has already been stored or forwarded. 
- * @param vrg the batch - * @param rowIndex the row index in the batch - * @param hashCode the partitioning hash code to use; same as for the first KV - * @param value the value to use; same as for the first KV - * @param distKeyLength the distribution key length of the first key; TODO probably extraneous - * @param tag the tag - * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored - */ - private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode, - BytesWritable value, int distKeyLength, int tag, int baseIndex) - throws HiveException, SerDeException, IOException { - // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if - // the first key has already been stored. There's few bytes difference between keys - // for different distincts, and the value/etc. are all the same. - // We could store deltas to re-gen extra rows when flushing TopN. - for (int i = 1; i < numDistinctExprs; i++) { - if (i != baseIndex) { - System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys); - } - populateCachedDistinctKeys(vrg, rowIndex, i); - HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength); - hiveKey.setHashCode(hashCode); - collect(hiveKey, value); - } - } - - /** - * Populate distribution keys part of cachedKeys for a particular row from the batch. - * @param vrg the batch - * @param rowIndex the row index in the batch - * @param index the cachedKeys index to write to - */ - private void populatedCachedDistributionKeys( - VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException { - for (int i = 0; i < numDistributionKeys; i++) { - int batchColumn = keyEval[i].getOutputColumn(); - ColumnVector vectorColumn = vrg.cols[batchColumn]; - cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex); - } - if (cachedKeys[index].length > numDistributionKeys) { - cachedKeys[index][numDistributionKeys] = null; - } - } - - /** - * Populate distinct keys part of cachedKeys for a particular row from the batch. 
- * @param vrg the batch - * @param rowIndex the row index in the batch - * @param index the cachedKeys index to write to - */ - private void populateCachedDistinctKeys( - VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException { - StandardUnion union; - cachedKeys[index][numDistributionKeys] = union = new StandardUnion( - (byte)index, new Object[distinctColIndices.get(index).size()]); - Object[] distinctParameters = (Object[]) union.getObject(); - for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) { - int distinctColIndex = distinctColIndices.get(index).get(distinctParamI); - int batchColumn = keyEval[distinctColIndex].getOutputColumn(); - distinctParameters[distinctParamI] = - keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex); - } - union.setTag((byte) index); - } + public void processOp(Object data, int tag) throws HiveException { + VectorizedRowBatch vrg = (VectorizedRowBatch) data; - private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex) - throws HiveException, SerDeException { - for (int i = 0; i < valueEval.length; i++) { - int batchColumn = valueEval[i].getOutputColumn(); - ColumnVector vectorColumn = vrg.cols[batchColumn]; - cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex); + for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { + Object row = getRowObject(vrg, batchIndex); + super.processOp(row, tag); } - // Serialize the value - return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector); } - private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException { - // Evaluate the HashCode - int keyHashCode = 0; - if (partitionEval.length == 0) { - // If no partition cols, just distribute the data uniformly to provide better - // load balance. If the requirement is to have a single reducer, we should set - // the number of reducers to 1. - // Use a constant seed to make the code deterministic. - if (random == null) { - random = new Random(12345); - } - keyHashCode = random.nextInt(); - } else { - for (int p = 0; p < partitionEval.length; p++) { - ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()]; - Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex); - keyHashCode = keyHashCode - * 31 - + ObjectInspectorUtils.hashCode( - partitionValue, - partitionWriters[p].getObjectInspector()); - } - } - return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum; - } - - private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex) + private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex) throws HiveException { - if (partitionEval.length != 0) { - for (int p = 0; p < partitionEval.length; p++) { - ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()]; - Object partitionValue = partitionWriters[p].writeValue(columnVector, - rowIndex); - if (partitionValue != null) { - return false; - } + int batchIndex = rowIndex; + if (vrg.selectedInUse) { + batchIndex = vrg.selected[rowIndex]; + } + for (int i = 0; i < vrg.projectionSize; i++) { + ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]]; + if (vectorColumn != null) { + singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex); + } else { + // Some columns from tables are not used. 
+ singleRow[i] = null; } - return true; - } - return false; - } - - private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException { - int bucketNum = 0; - for (int p = 0; p < bucketEval.length; p++) { - ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()]; - Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex); - bucketNum = bucketNum - * 31 - + ObjectInspectorUtils.hashCode( - bucketValue, - bucketWriters[p].getObjectInspector()); } - - if (bucketNum < 0) { - bucketNum = -1 * bucketNum; - } - - return bucketNum % numBuckets; - } - - static public String getOperatorName() { - return "RS"; - } - - public VectorExpression[] getPartitionEval() { - return partitionEval; - } - - public void setPartitionEval(VectorExpression[] partitionEval) { - this.partitionEval = partitionEval; - } - - public VectorExpression[] getValueEval() { - return valueEval; - } - - public void setValueEval(VectorExpression[] valueEval) { - this.valueEval = valueEval; - } - - public VectorExpression[] getKeyEval() { - return keyEval; - } - - public void setKeyEval(VectorExpression[] keyEval) { - this.keyEval = keyEval; + return singleRow; } }
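[Editor's note: the rewritten VectorReduceSinkOperator above no longer serializes keys and values itself; per the diff, it materializes each row of the VectorizedRowBatch into an Object[] (honoring the selected[] indirection when selectedInUse is set) and hands that row to the row-mode ReduceSinkOperator via super.processOp(). The following is a minimal, self-contained sketch of that extraction loop only, under stated assumptions: the Batch and ColumnWriter types below are hypothetical stand-ins for Hive's VectorizedRowBatch and VectorExpressionWriter, used purely to illustrate the loop structure, not the actual Hive API.]

    import java.util.Arrays;

    // Simplified sketch of the batch-to-row extraction pattern used by the new
    // processOp()/getRowObject(). Types here are illustrative stand-ins only.
    public class RowExtractionSketch {

      interface ColumnWriter {            // stands in for VectorExpressionWriter
        Object writeValue(Object[] column, int rowIndex);
      }

      static class Batch {                // stands in for VectorizedRowBatch
        int size;                         // number of logical rows in the batch
        boolean selectedInUse;            // if true, selected[] maps logical -> physical rows
        int[] selected;
        Object[][] cols;                  // one column array per projected column
      }

      private final ColumnWriter[] rowWriters;
      private final Object[] singleRow;   // reused across rows, as in the operator

      RowExtractionSketch(int numColumns) {
        rowWriters = new ColumnWriter[numColumns];
        // A trivial writer that just reads the boxed value out of the column.
        Arrays.fill(rowWriters, (ColumnWriter) (column, rowIndex) -> column[rowIndex]);
        singleRow = new Object[numColumns];
      }

      // Mirrors getRowObject(): resolve the physical row via selected[] when needed,
      // then let each column writer produce the value for that row; unused (null)
      // columns yield null fields.
      Object[] getRowObject(Batch batch, int logicalRow) {
        int batchIndex = batch.selectedInUse ? batch.selected[logicalRow] : logicalRow;
        for (int i = 0; i < batch.cols.length; i++) {
          Object[] column = batch.cols[i];
          singleRow[i] = (column == null) ? null : rowWriters[i].writeValue(column, batchIndex);
        }
        return singleRow;
      }

      // Mirrors processOp(): walk the logical batch size and forward one row at a time.
      void processBatch(Batch batch) {
        for (int r = 0; r < batch.size; r++) {
          Object[] row = getRowObject(batch, r);
          System.out.println(Arrays.toString(row));  // the real operator calls super.processOp(row, tag)
        }
      }

      public static void main(String[] args) {
        Batch batch = new Batch();
        batch.size = 2;
        batch.selectedInUse = true;
        batch.selected = new int[] {0, 3};           // only physical rows 0 and 3 survive earlier filters
        batch.cols = new Object[][] {
            {1L, 2L, 3L, 4L},
            {"a", "b", "c", "d"}
        };
        new RowExtractionSketch(2).processBatch(batch);  // prints [1, a] then [4, d]
      }
    }

[The sketch deliberately stops at row extraction: key/value serialization, hashing, and TopN handling are no longer done in the vectorized operator and, per the diff, are delegated to the parent ReduceSinkOperator after super.initializeOp() is called with the rewritten input object inspector.]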
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java Tue Oct 14 19:06:45 2014 @@ -285,8 +285,7 @@ public class VectorSMBMapJoinOperator ex Object[] values = (Object[]) row; VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI); if (null == vcas) { - Map<String, Map<String, Integer>> allColumnMaps = Utilities. - getMapRedWork(hconf).getMapWork().getScratchColumnMap(); + Map<String, Map<String, Integer>> allColumnMaps = Utilities.getScratchColumnMap(hconf); Map<String, Integer> columnMap = allColumnMaps.get(fileKey); vcas = VectorColumnAssignFactory.buildAssigners( outputBatch, outputOI, columnMap, conf.getOutputColumnNames()); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Oct 14 19:06:45 2014 @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.ve import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFSumDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble; @@ -1888,47 +1889,47 @@ public class VectorizationContext { // TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively // marked map-side (HASH). 
static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{ - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_pop", 
VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMinLong.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMaxLong.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); + add(new AggregateDefinition("count", 
VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, 
GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); }}; public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce) Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Oct 14 19:06:45 2014 @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.serde2.Col import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -131,13 +132,8 @@ public class VectorizedRowBatchCtx { */ public void init(Configuration hiveConf, String fileKey, StructObjectInspector rowOI) { - MapredWork mapredWork = Utilities.getMapRedWork(hiveConf); - Map<String, Map<Integer, String>> scratchColumnVectorTypes; - if (mapredWork.getMapWork() != null) { - scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes(); - } else { - scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes(); - } + Map<String, Map<Integer, String>> scratchColumnVectorTypes = + Utilities.getScratchColumnVectorTypes(hiveConf); columnTypeMap = scratchColumnVectorTypes.get(fileKey); this.rowOI= rowOI; this.rawRowOI = rowOI; @@ -145,6 +141,20 @@ public class VectorizedRowBatchCtx { /** + * Initializes the VectorizedRowBatch context based on an scratch column type map and + * object inspector. 
+ * @param columnTypeMap + * @param rowOI + * Object inspector that shapes the column types + */ + public void init(Map<Integer, String> columnTypeMap, + StructObjectInspector rowOI) { + this.columnTypeMap = columnTypeMap; + this.rowOI= rowOI; + this.rawRowOI = rowOI; + } + + /** * Initializes VectorizedRowBatch context based on the * split and Hive configuration (Job conf with hive Plan). * @@ -174,7 +184,7 @@ public class VectorizedRowBatchCtx { String partitionPath = split.getPath().getParent().toString(); columnTypeMap = Utilities - .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes() + .getScratchColumnVectorTypes(hiveConf) .get(partitionPath); Properties partProps = @@ -476,7 +486,7 @@ public class VectorizedRowBatchCtx { lcv.isNull[0] = true; lcv.isRepeating = true; } else { - lcv.fill(((Date) value).getTime()); + lcv.fill(DateWritable.dateToDays((Date) value)); lcv.isNull[0] = false; } } @@ -489,7 +499,7 @@ public class VectorizedRowBatchCtx { lcv.isNull[0] = true; lcv.isRepeating = true; } else { - lcv.fill((long)(((Timestamp) value).getTime())); + lcv.fill(TimestampUtils.getTimeNanoSec((Timestamp) value)); lcv.isNull[0] = false; } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Tue Oct 14 19:06:45 2014 @@ -660,7 +660,7 @@ public final class VectorExpressionWrite @Override public Object writeValue(byte[] value, int start, int length) throws HiveException { this.text.set(value, start, length); - ((SettableStringObjectInspector) this.objectInspector).set(this.obj, this.text.toString()); + ((SettableStringObjectInspector) this.objectInspector).set(this.obj, this.text); return this.obj; } @@ -671,7 +671,7 @@ public final class VectorExpressionWrite field = initValue(null); } this.text.set(value, start, length); - ((SettableStringObjectInspector) this.objectInspector).set(field, this.text.toString()); + ((SettableStringObjectInspector) this.objectInspector).set(field, this.text); return field; } @@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite closure.assign(writers, oids); } + /** + * Creates the value writers for an struct object inspector. + * Creates an appropriate output object inspector. + */ + public static void processVectorInspector( + StructObjectInspector structObjInspector, + SingleOIDClosure closure) + throws HiveException { + List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs(); + VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()]; + List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length); + ArrayList<String> columnNames = new ArrayList<String>(); + int i = 0; + for(StructField field : fields) { + ObjectInspector fieldObjInsp = field.getFieldObjectInspector(); + writers[i] = VectorExpressionWriterFactory. 
+ genVectorExpressionWritable(fieldObjInsp); + columnNames.add(field.getFieldName()); + oids.add(writers[i].getObjectInspector()); + i++; + } + ObjectInspector objectInspector = ObjectInspectorFactory. + getStandardStructObjectInspector(columnNames,oids); + closure.assign(writers, objectInspector); + } /** * Returns {@link VectorExpressionWriter} objects for the fields in the given Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java Tue Oct 14 19:06:45 2014 @@ -46,15 +46,7 @@ public class VectorUDAFCount extends Vec private static final long serialVersionUID = 1L; - transient private long value; - transient private boolean isNull; - - public void initIfNull() { - if (isNull) { - isNull = false; - value = 0; - } - } + transient private long count; @Override public int getVariableSize() { @@ -63,8 +55,7 @@ public class VectorUDAFCount extends Vec @Override public void reset() { - isNull = true; - value = 0L; + count = 0L; } } @@ -131,8 +122,7 @@ public class VectorUDAFCount extends Vec aggregationBufferSets, aggregateIndex, i); - myagg.initIfNull(); - myagg.value++; + myagg.count++; } } @@ -148,8 +138,7 @@ public class VectorUDAFCount extends Vec aggregationBufferSets, aggregateIndex, i); - myagg.initIfNull(); - myagg.value++; + myagg.count++; } } } @@ -168,8 +157,7 @@ public class VectorUDAFCount extends Vec aggregationBufferSets, aggregateIndex, j); - myagg.initIfNull(); - myagg.value++; + myagg.count++; } } } @@ -191,17 +179,15 @@ public class VectorUDAFCount extends Vec Aggregation myagg = (Aggregation)agg; - myagg.initIfNull(); - if (inputVector.isRepeating) { if (inputVector.noNulls || !inputVector.isNull[0]) { - myagg.value += batchSize; + myagg.count += batchSize; } return; } if (inputVector.noNulls) { - myagg.value += batchSize; + myagg.count += batchSize; return; } else if (!batch.selectedInUse) { @@ -221,7 +207,7 @@ public class VectorUDAFCount extends Vec for (int j=0; j< batchSize; ++j) { int i = selected[j]; if (!isNull[i]) { - myagg.value += 1; + myagg.count += 1; } } } @@ -233,7 +219,7 @@ public class VectorUDAFCount extends Vec for (int i=0; i< batchSize; ++i) { if (!isNull[i]) { - myagg.value += 1; + myagg.count += 1; } } } @@ -251,14 +237,9 @@ public class VectorUDAFCount extends Vec @Override public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); + Aggregation myagg = (Aggregation) agg; + result.set (myagg.count); return result; - } } @Override Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java Tue Oct 14 19:06:45 2014 @@ -44,8 +44,7 @@ public class VectorUDAFCountStar extends private static final long serialVersionUID = 1L; - transient private long value; - transient private boolean isNull; + transient private long count; @Override public int getVariableSize() { @@ -54,8 +53,7 @@ public class VectorUDAFCountStar extends @Override public void reset() { - isNull = true; - value = 0L; + count = 0L; } } @@ -95,8 +93,7 @@ public class VectorUDAFCountStar extends for (int i=0; i < batchSize; ++i) { Aggregation myAgg = getCurrentAggregationBuffer( aggregationBufferSets, aggregateIndex, i); - myAgg.isNull = false; - ++myAgg.value; + ++myAgg.count; } } @@ -111,8 +108,7 @@ public class VectorUDAFCountStar extends } Aggregation myagg = (Aggregation)agg; - myagg.isNull = false; - myagg.value += batchSize; + myagg.count += batchSize; } @Override @@ -128,14 +124,9 @@ public class VectorUDAFCountStar extends @Override public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); + Aggregation myagg = (Aggregation) agg; + result.set (myagg.count); return result; - } } @Override Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Tue Oct 14 19:06:45 2014 @@ -147,7 +147,7 @@ public class HookContext { } public String getOperationName() { - return SessionState.get().getHiveOperation().name(); + return queryPlan.getOperationName(); } public String getUserName() { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java Tue Oct 14 19:06:45 2014 @@ -155,6 +155,8 @@ public interface AcidInputFormat<KEY ext public static interface RawReader<V> extends RecordReader<RecordIdentifier, V> { public ObjectInspector getObjectInspector(); + + public boolean isDelete(V value); } /** Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Tue Oct 14 19:06:45 2014 @@ -40,6 +40,8 @@ import java.util.regex.Pattern; * are used by 
the compactor and cleaner and thus must be format agnostic. */ public class AcidUtils { + // This key will be put in the conf file when planning an acid operation + public static final String CONF_ACID_KEY = "hive.doing.acid"; public static final String BASE_PREFIX = "base_"; public static final String DELTA_PREFIX = "delta_"; public static final PathFilter deltaFileFilter = new PathFilter() { @@ -305,6 +307,28 @@ public class AcidUtils { } /** + * Is the given directory in ACID format? + * @param directory the partition directory to check + * @param conf the query configuration + * @return true, if it is an ACID directory + * @throws IOException + */ + public static boolean isAcid(Path directory, + Configuration conf) throws IOException { + FileSystem fs = directory.getFileSystem(conf); + for(FileStatus file: fs.listStatus(directory)) { + String filename = file.getPath().getName(); + if (filename.startsWith(BASE_PREFIX) || + filename.startsWith(DELTA_PREFIX)) { + if (file.isDir()) { + return true; + } + } + } + return false; + } + + /** * Get the ACID state of the given directory. It finds the minimal set of * base and diff directories. Note that because major compactions don't * preserve the history, we can't use a base directory that includes a @@ -324,7 +348,7 @@ public class AcidUtils { long bestBaseTxn = 0; final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>(); List<ParsedDelta> working = new ArrayList<ParsedDelta>(); - final List<FileStatus> original = new ArrayList<FileStatus>(); + List<FileStatus> originalDirectories = new ArrayList<FileStatus>(); final List<FileStatus> obsolete = new ArrayList<FileStatus>(); List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory, hiddenFileFilter); @@ -351,16 +375,26 @@ public class AcidUtils { working.add(delta); } } else { - findOriginals(fs, child, original); + // This is just the directory. We need to recurse and find the actual files. But don't + // do this until we have determined there is no base. This saves time. Plus, + // it is possible that the cleaner is running and removing these original files, + // in which case recursing through them could cause us to get an error. + originalDirectories.add(child); } } + final List<FileStatus> original = new ArrayList<FileStatus>(); // if we have a base, the original files are obsolete. if (bestBase != null) { - obsolete.addAll(original); // remove the entries so we don't get confused later and think we should // use them. original.clear(); + } else { + // Okay, we're going to need these originals. Recurse through them and figure out what we + // really need. 
+ for (FileStatus origDir : originalDirectories) { + findOriginals(fs, origDir, original); + } } Collections.sort(working); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Tue Oct 14 19:06:45 2014 @@ -122,24 +122,7 @@ public class BucketizedHiveInputFormat<K public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { init(job); - Path[] dirs = FileInputFormat.getInputPaths(job); - if (dirs.length == 0) { - // on tez we're avoiding to duplicate the file info in FileInputFormat. - if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { - try { - List<Path> paths = Utilities.getInputPathsTez(job, mrwork); - dirs = paths.toArray(new Path[paths.size()]); - if (dirs.length == 0) { - // if we still don't have any files it's time to fail. - throw new IOException("No input paths specified in job"); - } - } catch (Exception e) { - throw new IOException("Could not create input paths", e); - } - } else { - throw new IOException("No input paths specified in job"); - } - } + Path[] dirs = getInputPaths(job); JobConf newjob = new JobConf(job); ArrayList<InputSplit> result = new ArrayList<InputSplit>(); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Tue Oct 14 19:06:45 2014 @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -264,8 +265,8 @@ public class CombineHiveInputFormat<K ex /** * Create Hive splits based on CombineFileSplit. 
*/ - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + private InputSplit[] getCombineSplits(JobConf job, + int numSplits) throws IOException { PerfLogger perfLogger = PerfLogger.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); init(job); @@ -274,17 +275,6 @@ public class CombineHiveInputFormat<K ex mrwork.getAliasToWork(); CombineFileInputFormatShim combine = ShimLoader.getHadoopShims() .getCombineFileInputFormat(); - - // on tez we're avoiding duplicating path info since the info will go over - // rpc - if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { - try { - List<Path> dirs = Utilities.getInputPathsTez(job, mrwork); - Utilities.setInputPaths(job, dirs); - } catch (Exception e) { - throw new IOException("Could not create input paths", e); - } - } InputSplit[] splits = null; if (combine == null) { @@ -327,13 +317,6 @@ public class CombineHiveInputFormat<K ex // ignore } FileSystem inpFs = path.getFileSystem(job); - if (inputFormatClass.isAssignableFrom(OrcInputFormat.class)) { - if (inpFs.exists(new Path(path, OrcRecordUpdater.ACID_FORMAT))) { - throw new IOException("CombineHiveInputFormat is incompatible " + - " with ACID tables. Please set hive.input.format=" + - "org.apache.hadoop.hive.ql.io.HiveInputFormat"); - } - } // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not, // we use a configuration variable for the same @@ -461,6 +444,84 @@ public class CombineHiveInputFormat<K ex return result.toArray(new CombineHiveInputSplit[result.size()]); } + /** + * Create Hive splits based on CombineFileSplit. + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + init(job); + Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); + Map<String, Operator<? 
extends OperatorDesc>> aliasToWork = + mrwork.getAliasToWork(); + + ArrayList<InputSplit> result = new ArrayList<InputSplit>(); + + Path[] paths = getInputPaths(job); + + List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2); + List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2); + + for (Path path : paths) { + + PartitionDesc part = + HiveFileFormatUtils.getPartitionDescFromPathRecursively( + pathToPartitionInfo, path, + IOPrepareCache.get().allocatePartitionDescMap()); + + // Use HiveInputFormat if any of the paths is not splittable + Class inputFormatClass = part.getInputFileFormatClass(); + String inputFormatClassName = inputFormatClass.getName(); + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); + if (inputFormat instanceof AvoidSplitCombination && + ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The split [" + path + + "] is being parked for HiveInputFormat.getSplits"); + } + nonCombinablePaths.add(path); + } else { + combinablePaths.add(path); + } + } + + // Store the previous value for the path specification + String oldPaths = job.get(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname); + if (LOG.isDebugEnabled()) { + LOG.debug("The received input paths are: [" + oldPaths + + "] against the property " + + HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname); + } + + // Process the normal splits + if (nonCombinablePaths.size() > 0) { + FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray + (new Path[nonCombinablePaths.size()])); + InputSplit[] splits = super.getSplits(job, numSplits); + for (InputSplit split : splits) { + result.add(split); + } + } + + // Process the combine splits + if (combinablePaths.size() > 0) { + FileInputFormat.setInputPaths(job, combinablePaths.toArray + (new Path[combinablePaths.size()])); + InputSplit[] splits = getCombineSplits(job, numSplits); + for (InputSplit split : splits) { + result.add(split); + } + } + + // Restore the old path information back + // This is just to prevent incompatibilities with previous versions Hive + // if some application depends on the original value being set. + if (oldPaths != null) { + job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths); + } + LOG.info("Number of all splits " + result.size()); + return result.toArray(new InputSplit[result.size()]); + } + private void processPaths(JobConf job, CombineFileInputFormatShim combine, List<InputSplitShim> iss, Path... 
path) throws IOException { JobConf currJob = new JobConf(job); @@ -635,4 +696,12 @@ public class CombineHiveInputFormat<K ex return s.toString(); } } + + /** + * This is a marker interface that is used to identify the formats where + * combine split generation is not applicable + */ + public interface AvoidSplitCombination { + boolean shouldSkipCombine(Path path, Configuration conf) throws IOException; + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Tue Oct 14 19:06:45 2014 @@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe } public IOContext getIOContext() { - return IOContext.get(); + return IOContext.get(jobConf.get(Utilities.INPUT_NAME)); } - public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) { + private void initIOContext(long startPos, boolean isBlockPointer, + Path inputPath) { ioCxtRef = this.getIOContext(); ioCxtRef.currentBlockStart = startPos; ioCxtRef.isBlockPointer = isBlockPointer; @@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe boolean blockPointer = false; long blockStart = -1; - FileSplit fileSplit = (FileSplit) split; + FileSplit fileSplit = split; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(job); if (inputFormatClass.getName().contains("SequenceFile")) { @@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe blockStart = in.getPosition(); in.close(); } + this.jobConf = job; this.initIOContext(blockStart, blockPointer, path.makeQualified(fs)); this.initIOContextSortedProps(split, recordReader, job); } public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) { + this.jobConf = job; + this.getIOContext().resetSortingValues(); this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Oct 14 19:06:45 2014 @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -279,7 +280,14 @@ public class HiveInputFormat<K extends W } protected void init(JobConf job) { - mrwork = Utilities.getMapWork(job); + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + mrwork = (MapWork) Utilities.getMergeWork(job); + if (mrwork == null) { + mrwork = 
Utilities.getMapWork(job); + } + } else { + mrwork = Utilities.getMapWork(job); + } pathToPartitionInfo = mrwork.getPathToPartitionInfo(); } @@ -321,11 +329,7 @@ public class HiveInputFormat<K extends W } } - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - PerfLogger perfLogger = PerfLogger.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); - init(job); - + Path[] getInputPaths(JobConf job) throws IOException { Path[] dirs = FileInputFormat.getInputPaths(job); if (dirs.length == 0) { // on tez we're avoiding to duplicate the file info in FileInputFormat. @@ -340,6 +344,14 @@ public class HiveInputFormat<K extends W throw new IOException("No input paths specified in job"); } } + return dirs; + } + + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + PerfLogger perfLogger = PerfLogger.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); + init(job); + Path[] dirs = getInputPaths(job); JobConf newjob = new JobConf(job); List<InputSplit> result = new ArrayList<InputSplit>(); @@ -442,6 +454,9 @@ public class HiveInputFormat<K extends W public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { + // ensure filters are not set from previous pushFilters + jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR); + jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { return; Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Tue Oct 14 19:06:45 2014 @@ -18,7 +18,13 @@ package org.apache.hadoop.hive.ql.io; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin; import org.apache.hadoop.hive.ql.session.SessionState; @@ -32,20 +38,25 @@ import org.apache.hadoop.hive.ql.session */ public class IOContext { - private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){ @Override protected synchronized IOContext initialValue() { return new IOContext(); } }; - private static IOContext ioContext = new IOContext(); + private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>(); + private static IOContext ioContext = new IOContext(); - public static IOContext get() { - if (SessionState.get() == null) { - // this happens on the backend. only one io context needed. 
- return ioContext; + public static Map<String, IOContext> getMap() { + return inputNameIOContextMap; + } + + public static IOContext get(String inputName) { + if (inputNameIOContextMap.containsKey(inputName) == false) { + IOContext ioContext = new IOContext(); + inputNameIOContextMap.put(inputName, ioContext); } - return IOContext.threadLocal.get(); + + return inputNameIOContextMap.get(inputName); } public static void clear() { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java Tue Oct 14 19:06:45 2014 @@ -21,6 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.EnumSet; +import javax.annotation.Nullable; + interface CompressionCodec { public enum Modifier { @@ -62,6 +64,6 @@ interface CompressionCodec { * @param modifiers compression modifiers * @return codec for use after optional modification */ - CompressionCodec modify(EnumSet<Modifier> modifiers); + CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Oct 14 19:06:45 2014 @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; +import java.util.NavigableMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -49,6 +51,7 @@ import org.apache.hadoop.hive.ql.exec.ve import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface; import org.apache.hadoop.hive.ql.io.RecordIdentifier; @@ -103,7 +106,9 @@ import com.google.common.util.concurrent */ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, InputFormatChecker, VectorizedInputFormatInterface, - AcidInputFormat<NullWritable, OrcStruct>, LlapWrappableInputFormatInterface { + AcidInputFormat<NullWritable, OrcStruct>, + CombineHiveInputFormat.AvoidSplitCombination, + LlapWrappableInputFormatInterface { private static final Log LOG = LogFactory.getLog(OrcInputFormat.class); static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); @@ -128,6 +133,12 @@ public class OrcInputFormat implements */ private static final double MIN_INCLUDED_LOCATION = 0.80; + @Override + public boolean shouldSkipCombine(Path path, + Configuration conf) throws IOException { + return (conf.get(AcidUtils.CONF_ACID_KEY) != null) 
|| AcidUtils.isAcid(path, conf); + } + private static class OrcRecordReader implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>, StatsProvidingRecordReader { @@ -611,7 +622,7 @@ public class OrcInputFormat implements private final FileSystem fs; private final FileStatus file; private final long blockSize; - private final BlockLocation[] locations; + private final TreeMap<Long, BlockLocation> locations; private final FileInfo fileInfo; private List<StripeInformation> stripes; private ReaderImpl.FileMetaInfo fileMetaInfo; @@ -631,7 +642,7 @@ public class OrcInputFormat implements this.file = file; this.blockSize = file.getBlockSize(); this.fileInfo = fileInfo; - locations = SHIMS.getLocations(fs, file); + locations = SHIMS.getLocationsWithOffset(fs, file); this.isOriginal = isOriginal; this.deltas = deltas; this.hasBase = hasBase; @@ -642,8 +653,8 @@ public class OrcInputFormat implements } void schedule() throws IOException { - if(locations.length == 1 && file.getLen() < context.maxSize) { - String[] hosts = locations[0].getHosts(); + if(locations.size() == 1 && file.getLen() < context.maxSize) { + String[] hosts = locations.firstEntry().getValue().getHosts(); synchronized (context.splits) { context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(), hosts, fileMetaInfo, isOriginal, hasBase, deltas)); @@ -691,15 +702,22 @@ public class OrcInputFormat implements void createSplit(long offset, long length, ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException { String[] hosts; - if ((offset % blockSize) + length <= blockSize) { + Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset); + BlockLocation start = startEntry.getValue(); + if (offset + length <= start.getOffset() + start.getLength()) { // handle the single block case - hosts = locations[(int) (offset / blockSize)].getHosts(); + hosts = start.getHosts(); } else { + Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length); + BlockLocation end = endEntry.getValue(); + //get the submap + NavigableMap<Long, BlockLocation> navigableMap = locations.subMap(startEntry.getKey(), + true, endEntry.getKey(), true); // Calculate the number of bytes in the split that are local to each // host. 
Map<String, LongWritable> sizes = new HashMap<String, LongWritable>(); long maxSize = 0; - for(BlockLocation block: locations) { + for (BlockLocation block : navigableMap.values()) { long overlap = getOverlap(offset, length, block.getOffset(), block.getLength()); if (overlap > 0) { @@ -712,6 +730,9 @@ public class OrcInputFormat implements val.set(val.get() + overlap); maxSize = Math.max(maxSize, val.get()); } + } else { + throw new IOException("File " + file.getPath().toString() + + " should have had overlap on block starting at " + block.getOffset()); } } // filter the list of locations to those that have at least 80% of the @@ -719,7 +740,7 @@ public class OrcInputFormat implements long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION); List<String> hostList = new ArrayList<String>(); // build the locations in a predictable order to simplify testing - for(BlockLocation block: locations) { + for(BlockLocation block: navigableMap.values()) { for(String host: block.getHosts()) { if (sizes.containsKey(host)) { if (sizes.get(host).get() >= threshold) { @@ -1115,7 +1136,7 @@ public class OrcInputFormat implements @Override public ObjectInspector getObjectInspector() { - return ((StructObjectInspector) reader.getObjectInspector()) + return ((StructObjectInspector) records.getObjectInspector()) .getAllStructFieldRefs().get(OrcRecordUpdater.ROW) .getFieldObjectInspector(); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Tue Oct 14 19:06:45 2014 @@ -118,13 +118,11 @@ public class OrcNewInputFormat extends I public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS); - Configuration conf = - ShimLoader.getHadoopShims().getConfiguration(jobContext); List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims() .getConfiguration(jobContext)); - List<InputSplit> result = new ArrayList<InputSplit>(); - for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) { + List<InputSplit> result = new ArrayList<InputSplit>(splits.size()); + for(OrcSplit split: splits) { result.add(new OrcNewSplit(split)); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Tue Oct 14 19:06:45 2014 @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.Val import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import 
org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; @@ -37,9 +38,10 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -448,6 +450,10 @@ public class OrcRawRecordMerger implemen // we always want to read all of the deltas eventOptions.range(0, Long.MAX_VALUE); + // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as + // it can produce wrong results (if the latest valid version of the record is filtered out by + // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record) + eventOptions.searchArgument(null, null); if (deltaDirectory != null) { for(Path delta: deltaDirectory) { ReaderKey key = new ReaderKey(); @@ -627,8 +633,16 @@ public class OrcRawRecordMerger implemen // Parse the configuration parameters ArrayList<String> columnNames = new ArrayList<String>(); + Deque<Integer> virtualColumns = new ArrayDeque<Integer>(); if (columnNameProperty != null && columnNameProperty.length() > 0) { - Collections.addAll(columnNames, columnNameProperty.split(",")); + String[] colNames = columnNameProperty.split(","); + for (int i = 0; i < colNames.length; i++) { + if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) { + virtualColumns.addLast(i); + } else { + columnNames.add(colNames[i]); + } + } } if (columnTypeProperty == null) { // Default type: all string @@ -644,6 +658,9 @@ public class OrcRawRecordMerger implemen ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + while (virtualColumns.size() > 0) { + fieldTypes.remove(virtualColumns.removeLast()); + } StructTypeInfo rowType = new StructTypeInfo(); rowType.setAllStructFieldNames(columnNames); rowType.setAllStructFieldTypeInfos(fieldTypes); @@ -651,6 +668,11 @@ public class OrcRawRecordMerger implemen (OrcStruct.createObjectInspector(rowType)); } + @Override + public boolean isDelete(OrcStruct value) { + return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION; + } + /** * Get the number of columns in the underlying rows. * @return 0 if there are no base and no deltas. 
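[Editor's note] The OrcRawRecordMerger hunk above filters virtual columns (such as ROW__ID) out of the column name list read from the table properties and then drops the type entries recorded at the same positions, so names and types stay aligned when the ORC row type is built. Below is a minimal, self-contained sketch of that alignment idea only; the class, method, and the VIRTUAL set are illustrative stand-ins, not the committed Hive code.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StripVirtualColumnsSketch {
  // Stand-in for VirtualColumn.VIRTUAL_COLUMN_NAMES.
  private static final Set<String> VIRTUAL = new HashSet<String>(Arrays.asList("ROW__ID"));

  // Remove virtual columns from names, and the entries at the same positions from types.
  static void stripVirtualColumns(List<String> names, List<String> types) {
    Deque<Integer> virtualIdx = new ArrayDeque<Integer>();
    List<String> kept = new ArrayList<String>();
    for (int i = 0; i < names.size(); i++) {
      if (VIRTUAL.contains(names.get(i))) {
        virtualIdx.addLast(i);          // remember where the virtual column sat
      } else {
        kept.add(names.get(i));         // keep only real columns
      }
    }
    names.clear();
    names.addAll(kept);
    // Remove from the highest index down so earlier indexes remain valid.
    while (!virtualIdx.isEmpty()) {
      types.remove(virtualIdx.removeLast().intValue());
    }
  }

  public static void main(String[] args) {
    List<String> names = new ArrayList<String>(Arrays.asList("a", "ROW__ID", "b"));
    List<String> types = new ArrayList<String>(Arrays.asList("int", "struct<...>", "string"));
    stripVirtualColumns(names, types);
    System.out.println(names + " / " + types);   // prints [a, b] / [int, string]
  }
}

The committed change does the same bookkeeping with a Deque of indexes collected while the column name property is parsed, before the StructTypeInfo is assembled.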
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Oct 14 19:06:45 2014 @@ -81,6 +81,7 @@ import com.google.common.collect.Compari class RecordReaderImpl implements RecordReader { private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class); + private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); private final FSDataInputStream file; private final long firstRow; @@ -3326,9 +3327,9 @@ class RecordReaderImpl implements Record // find the next row rowInStripe += 1; advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); - if (LOG.isDebugEnabled()) { - LOG.debug("row from " + reader.path); - LOG.debug("orc row = " + result); + if (isLogTraceEnabled) { + LOG.trace("row from " + reader.path); + LOG.trace("orc row = " + result); } return result; }
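[Editor's note] The RecordReaderImpl hunk above moves the per-row logging from debug to trace and caches the level check in a field, so the hot next() path pays only a boolean test per row. A tiny sketch of that guard pattern follows; the class and method names are illustrative (only Log and LogFactory are the real commons-logging types).

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class TraceGuardSketch {
  private static final Log LOG = LogFactory.getLog(TraceGuardSketch.class);
  // Evaluated once; when tracing is off, each row costs only this boolean test.
  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

  Object next(Object row) {
    if (isLogTraceEnabled) {
      LOG.trace("orc row = " + row);   // string concatenation happens only when tracing is on
    }
    return row;
  }
}

Caching the flag in a static final field trades runtime reconfigurability of the log level for lower overhead on a per-row code path, which matches the intent of the hunk.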

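[Editor's note] As a closing note on the OrcInputFormat SplitGenerator hunks earlier in this change: replacing the BlockLocation[] indexed by offset / blockSize with a TreeMap keyed by block start offset lets the split code find the block containing any position with floorEntry and collect the blocks a split spans with subMap, even when block boundaries are not multiples of a fixed block size. The sketch below shows only that lookup, under assumed names (Block, blocksForSplit); it is not the committed code, which works with Hadoop BlockLocation objects.

import java.util.NavigableMap;
import java.util.TreeMap;

class BlockLookupSketch {
  static final class Block {
    final long offset;
    final long length;
    Block(long offset, long length) { this.offset = offset; this.length = length; }
  }

  // Returns the blocks overlapped by the split [offset, offset + length).
  // Assumes the split lies entirely within the file, so floorKey never returns null.
  static NavigableMap<Long, Block> blocksForSplit(TreeMap<Long, Block> blocks,
                                                  long offset, long length) {
    Long first = blocks.floorKey(offset);               // block containing the split start
    Long last = blocks.floorKey(offset + length - 1);   // block containing the last byte
    return blocks.subMap(first, true, last, true);
  }

  public static void main(String[] args) {
    TreeMap<Long, Block> blocks = new TreeMap<Long, Block>();
    blocks.put(0L, new Block(0, 128));
    blocks.put(128L, new Block(128, 128));
    blocks.put(256L, new Block(256, 64));
    // A split spanning the first two blocks.
    System.out.println(blocksForSplit(blocks, 100, 60).keySet());   // prints [0, 128]
  }
}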