Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sun Oct 5 22:26:43 2014 @@ -18,99 +18,30 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.io.IOException; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.PTFTopNHash; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.TopNHash; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -// import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; public class VectorReduceSinkOperator extends ReduceSinkOperator { - private static final Log LOG = LogFactory.getLog( - VectorReduceSinkOperator.class.getName()); - private static final long serialVersionUID = 1L; - /** - * The evaluators for the key columns. Key columns decide the sort order on - * the reducer side. Key columns are passed to the reducer in the "key". - */ - private VectorExpression[] keyEval; - - /** - * The key value writers. These know how to write the necessary writable type - * based on key column metadata, from the primitive vector type. - */ - private transient VectorExpressionWriter[] keyWriters; - - /** - * The evaluators for the value columns. Value columns are passed to reducer - * in the "value". - */ - private VectorExpression[] valueEval; - - /** - * The output value writers. These know how to write the necessary writable type - * based on value column metadata, from the primitive vector type. - */ - private transient VectorExpressionWriter[] valueWriters; - - /** - * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in - * Hive language). Partition columns decide the reducer that the current row - * goes to. Partition columns are not passed to reducer. - */ - private VectorExpression[] partitionEval; - - /** - * Evaluators for bucketing columns. This is used to compute bucket number. 
- */ - private VectorExpression[] bucketEval; - private int buckColIdxInKey; - - /** - * The partition value writers. These know how to write the necessary writable type - * based on partition column metadata, from the primitive vector type. - */ - private transient VectorExpressionWriter[] partitionWriters; - private transient VectorExpressionWriter[] bucketWriters = null; - - private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + // Writer for producing row from input batch. + private VectorExpressionWriter[] rowWriters; + + protected transient Object[] singleRow; public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException { this(); ReduceSinkDesc desc = (ReduceSinkDesc) conf; this.conf = desc; - keyEval = vContext.getVectorExpressions(desc.getKeyCols()); - valueEval = vContext.getVectorExpressions(desc.getValueCols()); - partitionEval = vContext.getVectorExpressions(desc.getPartitionCols()); - bucketEval = null; - if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) { - bucketEval = vContext.getVectorExpressions(desc.getBucketCols()); - buckColIdxInKey = desc.getPartitionCols().size(); - } } public VectorReduceSinkOperator() { @@ -119,399 +50,49 @@ public class VectorReduceSinkOperator ex @Override protected void initializeOp(Configuration hconf) throws HiveException { - try { - numDistributionKeys = conf.getNumDistributionKeys(); - distinctColIndices = conf.getDistinctColumnIndices(); - numDistinctExprs = distinctColIndices.size(); - - TableDesc keyTableDesc = conf.getKeySerializeInfo(); - keySerializer = (Serializer) keyTableDesc.getDeserializerClass() - .newInstance(); - keySerializer.initialize(null, keyTableDesc.getProperties()); - keyIsText = keySerializer.getSerializedClass().equals(Text.class); - - /* - * Compute and assign the key writers and the key object inspector - */ - VectorExpressionWriterFactory.processVectorExpressions( - conf.getKeyCols(), - conf.getOutputKeyColumnNames(), - new VectorExpressionWriterFactory.SingleOIDClosure() { - @Override - public void assign(VectorExpressionWriter[] writers, - ObjectInspector objectInspector) { - keyWriters = writers; - keyObjectInspector = objectInspector; - } - }); - - String colNames = ""; - for(String colName : conf.getOutputKeyColumnNames()) { - colNames = String.format("%s %s", colNames, colName); - } - - if (isDebugEnabled) { - LOG.debug(String.format("keyObjectInspector [%s]%s => %s", - keyObjectInspector.getClass(), - keyObjectInspector, - colNames)); - } - - partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols()); - if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) { - bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols()); - } - - TableDesc valueTableDesc = conf.getValueSerializeInfo(); - valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() - .newInstance(); - valueSerializer.initialize(null, valueTableDesc.getProperties()); - - /* - * Compute and assign the value writers and the value object inspector - */ - VectorExpressionWriterFactory.processVectorExpressions( - conf.getValueCols(), - conf.getOutputValueColumnNames(), - new VectorExpressionWriterFactory.SingleOIDClosure() { - @Override - public void assign(VectorExpressionWriter[] writers, - ObjectInspector objectInspector) { - valueWriters = writers; - valueObjectInspector = objectInspector; + // We need a input object inspector that is for the row we will extract out of the + // 
vectorized row batch, not for example, an original inspector for an ORC table, etc. + VectorExpressionWriterFactory.processVectorInspector( + (StructObjectInspector) inputObjInspectors[0], + new VectorExpressionWriterFactory.SingleOIDClosure() { + @Override + public void assign(VectorExpressionWriter[] writers, + ObjectInspector objectInspector) { + rowWriters = writers; + inputObjInspectors[0] = objectInspector; } - }); - - if (isDebugEnabled) { - colNames = ""; - for(String colName : conf.getOutputValueColumnNames()) { - colNames = String.format("%s %s", colNames, colName); - } - } - - if (isDebugEnabled) { - LOG.debug(String.format("valueObjectInspector [%s]%s => %s", - valueObjectInspector.getClass(), - valueObjectInspector, - colNames)); - } + }); + singleRow = new Object[rowWriters.length]; - int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; - int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : - numDistributionKeys; - cachedKeys = new Object[numKeys][keyLen]; - cachedValues = new Object[valueEval.length]; - - int tag = conf.getTag(); - tagByte[0] = (byte) tag; - LOG.info("Using tag = " + tag); - - int limit = conf.getTopN(); - float memUsage = conf.getTopNMemoryUsage(); - if (limit >= 0 && memUsage > 0) { - reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash; - reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); - } - - autoParallel = conf.isAutoParallel(); - - } catch(Exception e) { - throw new HiveException(e); - } + // Call ReduceSinkOperator with new input inspector. + super.initializeOp(hconf); } @Override - public void processOp(Object row, int tag) throws HiveException { - VectorizedRowBatch vrg = (VectorizedRowBatch) row; - - if (isDebugEnabled) { - LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts", - vrg.size, - valueEval.length, - keyEval.length, - partitionEval.length)); - } - - try { - // Evaluate the keys - for (int i = 0; i < keyEval.length; i++) { - keyEval[i].evaluate(vrg); - } - - // Determine which rows we need to emit based on topN optimization - int startResult = reducerHash.startVectorizedBatch(vrg.size); - if (startResult == TopNHash.EXCLUDE) { - return; // TopN wants us to exclude all rows. - } - // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected. - for (int i = 0; i < partitionEval.length; i++) { - partitionEval[i].evaluate(vrg); - } - if (bucketEval != null) { - for (int i = 0; i < bucketEval.length; i++) { - bucketEval[i].evaluate(vrg); - } - } - // run the vector evaluations - for (int i = 0; i < valueEval.length; i++) { - valueEval[i].evaluate(vrg); - } - - boolean useTopN = startResult != TopNHash.FORWARD; - // Go thru the batch once. If we are not using TopN, we will forward all things and be done. - // If we are using topN, we will make the first key for each row and store/forward it. - // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case. - for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { - int rowIndex = batchIndex; - if (vrg.selectedInUse) { - rowIndex = vrg.selected[batchIndex]; - } - // First, make distrib key components for this row and determine distKeyLength. 
- populatedCachedDistributionKeys(vrg, rowIndex, 0); - - // replace bucketing columns with hashcode % numBuckets - int buckNum = -1; - if (bucketEval != null) { - buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets()); - cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum); - } - HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); - int distKeyLength = firstKey.getDistKeyLength(); - // Add first distinct expression, if any. - if (numDistinctExprs > 0) { - populateCachedDistinctKeys(vrg, rowIndex, 0); - firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength); - } - - final int hashCode; - - // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] - if (autoParallel && partitionEval.length > 0) { - hashCode = computeMurmurHash(firstKey); - } else { - hashCode = computeHashCode(vrg, rowIndex, buckNum); - } - - firstKey.setHashCode(hashCode); - - if (useTopN) { - /* - * in case of TopN for windowing, we need to distinguish between - * rows with null partition keys and rows with value 0 for partition keys. - */ - boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex); - reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex); - } else { - // No TopN, just forward the first key and all others. - BytesWritable value = makeValueWritable(vrg, rowIndex); - collect(firstKey, value); - forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0); - } - } - - if (!useTopN) return; // All done. - - // If we use topN, we have called tryStore on every key now. We can process the results. - for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { - int result = reducerHash.getVectorizedBatchResult(batchIndex); - if (result == TopNHash.EXCLUDE) continue; - int rowIndex = batchIndex; - if (vrg.selectedInUse) { - rowIndex = vrg.selected[batchIndex]; - } - // Compute value and hashcode - we'd either store or forward them. - BytesWritable value = makeValueWritable(vrg, rowIndex); - int distKeyLength = -1; - int hashCode; - if (result == TopNHash.FORWARD) { - HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex); - distKeyLength = firstKey.getDistKeyLength(); - hashCode = firstKey.hashCode(); - collect(firstKey, value); - } else { - hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex); - reducerHash.storeValue(result, hashCode, value, true); - distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex); - } - // Now forward other the rows if there's multi-distinct (but see TODO in forward...). - // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1. - if (numDistinctExprs > 1) { - populatedCachedDistributionKeys(vrg, rowIndex, 1); - forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1); - } - } - } catch (SerDeException e) { - throw new HiveException(e); - } catch (IOException e) { - throw new HiveException(e); - } - } - - /** - * This function creates and forwards all the additional KVs for the multi-distinct case, - * after the first (0th) KV pertaining to the row has already been stored or forwarded. 
- * @param vrg the batch - * @param rowIndex the row index in the batch - * @param hashCode the partitioning hash code to use; same as for the first KV - * @param value the value to use; same as for the first KV - * @param distKeyLength the distribution key length of the first key; TODO probably extraneous - * @param tag the tag - * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored - */ - private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode, - BytesWritable value, int distKeyLength, int tag, int baseIndex) - throws HiveException, SerDeException, IOException { - // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if - // the first key has already been stored. There's few bytes difference between keys - // for different distincts, and the value/etc. are all the same. - // We could store deltas to re-gen extra rows when flushing TopN. - for (int i = 1; i < numDistinctExprs; i++) { - if (i != baseIndex) { - System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys); - } - populateCachedDistinctKeys(vrg, rowIndex, i); - HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength); - hiveKey.setHashCode(hashCode); - collect(hiveKey, value); - } - } - - /** - * Populate distribution keys part of cachedKeys for a particular row from the batch. - * @param vrg the batch - * @param rowIndex the row index in the batch - * @param index the cachedKeys index to write to - */ - private void populatedCachedDistributionKeys( - VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException { - for (int i = 0; i < numDistributionKeys; i++) { - int batchColumn = keyEval[i].getOutputColumn(); - ColumnVector vectorColumn = vrg.cols[batchColumn]; - cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex); - } - if (cachedKeys[index].length > numDistributionKeys) { - cachedKeys[index][numDistributionKeys] = null; - } - } - - /** - * Populate distinct keys part of cachedKeys for a particular row from the batch. 
- * @param vrg the batch - * @param rowIndex the row index in the batch - * @param index the cachedKeys index to write to - */ - private void populateCachedDistinctKeys( - VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException { - StandardUnion union; - cachedKeys[index][numDistributionKeys] = union = new StandardUnion( - (byte)index, new Object[distinctColIndices.get(index).size()]); - Object[] distinctParameters = (Object[]) union.getObject(); - for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) { - int distinctColIndex = distinctColIndices.get(index).get(distinctParamI); - int batchColumn = keyEval[distinctColIndex].getOutputColumn(); - distinctParameters[distinctParamI] = - keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex); - } - union.setTag((byte) index); - } + public void processOp(Object data, int tag) throws HiveException { + VectorizedRowBatch vrg = (VectorizedRowBatch) data; - private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex) - throws HiveException, SerDeException { - for (int i = 0; i < valueEval.length; i++) { - int batchColumn = valueEval[i].getOutputColumn(); - ColumnVector vectorColumn = vrg.cols[batchColumn]; - cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex); + for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) { + Object row = getRowObject(vrg, batchIndex); + super.processOp(row, tag); } - // Serialize the value - return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector); } - private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException { - // Evaluate the HashCode - int keyHashCode = 0; - if (partitionEval.length == 0) { - // If no partition cols, just distribute the data uniformly to provide better - // load balance. If the requirement is to have a single reducer, we should set - // the number of reducers to 1. - // Use a constant seed to make the code deterministic. - if (random == null) { - random = new Random(12345); - } - keyHashCode = random.nextInt(); - } else { - for (int p = 0; p < partitionEval.length; p++) { - ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()]; - Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex); - keyHashCode = keyHashCode - * 31 - + ObjectInspectorUtils.hashCode( - partitionValue, - partitionWriters[p].getObjectInspector()); - } - } - return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum; - } - - private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex) + private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex) throws HiveException { - if (partitionEval.length != 0) { - for (int p = 0; p < partitionEval.length; p++) { - ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()]; - Object partitionValue = partitionWriters[p].writeValue(columnVector, - rowIndex); - if (partitionValue != null) { - return false; - } + int batchIndex = rowIndex; + if (vrg.selectedInUse) { + batchIndex = vrg.selected[rowIndex]; + } + for (int i = 0; i < vrg.projectionSize; i++) { + ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]]; + if (vectorColumn != null) { + singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex); + } else { + // Some columns from tables are not used. 
+ singleRow[i] = null; } - return true; - } - return false; - } - - private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException { - int bucketNum = 0; - for (int p = 0; p < bucketEval.length; p++) { - ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()]; - Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex); - bucketNum = bucketNum - * 31 - + ObjectInspectorUtils.hashCode( - bucketValue, - bucketWriters[p].getObjectInspector()); } - - if (bucketNum < 0) { - bucketNum = -1 * bucketNum; - } - - return bucketNum % numBuckets; - } - - static public String getOperatorName() { - return "RS"; - } - - public VectorExpression[] getPartitionEval() { - return partitionEval; - } - - public void setPartitionEval(VectorExpression[] partitionEval) { - this.partitionEval = partitionEval; - } - - public VectorExpression[] getValueEval() { - return valueEval; - } - - public void setValueEval(VectorExpression[] valueEval) { - this.valueEval = valueEval; - } - - public VectorExpression[] getKeyEval() { - return keyEval; - } - - public void setKeyEval(VectorExpression[] keyEval) { - this.keyEval = keyEval; + return singleRow; } }
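
The net effect of the VectorReduceSinkOperator change above is that the operator no longer re-implements key/value serialization, TopN handling, and hash computation for vectorized batches; it builds row writers from the input batch's struct inspector, materializes each batch row as an Object[], and delegates to the row-mode ReduceSinkOperator. The toy program below sketches that per-row extraction loop under simplified assumptions: ToyBatch, RowSink, and forwardBatch are illustrative stand-ins invented for this sketch, not Hive classes, while the real code works on VectorizedRowBatch, calls VectorExpressionWriter.writeValue(), and forwards through super.processOp().

// Toy model of the new flow: materialize each (possibly selected) batch row
// into an Object[] and hand it to a row-mode sink. Names here are illustrative
// only; the real operator works on VectorizedRowBatch and ReduceSinkOperator.
import java.util.Arrays;

public class VectorToRowSketch {

  /** Minimal stand-in for a vectorized batch: columns of boxed values. */
  static final class ToyBatch {
    Object[][] cols;        // cols[c][r] = value of column c at physical row r
    int size;               // number of logical rows in the batch
    boolean selectedInUse;  // if true, 'selected' maps logical -> physical rows
    int[] selected;
  }

  /** Stand-in for the row-mode operator that receives one row at a time. */
  interface RowSink {
    void process(Object[] row) throws Exception;
  }

  /** Mirrors processOp()/getRowObject(): loop over rows, extract, delegate. */
  static void forwardBatch(ToyBatch batch, RowSink sink) throws Exception {
    Object[] singleRow = new Object[batch.cols.length]; // reused, like the patch
    for (int logical = 0; logical < batch.size; logical++) {
      int physical = batch.selectedInUse ? batch.selected[logical] : logical;
      for (int c = 0; c < batch.cols.length; c++) {
        // In Hive this is rowWriters[c].writeValue(vectorColumn, physical)
        singleRow[c] = (batch.cols[c] == null) ? null : batch.cols[c][physical];
      }
      sink.process(singleRow);
    }
  }

  public static void main(String[] args) throws Exception {
    ToyBatch b = new ToyBatch();
    b.cols = new Object[][] { {"a", "b", "c"}, {1, 2, 3} };
    b.size = 2;
    b.selectedInUse = true;
    b.selected = new int[] {0, 2};   // only physical rows 0 and 2 survive a filter
    forwardBatch(b, row -> System.out.println(Arrays.toString(row)));
    // prints [a, 1] and [c, 3]
  }
}

The design trade is spelled out by the deleted code: per-row delegation gives up the batch-at-a-time key evaluation and TopN short-circuiting of the old implementation in exchange for a single code path shared with the non-vectorized ReduceSinkOperator.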
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Sun Oct 5 22:26:43 2014 @@ -1889,47 +1889,47 @@ public class VectorizationContext { // TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively // marked map-side (HASH). static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{ - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, 
GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMinLong.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); + 
add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMaxLong.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); + add(new AggregateDefinition("var_samp" , 
VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); }}; public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce) Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sun Oct 5 22:26:43 2014 @@ -140,6 +140,20 @@ public class VectorizedRowBatchCtx { /** + * Initializes the VectorizedRowBatch context based on an scratch column type map and + * object inspector. + * @param columnTypeMap + * @param rowOI + * Object inspector that shapes the column types + */ + public void init(Map<Integer, String> columnTypeMap, + StructObjectInspector rowOI) { + this.columnTypeMap = columnTypeMap; + this.rowOI= rowOI; + this.rawRowOI = rowOI; + } + + /** * Initializes VectorizedRowBatch context based on the * split and Hive configuration (Job conf with hive Plan). 
* Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Sun Oct 5 22:26:43 2014 @@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite closure.assign(writers, oids); } + /** + * Creates the value writers for an struct object inspector. + * Creates an appropriate output object inspector. + */ + public static void processVectorInspector( + StructObjectInspector structObjInspector, + SingleOIDClosure closure) + throws HiveException { + List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs(); + VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()]; + List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length); + ArrayList<String> columnNames = new ArrayList<String>(); + int i = 0; + for(StructField field : fields) { + ObjectInspector fieldObjInsp = field.getFieldObjectInspector(); + writers[i] = VectorExpressionWriterFactory. + genVectorExpressionWritable(fieldObjInsp); + columnNames.add(field.getFieldName()); + oids.add(writers[i].getObjectInspector()); + i++; + } + ObjectInspector objectInspector = ObjectInspectorFactory. + getStandardStructObjectInspector(columnNames,oids); + closure.assign(writers, objectInspector); + } /** * Returns {@link VectorExpressionWriter} objects for the fields in the given Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Sun Oct 5 22:26:43 2014 @@ -147,7 +147,7 @@ public class HookContext { } public String getOperationName() { - return SessionState.get().getHiveOperation().name(); + return queryPlan.getOperationName(); } public String getUserName() { Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sun Oct 5 22:26:43 2014 @@ -40,6 +40,8 @@ import java.util.regex.Pattern; * are used by the compactor and cleaner and thus must be format agnostic. 
*/ public class AcidUtils { + // This key will be put in the conf file when planning an acid operation + public static final String CONF_ACID_KEY = "hive.doing.acid"; public static final String BASE_PREFIX = "base_"; public static final String DELTA_PREFIX = "delta_"; public static final PathFilter deltaFileFilter = new PathFilter() { Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Sun Oct 5 22:26:43 2014 @@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe } public IOContext getIOContext() { - return IOContext.get(); + return IOContext.get(jobConf.get(Utilities.INPUT_NAME)); } - public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) { + private void initIOContext(long startPos, boolean isBlockPointer, + Path inputPath) { ioCxtRef = this.getIOContext(); ioCxtRef.currentBlockStart = startPos; ioCxtRef.isBlockPointer = isBlockPointer; @@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe boolean blockPointer = false; long blockStart = -1; - FileSplit fileSplit = (FileSplit) split; + FileSplit fileSplit = split; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(job); if (inputFormatClass.getName().contains("SequenceFile")) { @@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe blockStart = in.getPosition(); in.close(); } + this.jobConf = job; this.initIOContext(blockStart, blockPointer, path.makeQualified(fs)); this.initIOContextSortedProps(split, recordReader, job); } public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) { + this.jobConf = job; + this.getIOContext().resetSortingValues(); this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false); Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sun Oct 5 22:26:43 2014 @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -253,7 +254,14 @@ public class HiveInputFormat<K extends W } protected void init(JobConf job) { - mrwork = Utilities.getMapWork(job); + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + mrwork = (MapWork) Utilities.getMergeWork(job); + if (mrwork == null) { + mrwork = Utilities.getMapWork(job); + } + } 
else { + mrwork = Utilities.getMapWork(job); + } pathToPartitionInfo = mrwork.getPathToPartitionInfo(); } @@ -420,6 +428,9 @@ public class HiveInputFormat<K extends W public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { + // ensure filters are not set from previous pushFilters + jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR); + jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { return; Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Sun Oct 5 22:26:43 2014 @@ -18,7 +18,14 @@ package org.apache.hadoop.hive.ql.io; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin; +import org.apache.hadoop.hive.ql.session.SessionState; /** @@ -31,14 +38,25 @@ import org.apache.hadoop.fs.Path; */ public class IOContext { - private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){ @Override protected synchronized IOContext initialValue() { return new IOContext(); } }; - public static IOContext get() { - return IOContext.threadLocal.get(); + private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>(); + private static IOContext ioContext = new IOContext(); + + public static Map<String, IOContext> getMap() { + return inputNameIOContextMap; + } + + public static IOContext get(String inputName) { + if (inputNameIOContextMap.containsKey(inputName) == false) { + IOContext ioContext = new IOContext(); + inputNameIOContextMap.put(inputName, ioContext); + } + + return inputNameIOContextMap.get(inputName); } public static void clear() { Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Sun Oct 5 22:26:43 2014 @@ -132,7 +132,7 @@ public class OrcInputFormat implements @Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException { - return AcidUtils.isAcid(path, conf); + return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf); } private static class OrcRecordReader Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- 
hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Sun Oct 5 22:26:43 2014 @@ -118,13 +118,11 @@ public class OrcNewInputFormat extends I public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS); - Configuration conf = - ShimLoader.getHadoopShims().getConfiguration(jobContext); List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims() .getConfiguration(jobContext)); - List<InputSplit> result = new ArrayList<InputSplit>(); - for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) { + List<InputSplit> result = new ArrayList<InputSplit>(splits.size()); + for(OrcSplit split: splits) { result.add(new OrcNewSplit(split)); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS); Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Sun Oct 5 22:26:43 2014 @@ -418,138 +418,120 @@ class RunLengthIntegerWriterV2 implement private void determineEncoding() { - int idx = 0; + // we need to compute zigzag values for DIRECT encoding if we decide to + // break early for delta overflows or for shorter runs + computeZigZagLiterals(); - // for identifying monotonic sequences - boolean isIncreasing = false; - int increasingCount = 1; - boolean isDecreasing = false; - int decreasingCount = 1; + zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); - // for identifying type of delta encoding - min = literals[0]; - long max = literals[0]; - isFixedDelta = true; - long currDelta = 0; + // not a big win for shorter runs to determine encoding + if (numLiterals <= MIN_REPEAT) { + encoding = EncodingType.DIRECT; + return; + } - min = literals[0]; - long deltaMax = 0; + // DELTA encoding check - // populate all variables to identify the encoding type - if (numLiterals >= 1) { - currDelta = literals[1] - literals[0]; - for(int i = 0; i < numLiterals; i++) { - if (i > 0 && literals[i] >= max) { - max = literals[i]; - increasingCount++; - } + // for identifying monotonic sequences + boolean isIncreasing = true; + boolean isDecreasing = true; + this.isFixedDelta = true; - if (i > 0 && literals[i] <= min) { - min = literals[i]; - decreasingCount++; - } + this.min = literals[0]; + long max = literals[0]; + final long initialDelta = literals[1] - literals[0]; + long currDelta = initialDelta; + long deltaMax = initialDelta; + this.adjDeltas[0] = initialDelta; + + for (int i = 1; i < numLiterals; i++) { + final long l1 = literals[i]; + final long l0 = literals[i - 1]; + currDelta = l1 - l0; + min = Math.min(min, l1); + max = Math.max(max, l1); + + isIncreasing &= (l0 <= l1); + isDecreasing &= (l0 >= l1); + + isFixedDelta &= (currDelta == initialDelta); + if (i > 1) { + adjDeltas[i - 1] = Math.abs(currDelta); + deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); + } + } - // if delta doesn't changes then 
mark it as fixed delta - if (i > 0 && isFixedDelta) { - if (literals[i] - literals[i - 1] != currDelta) { - isFixedDelta = false; - } + // its faster to exit under delta overflow condition without checking for + // PATCHED_BASE condition as encoding using DIRECT is faster and has less + // overhead than PATCHED_BASE + if (!utils.isSafeSubtract(max, min)) { + encoding = EncodingType.DIRECT; + return; + } - fixedDelta = currDelta; - } + // invariant - subtracting any number from any other in the literals after + // this point won't overflow - // populate zigzag encoded literals - long zzEncVal = 0; - if (signed) { - zzEncVal = utils.zigzagEncode(literals[i]); - } else { - zzEncVal = literals[i]; - } - zigzagLiterals[idx] = zzEncVal; - idx++; + // if initialDelta is 0 then we cannot delta encode as we cannot identify + // the sign of deltas (increasing or decreasing) + if (initialDelta != 0) { + + // if min is equal to max then the delta is 0, this condition happens for + // fixed values run >10 which cannot be encoded with SHORT_REPEAT + if (min == max) { + assert isFixedDelta : min + "==" + max + + ", isFixedDelta cannot be false"; + assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; + fixedDelta = 0; + encoding = EncodingType.DELTA; + return; + } - // max delta value is required for computing the fixed bits - // required for delta blob in delta encoding - if (i > 0) { - if (i == 1) { - // first value preserve the sign - adjDeltas[i - 1] = literals[i] - literals[i - 1]; - } else { - adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]); - if (adjDeltas[i - 1] > deltaMax) { - deltaMax = adjDeltas[i - 1]; - } - } - } + if (isFixedDelta) { + assert currDelta == initialDelta + : "currDelta should be equal to initialDelta for fixed delta encoding"; + encoding = EncodingType.DELTA; + fixedDelta = currDelta; + return; } // stores the number of bits required for packing delta blob in // delta encoding bitsDeltaMax = utils.findClosestNumBits(deltaMax); - // if decreasing count equals total number of literals then the - // sequence is monotonically decreasing - if (increasingCount == 1 && decreasingCount == numLiterals) { - isDecreasing = true; - } - - // if increasing count equals total number of literals then the - // sequence is monotonically increasing - if (decreasingCount == 1 && increasingCount == numLiterals) { - isIncreasing = true; + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; } } - // if the sequence is both increasing and decreasing then it is not - // monotonic - if (isDecreasing && isIncreasing) { - isDecreasing = false; - isIncreasing = false; - } - - // fixed delta condition - if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) { - encoding = EncodingType.DELTA; - return; - } - - // monotonic condition - if (isIncreasing || isDecreasing) { - encoding = EncodingType.DELTA; - return; - } + // PATCHED_BASE encoding check // percentile values are computed for the zigzag encoded values. if the // number of bit requirement between 90th and 100th percentile varies // beyond a threshold then we need to patch the values. 
if the variation - // is not significant then we can use direct or delta encoding - - double p = 0.9; - zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p); - - p = 1.0; - zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p); + // is not significant then we can use direct encoding + zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); int diffBitsLH = zzBits100p - zzBits90p; // if the difference between 90th percentile and 100th percentile fixed // bits is > 1 then we need patch the values - if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1 - && isFixedDelta == false) { + if (diffBitsLH > 1) { + // patching is done only on base reduced values. // remove base from literals - for(int i = 0; i < numLiterals; i++) { + for (int i = 0; i < numLiterals; i++) { baseRedLiterals[i] = literals[i] - min; } // 95th percentile width is used to determine max allowed value // after which patching will be done - p = 0.95; - brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p); + brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); // 100th percentile is used to compute the max patch width - p = 1.0; - brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p); + brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); // after base reducing the values, if the difference in bits between // 95th percentile and 100th percentile value is zero then there @@ -565,19 +547,24 @@ class RunLengthIntegerWriterV2 implement encoding = EncodingType.DIRECT; return; } - } - - // if difference in bits between 95th percentile and 100th percentile is - // 0, then patch length will become 0. Hence we will fallback to direct - if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 - && isFixedDelta == false) { + } else { + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. 
Hence we will fallback to direct encoding = EncodingType.DIRECT; return; } + } - // this should not happen - if (encoding == null) { - throw new RuntimeException("Integer encoding cannot be determined."); + private void computeZigZagLiterals() { + // populate zigzag encoded literals + long zzEncVal = 0; + for (int i = 0; i < numLiterals; i++) { + if (signed) { + zzEncVal = utils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[i] = zzEncVal; } } @@ -700,7 +687,7 @@ class RunLengthIntegerWriterV2 implement patchWidth = 0; gapVsPatchList = null; min = 0; - isFixedDelta = false; + isFixedDelta = true; } @Override Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Sun Oct 5 22:26:43 2014 @@ -1283,4 +1283,9 @@ final class SerializationUtils { + ((readBuffer[rbOffset + 7] & 255) << 0)); } + // Do not want to use Guava LongMath.checkedSubtract() here as it will throw + // ArithmeticException in case of overflow + public boolean isSafeSubtract(long left, long right) { + return (left ^ right) >= 0 | (left ^ (left - right)) >= 0; + } } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java Sun Oct 5 22:26:43 2014 @@ -13,9 +13,6 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; -import java.util.List; - -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; @@ -33,7 +30,7 @@ public class ArrayWritableGroupConverter private Writable[] mapPairContainer; public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, - final int index, List<TypeInfo> hiveSchemaTypeInfos) { + final int index) { this.parent = parent; this.index = index; int count = groupType.getFieldCount(); @@ -43,8 +40,7 @@ public class ArrayWritableGroupConverter isMap = count == 2; converters = new Converter[count]; for (int i = 0; i < count; i++) { - converters[i] = getConverterFromDescription(groupType.getType(i), i, this, - hiveSchemaTypeInfos); + converters[i] = getConverterFromDescription(groupType.getType(i), i, this); } } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff 
============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java Sun Oct 5 22:26:43 2014 @@ -16,7 +16,6 @@ package org.apache.hadoop.hive.ql.io.par import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; @@ -37,21 +36,19 @@ public class DataWritableGroupConverter private final Object[] currentArr; private Writable[] rootMap; - public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema, - final List<TypeInfo> hiveSchemaTypeInfos) { - this(requestedSchema, null, 0, tableSchema, hiveSchemaTypeInfos); + public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) { + this(requestedSchema, null, 0, tableSchema); final int fieldCount = tableSchema.getFieldCount(); this.rootMap = new Writable[fieldCount]; } public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, - final int index, final List<TypeInfo> hiveSchemaTypeInfos) { - this(groupType, parent, index, groupType, hiveSchemaTypeInfos); + final int index) { + this(groupType, parent, index, groupType); } public DataWritableGroupConverter(final GroupType selectedGroupType, - final HiveGroupConverter parent, final int index, final GroupType containingGroupType, - final List<TypeInfo> hiveSchemaTypeInfos) { + final HiveGroupConverter parent, final int index, final GroupType containingGroupType) { this.parent = parent; this.index = index; final int totalFieldCount = containingGroupType.getFieldCount(); @@ -65,8 +62,7 @@ public class DataWritableGroupConverter Type subtype = selectedFields.get(i); if (containingGroupType.getFields().contains(subtype)) { converters[i] = getConverterFromDescription(subtype, - containingGroupType.getFieldIndex(subtype.getName()), this, - hiveSchemaTypeInfos); + containingGroupType.getFieldIndex(subtype.getName()), this); } else { throw new IllegalStateException("Group type [" + containingGroupType + "] does not contain requested field: " + subtype); Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java Sun Oct 5 22:26:43 2014 @@ -31,10 +31,8 @@ public class DataWritableRecordConverter private final DataWritableGroupConverter root; - public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema, - final List<TypeInfo> hiveColumnTypeInfos) { - this.root = new DataWritableGroupConverter(requestedSchema, tableSchema, - hiveColumnTypeInfos); + public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) { + this.root = new DataWritableGroupConverter(requestedSchema, tableSchema); } 
   @Override

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java Sun Oct 5 22:26:43 2014
@@ -16,19 +16,12 @@ package org.apache.hadoop.hive.ql.io.par
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -152,32 +145,6 @@ public enum ETypeConverter {
         }
       };
     }
-  },
-  ECHAR_CONVERTER(HiveCharWritable.class) {
-    @Override
-    Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
-      return new BinaryConverter<HiveCharWritable>(type, parent, index) {
-        @Override
-        protected HiveCharWritable convert(Binary binary) {
-          HiveChar hiveChar = new HiveChar();
-          hiveChar.setValue(binary.toStringUsingUTF8());
-          return new HiveCharWritable(hiveChar);
-        }
-      };
-    }
-  },
-  EVARCHAR_CONVERTER(HiveVarcharWritable.class) {
-    @Override
-    Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
-      return new BinaryConverter<HiveVarcharWritable>(type, parent, index) {
-        @Override
-        protected HiveVarcharWritable convert(Binary binary) {
-          HiveVarchar hiveVarchar = new HiveVarchar();
-          hiveVarchar.setValue(binary.toStringUsingUTF8());
-          return new HiveVarcharWritable(hiveVarchar);
-        }
-      };
-    }
   };
 
   final Class<?> _type;
@@ -193,7 +160,7 @@ public enum ETypeConverter {
   abstract Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent);
 
   public static Converter getNewConverter(final PrimitiveType type, final int index,
-      final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
+      final HiveGroupConverter parent) {
     if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
       //TODO- cleanup once parquet support Timestamp type annotation.
      return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent);
@@ -201,15 +168,7 @@ public enum ETypeConverter {
     if (OriginalType.DECIMAL == type.getOriginalType()) {
       return EDECIMAL_CONVERTER.getConverter(type, index, parent);
     } else if (OriginalType.UTF8 == type.getOriginalType()) {
-      if (hiveSchemaTypeInfos.get(index).getTypeName()
-          .startsWith(serdeConstants.CHAR_TYPE_NAME)) {
-        return ECHAR_CONVERTER.getConverter(type, index, parent);
-      } else if (hiveSchemaTypeInfos.get(index).getTypeName()
-          .startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
-        return EVARCHAR_CONVERTER.getConverter(type, index, parent);
-      } else if (type.isPrimitive()) {
-        return ESTRING_CONVERTER.getConverter(type, index, parent);
-      }
+      return ESTRING_CONVERTER.getConverter(type, index, parent);
     }
 
     Class<?> javaType = type.getPrimitiveTypeName().javaType;

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java Sun Oct 5 22:26:43 2014
@@ -13,9 +13,6 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
-import java.util.List;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
 
 import parquet.io.api.Converter;
@@ -26,20 +23,17 @@ import parquet.schema.Type.Repetition;
 public abstract class HiveGroupConverter extends GroupConverter {
 
   protected static Converter getConverterFromDescription(final Type type, final int index,
-      final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
+      final HiveGroupConverter parent) {
     if (type == null) {
       return null;
     }
     if (type.isPrimitive()) {
-      return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent,
-          hiveSchemaTypeInfos);
+      return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent);
     } else {
       if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
-        return new ArrayWritableGroupConverter(type.asGroupType(), parent, index,
-            hiveSchemaTypeInfos);
+        return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
      } else {
-        return new DataWritableGroupConverter(type.asGroupType(), parent, index,
-            hiveSchemaTypeInfos);
+        return new DataWritableGroupConverter(type.asGroupType(), parent, index);
      }
    }
  }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Sun Oct 5 22:26:43 2014
@@ -14,7 +14,6 @@
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -24,8 +23,6 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -56,7 +53,7 @@ public class DataWritableReadSupport ext
    * From a string which columns names (including hive column), return a list
    * of string columns
    *
-   * @param comma separated list of columns
+   * @param columns comma separated list of columns
    * @return list with virtual columns removed
    */
   private static List<String> getColumns(final String columns) {
@@ -64,27 +61,6 @@ public class DataWritableReadSupport ext
         removeVirtualColumns(StringUtils.getStringCollection(columns));
   }
 
-  private static List<TypeInfo> getColumnTypes(Configuration configuration) {
-
-    List<String> columnNames;
-    String columnNamesProperty = configuration.get(IOConstants.COLUMNS);
-    if (columnNamesProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNamesProperty.split(","));
-    }
-    List<TypeInfo> columnTypes;
-    String columnTypesProperty = configuration.get(IOConstants.COLUMNS_TYPES);
-    if (columnTypesProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypesProperty);
-    }
-
-    columnTypes = VirtualColumn.removeVirtualColumnTypes(columnNames, columnTypes);
-    return columnTypes;
-  }
-
   /**
    *
    * It creates the readContext for Parquet side with the requested schema during the init phase.
@@ -173,8 +149,7 @@ public class DataWritableReadSupport ext
     }
     final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
        parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
-    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema,
-        getColumnTypes(configuration));
+    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
 
   /**
@@ -194,4 +169,4 @@ public class DataWritableReadSupport ext
     }
     return requestedSchema;
   }
-}
+}
\ No newline at end of file

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Sun Oct 5 22:26:43 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmg
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
@@ -42,10 +43,10 @@ public class DbLockManager implements Hi
   private static final long MAX_SLEEP = 15000;
   private HiveLockManagerCtx context;
   private Set<DbHiveLock> locks;
-  private HiveMetaStoreClient client;
+  private IMetaStoreClient client;
   private long nextSleep = 50;
 
-  DbLockManager(HiveMetaStoreClient client) {
+  DbLockManager(IMetaStoreClient client) {
     locks = new HashSet<DbHiveLock>();
     this.client = client;
   }
@@ -210,8 +211,8 @@ public class DbLockManager implements Hi
   /**
    * Clear the memory of the locks in this object. This won't clear the locks from the database.
    * It is for use with
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}.
+   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).commitTxn} and
+   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).rollbackTxn}.
   */
  void clearLocalLockRecords() {
    locks.clear();

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Sun Oct 5 22:26:43 2014
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.thrift.TException;
 
@@ -46,7 +48,7 @@ public class DbTxnManager extends HiveTx
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private DbLockManager lockMgr = null;
-  private HiveMetaStoreClient client = null;
+  private IMetaStoreClient client = null;
   private long txnId = 0;
 
   DbTxnManager() {
@@ -284,7 +286,7 @@ public class DbTxnManager extends HiveTx
   public ValidTxnList getValidTxns() throws LockException {
     init();
     try {
-      return client.getValidTxns();
+      return client.getValidTxns(txnId);
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -311,7 +313,6 @@ public class DbTxnManager extends HiveTx
     try {
       if (txnId > 0) rollbackTxn();
       if (lockMgr != null) lockMgr.close();
-      if (client != null) client.close();
     } catch (Exception e) {
       LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() +
           ">, swallowing as there is nothing we can do with it.");
@@ -326,10 +327,12 @@ public class DbTxnManager extends HiveTx
           "methods.");
     }
     try {
-      client = new HiveMetaStoreClient(conf);
+      Hive db = Hive.get(conf);
+      client = db.getMSC();
     } catch (MetaException e) {
-      throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(),
-          e);
+      throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
+    } catch (HiveException e) {
+      throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
     }
   }
 }
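Editor's note on the computeZigZagLiterals() hunk in RunLengthIntegerWriterV2 above: the new method folds signed literals into non-negative space via SerializationUtils.zigzagEncode() before bit-packing. The standalone sketch below assumes the usual (val << 1) ^ (val >> 63) zigzag definition used by ORC-style and protobuf-style integer encoders; the class and helper names are illustrative only and are not part of this patch.

// Illustrative only: not the patched Hive classes.
public final class ZigZagSketch {

  // Standard 64-bit zigzag: maps small negative and positive values to small
  // unsigned-style longs so they pack into few bits.
  static long zigzagEncode(long val) {
    return (val << 1) ^ (val >> 63);   // arithmetic shift propagates the sign bit
  }

  static long zigzagDecode(long val) {
    return (val >>> 1) ^ -(val & 1);
  }

  public static void main(String[] args) {
    long[] literals = {0, -1, 1, -2, 2, Long.MIN_VALUE / 2};
    boolean signed = true;                       // mirrors the 'signed' flag in the hunk
    long[] zigzagLiterals = new long[literals.length];
    for (int i = 0; i < literals.length; i++) {
      zigzagLiterals[i] = signed ? zigzagEncode(literals[i]) : literals[i];
      System.out.printf("%d -> %d -> %d%n",
          literals[i], zigzagLiterals[i], zigzagDecode(zigzagLiterals[i]));
    }
  }
}

With signed input, 0, -1, 1, -2, 2 map to 0, 1, 2, 3, 4, which is why the writer only applies the transform when the column is signed.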

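Editor's note on the isSafeSubtract() helper added to SerializationUtils above: it avoids Guava's LongMath.checkedSubtract() because that throws ArithmeticException on overflow. The bit trick rests on the observation that left - right can only overflow when the operands have different signs and the result's sign differs from left's, so the subtraction is safe whenever either XOR is non-negative. The sketch below cross-checks that exact expression against BigInteger arithmetic; it is an illustration under those assumptions, not code from the patch.

import java.math.BigInteger;

// Illustrative cross-check of the overflow test; not part of the patch.
public final class SafeSubtractSketch {

  // Same expression as the new SerializationUtils method.
  static boolean isSafeSubtract(long left, long right) {
    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
  }

  // Reference: does left - right fit in a signed 64-bit long?
  static boolean fitsInLong(long left, long right) {
    BigInteger diff = BigInteger.valueOf(left).subtract(BigInteger.valueOf(right));
    return diff.bitLength() <= 63;
  }

  public static void main(String[] args) {
    long[][] samples = {
        {100, 42},              // safe
        {Long.MIN_VALUE, 1},    // overflows
        {Long.MAX_VALUE, -1},   // overflows
        {-1, Long.MIN_VALUE},   // safe: -1 - MIN_VALUE == MAX_VALUE
    };
    for (long[] s : samples) {
      System.out.printf("%d - %d : safe=%b (reference=%b)%n",
          s[0], s[1], isSafeSubtract(s[0], s[1]), fitsInLong(s[0], s[1]));
    }
  }
}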