Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Tue Oct 14 19:06:45 2014
@@ -18,99 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.PTFTopNHash;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TopNHash;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
-  private static final Log LOG = LogFactory.getLog(
-      VectorReduceSinkOperator.class.getName());
-
   private static final long serialVersionUID = 1L;
 
-  /**
-   * The evaluators for the key columns. Key columns decide the sort order on
-   * the reducer side. Key columns are passed to the reducer in the "key".
-   */
-  private VectorExpression[] keyEval;
-
-  /**
-   * The key value writers. These know how to write the necessary writable type
-   * based on key column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] keyWriters;
-
-  /**
-   * The evaluators for the value columns. Value columns are passed to reducer
-   * in the "value".
-   */
-  private VectorExpression[] valueEval;
-
-  /**
-   * The output value writers. These know how to write the necessary writable type
-   * based on value column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] valueWriters;
-
-  /**
-   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
-   * Hive language). Partition columns decide the reducer that the current row
-   * goes to. Partition columns are not passed to reducer.
-   */
-  private VectorExpression[] partitionEval;
-
-  /**
-  * Evaluators for bucketing columns. This is used to compute bucket number.
-  */
-  private VectorExpression[] bucketEval;
-  private int buckColIdxInKey;
-
-  /**
-   * The partition value writers. These know how to write the necessary writable type
-   * based on partition column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] partitionWriters;
-  private transient VectorExpressionWriter[] bucketWriters = null;
-
-  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+  // Writer for producing row from input batch.
+  private VectorExpressionWriter[] rowWriters;
+  
+  protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;
-    keyEval = vContext.getVectorExpressions(desc.getKeyCols());
-    valueEval = vContext.getVectorExpressions(desc.getValueCols());
-    partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
-    bucketEval = null;
-    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
-      bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
-      buckColIdxInKey = desc.getPartitionCols().size();
-    }
   }
 
   public VectorReduceSinkOperator() {
@@ -119,399 +50,49 @@ public class VectorReduceSinkOperator ex
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    try {
-      numDistributionKeys = conf.getNumDistributionKeys();
-      distinctColIndices = conf.getDistinctColumnIndices();
-      numDistinctExprs = distinctColIndices.size();
-
-      TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
-          .newInstance();
-      keySerializer.initialize(null, keyTableDesc.getProperties());
-      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
-      /*
-       * Compute and assign the key writers and the key object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getKeyCols(),
-          conf.getOutputKeyColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-              ObjectInspector objectInspector) {
-              keyWriters = writers;
-              keyObjectInspector = objectInspector;
-            }
-          });
-
-      String colNames = "";
-      for(String colName : conf.getOutputKeyColumnNames()) {
-        colNames = String.format("%s %s", colNames, colName);
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
-          keyObjectInspector.getClass(),
-          keyObjectInspector,
-          colNames));
-      }
-
-      partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
-      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
-        bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
-      }
-
-      TableDesc valueTableDesc = conf.getValueSerializeInfo();
-      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
-          .newInstance();
-      valueSerializer.initialize(null, valueTableDesc.getProperties());
-
-      /*
-       * Compute and assign the value writers and the value object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getValueCols(),
-          conf.getOutputValueColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-                ObjectInspector objectInspector) {
-                valueWriters = writers;
-                valueObjectInspector = objectInspector;
+    // We need a input object inspector that is for the row we will extract out of the
+    // vectorized row batch, not for example, an original inspector for an ORC table, etc.
+    VectorExpressionWriterFactory.processVectorInspector(
+            (StructObjectInspector) inputObjInspectors[0],
+            new VectorExpressionWriterFactory.SingleOIDClosure() {
+              @Override
+              public void assign(VectorExpressionWriter[] writers,
+                  ObjectInspector objectInspector) {
+                rowWriters = writers;
+                inputObjInspectors[0] = objectInspector;
               }
-          });
-
-      if (isDebugEnabled) {
-        colNames = "";
-        for(String colName : conf.getOutputValueColumnNames()) {
-          colNames = String.format("%s %s", colNames, colName);
-        }
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
-            valueObjectInspector.getClass(),
-            valueObjectInspector,
-            colNames));
-      }
+            });
+    singleRow = new Object[rowWriters.length];
 
-      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
-      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
-        numDistributionKeys;
-      cachedKeys = new Object[numKeys][keyLen];
-      cachedValues = new Object[valueEval.length];
-
-      int tag = conf.getTag();
-      tagByte[0] = (byte) tag;
-      LOG.info("Using tag = " + tag);
-
-      int limit = conf.getTopN();
-      float memUsage = conf.getTopNMemoryUsage();
-      if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
-        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
-      }
-
-      autoParallel = conf.isAutoParallel();
-
-    } catch(Exception e) {
-      throw new HiveException(e);
-    }
+    // Call ReduceSinkOperator with new input inspector.
+    super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
-
-    if (isDebugEnabled) {
-      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
-          vrg.size,
-          valueEval.length,
-          keyEval.length,
-          partitionEval.length));
-    }
-
-    try {
-      // Evaluate the keys
-      for (int i = 0; i < keyEval.length; i++) {
-        keyEval[i].evaluate(vrg);
-      }
-
-      // Determine which rows we need to emit based on topN optimization
-      int startResult = reducerHash.startVectorizedBatch(vrg.size);
-      if (startResult == TopNHash.EXCLUDE) {
-        return; // TopN wants us to exclude all rows.
-      }
-      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
-      for (int i = 0; i < partitionEval.length; i++) {
-        partitionEval[i].evaluate(vrg);
-      }
-      if (bucketEval != null) {
-        for (int i = 0; i < bucketEval.length; i++) {
-          bucketEval[i].evaluate(vrg);
-        }
-      }
-      // run the vector evaluations
-      for (int i = 0; i < valueEval.length; i++) {
-         valueEval[i].evaluate(vrg);
-      }
-
-      boolean useTopN = startResult != TopNHash.FORWARD;
-      // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
-      // If we are using topN, we will make the first key for each row and store/forward it.
-      // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // First, make distrib key components for this row and determine distKeyLength.
-        populatedCachedDistributionKeys(vrg, rowIndex, 0);
-
-        // replace bucketing columns with hashcode % numBuckets
-        int buckNum = -1;
-        if (bucketEval != null) {
-          buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
-          cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
-        }
-        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
-        int distKeyLength = firstKey.getDistKeyLength();
-        // Add first distinct expression, if any.
-        if (numDistinctExprs > 0) {
-          populateCachedDistinctKeys(vrg, rowIndex, 0);
-          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
-        }
-
-        final int hashCode;
-
-        // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
-        if (autoParallel && partitionEval.length > 0) {
-          hashCode = computeMurmurHash(firstKey);
-        } else {
-          hashCode = computeHashCode(vrg, rowIndex, buckNum);
-        }
-
-        firstKey.setHashCode(hashCode);
-
-        if (useTopN) {
-          /*
-           * in case of TopN for windowing, we need to distinguish between 
-           * rows with null partition keys and rows with value 0 for partition keys.
-           */
-          boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex);
-          reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex);
-        } else {
-          // No TopN, just forward the first key and all others.
-          BytesWritable value = makeValueWritable(vrg, rowIndex);
-          collect(firstKey, value);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
-        }
-      }
-
-      if (!useTopN) return; // All done.
-
-      // If we use topN, we have called tryStore on every key now. We can process the results.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int result = reducerHash.getVectorizedBatchResult(batchIndex);
-        if (result == TopNHash.EXCLUDE) continue;
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // Compute value and hashcode - we'd either store or forward them.
-        BytesWritable value = makeValueWritable(vrg, rowIndex);
-        int distKeyLength = -1;
-        int hashCode;
-        if (result == TopNHash.FORWARD) {
-          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
-          distKeyLength = firstKey.getDistKeyLength();
-          hashCode = firstKey.hashCode();
-          collect(firstKey, value);
-        } else {
-          hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
-          reducerHash.storeValue(result, hashCode, value, true);
-          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
-        }
-        // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
-        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
-        if (numDistinctExprs > 1) {
-          populatedCachedDistributionKeys(vrg, rowIndex, 1);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
-        }
-      }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
-      throw new HiveException(e);
-    }
-  }
-
-  /**
-   * This function creates and forwards all the additional KVs for the multi-distinct case,
-   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param hashCode the partitioning hash code to use; same as for the first KV
-   * @param value the value to use; same as for the first KV
-   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
-   * @param tag the tag
-   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
-   */
-  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
-      BytesWritable value, int distKeyLength, int tag, int baseIndex)
-          throws HiveException, SerDeException, IOException {
-    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
-    //       the first key has already been stored. There's few bytes difference between keys
-    //       for different distincts, and the value/etc. are all the same.
-    //       We could store deltas to re-gen extra rows when flushing TopN.
-    for (int i = 1; i < numDistinctExprs; i++) {
-      if (i != baseIndex) {
-        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
-      }
-      populateCachedDistinctKeys(vrg, rowIndex, i);
-      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
-      hiveKey.setHashCode(hashCode);
-      collect(hiveKey, value);
-    }
-  }
-
-  /**
-   * Populate distribution keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populatedCachedDistributionKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    for (int i = 0; i < numDistributionKeys; i++) {
-      int batchColumn = keyEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
-    }
-    if (cachedKeys[index].length > numDistributionKeys) {
-      cachedKeys[index][numDistributionKeys] = null;
-    }
-  }
-
-  /**
-   * Populate distinct keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populateCachedDistinctKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    StandardUnion union;
-    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
-        (byte)index, new Object[distinctColIndices.get(index).size()]);
-    Object[] distinctParameters = (Object[]) union.getObject();
-    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
-      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
-      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
-      distinctParameters[distinctParamI] =
-          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
-    }
-    union.setTag((byte) index);
-  }
+  public void processOp(Object data, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) data;
 
-  private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
-      throws HiveException, SerDeException {
-    for (int i = 0; i < valueEval.length; i++) {
-      int batchColumn = valueEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
+    for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+      Object row = getRowObject(vrg, batchIndex);
+      super.processOp(row, tag);
     }
-    // Serialize the value
-    return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
   }
 
-  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
-    // Evaluate the HashCode
-    int keyHashCode = 0;
-    if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
-      }
-      keyHashCode = random.nextInt();
-    } else {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
-        keyHashCode = keyHashCode
-            * 31
-            + ObjectInspectorUtils.hashCode(
-                partitionValue,
-                partitionWriters[p].getObjectInspector());
-      }
-    }
-    return buckNum < 0  ? keyHashCode : keyHashCode * 31 + buckNum;
-  }
-
-  private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex)
+  private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
       throws HiveException {
-    if (partitionEval.length != 0) {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector,
-            rowIndex);
-        if (partitionValue != null) {
-          return false;
-        }
+    int batchIndex = rowIndex;
+    if (vrg.selectedInUse) {
+      batchIndex = vrg.selected[rowIndex];
+    }
+    for (int i = 0; i < vrg.projectionSize; i++) {
+      ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+      if (vectorColumn != null) {
+        singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+      } else {
+        // Some columns from tables are not used.
+        singleRow[i] = null;
       }
-      return true;
-    }
-    return false;
-  }
-
-  private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
-    int bucketNum = 0;
-    for (int p = 0; p < bucketEval.length; p++) {
-      ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
-      Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
-      bucketNum = bucketNum
-          * 31
-          + ObjectInspectorUtils.hashCode(
-              bucketValue,
-              bucketWriters[p].getObjectInspector());
     }
-
-    if (bucketNum < 0) {
-      bucketNum = -1 * bucketNum;
-    }
-
-    return bucketNum % numBuckets;
-  }
-
-  static public String getOperatorName() {
-    return "RS";
-  }
-
-  public VectorExpression[] getPartitionEval() {
-    return partitionEval;
-  }
-
-  public void setPartitionEval(VectorExpression[] partitionEval) {
-    this.partitionEval = partitionEval;
-  }
-
-  public VectorExpression[] getValueEval() {
-    return valueEval;
-  }
-
-  public void setValueEval(VectorExpression[] valueEval) {
-    this.valueEval = valueEval;
-  }
-
-  public VectorExpression[] getKeyEval() {
-    return keyEval;
-  }
-
-  public void setKeyEval(VectorExpression[] keyEval) {
-    this.keyEval = keyEval;
+    return singleRow;
   }
 }

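The rewrite above turns VectorReduceSinkOperator into a thin adapter: instead of serializing keys and values itself, it builds per-column writers from the input inspector, materializes each batch row as an Object[] (getRowObject), and hands that row to the row-mode ReduceSinkOperator through super.processOp. The one subtle step is the selected[] indirection used when an upstream filter has already thinned the batch. Below is a small standalone sketch of that mapping; the field names mirror Hive's VectorizedRowBatch, but the class and data are made up for illustration.

// Illustrative only: shows how a logical batch position is mapped to a
// physical row index when selectedInUse is true, as getRowObject does above.
public class SelectedIndirectionSketch {
  public static void main(String[] args) {
    long[] col = {10, 11, 12, 13, 14, 15};  // one flattened column vector
    int size = 3;                           // logical rows left in the batch
    boolean selectedInUse = true;
    int[] selected = {0, 2, 5};             // physical rows that survived a filter

    for (int batchIndex = 0; batchIndex < size; ++batchIndex) {
      int rowIndex = selectedInUse ? selected[batchIndex] : batchIndex;
      System.out.println("logical row " + batchIndex + " -> value " + col[rowIndex]);
    }
  }
}
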
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java Tue Oct 14 19:06:45 2014
@@ -285,8 +285,7 @@ public class VectorSMBMapJoinOperator ex
     Object[] values = (Object[]) row;
     VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
     if (null == vcas) {
-      Map<String, Map<String, Integer>> allColumnMaps = Utilities.
-          getMapRedWork(hconf).getMapWork().getScratchColumnMap();
+      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getScratchColumnMap(hconf);
       Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
       vcas = VectorColumnAssignFactory.buildAssigners(
           outputBatch, outputOI, columnMap, conf.getOutputColumnNames());

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Oct 14 19:06:45 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFSumDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble;
@@ -1888,47 +1889,47 @@ public class VectorizationContext {
   // TODO:   And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used..  Right now they are conservatively
   //         marked map-side (HASH).
   static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
-    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                    
      VectorUDAFMinLong.class));
-    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                    
      VectorUDAFMinDouble.class));
-    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                    
      VectorUDAFMinString.class));
-    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                    
      VectorUDAFMinDecimal.class));
-    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                    
      VectorUDAFMaxLong.class));
-    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                    
      VectorUDAFMaxDouble.class));
-    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                    
      VectorUDAFMaxString.class));
-    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                    
      VectorUDAFMaxDecimal.class));
-    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.NONE,          GroupByDesc.Mode.HASH,   
      VectorUDAFCountStar.class));
-    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    
GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class));
-    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH,   
      VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFCount.class));
-    add(new AggregateDefinition("sum",         
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                    
      VectorUDAFSumLong.class));
-    add(new AggregateDefinition("sum",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                    
      VectorUDAFSumDouble.class));
-    add(new AggregateDefinition("sum",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                    
      VectorUDAFSumDecimal.class));
-    add(new AggregateDefinition("avg",         
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFAvgLong.class));
-    add(new AggregateDefinition("avg",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFAvgDouble.class));
-    add(new AggregateDefinition("avg",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFAvgDecimal.class));
-    add(new AggregateDefinition("variance",    
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("var_pop",     
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("variance",    
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("var_pop",     
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("variance",    
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_pop",     
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_samp",    
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFVarSampLong.class));
-    add(new AggregateDefinition("var_samp" ,   
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFVarSampDouble.class));
-    add(new AggregateDefinition("var_samp" ,   
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFVarSampDecimal.class));
-    add(new AggregateDefinition("std",         
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev",      
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev_pop",  
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("std",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev",      
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev_pop",  
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("std",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev",      
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_pop",  
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,   
      VectorUDAFStdSampLong.class));
-    add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,   
      VectorUDAFStdSampDouble.class));
-    add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,   
      VectorUDAFStdSampDecimal.class));
+    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,           
               VectorUDAFMinLong.class));
+    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,           
               VectorUDAFMinDouble.class));
+    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,           
               VectorUDAFMinString.class));
+    add(new AggregateDefinition("min",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,           
               VectorUDAFMinDecimal.class));
+    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,           
               VectorUDAFMaxLong.class));
+    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,           
               VectorUDAFMaxDouble.class));
+    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,           
               VectorUDAFMaxString.class));
+    add(new AggregateDefinition("max",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,           
               VectorUDAFMaxDecimal.class));
+    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.NONE,                   
GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
+    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    
GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             
GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
+    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          
GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("sum",         
VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             null,           
               VectorUDAFSumLong.class));
+    add(new AggregateDefinition("sum",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,           
               VectorUDAFSumDouble.class));
+    add(new AggregateDefinition("sum",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,           
               VectorUDAFSumDecimal.class));
+    add(new AggregateDefinition("avg",         
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
+    add(new AggregateDefinition("avg",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
+    add(new AggregateDefinition("avg",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
+    add(new AggregateDefinition("variance",    
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("var_pop",     
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("variance",    
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("var_pop",     
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("variance",    
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_pop",     
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_samp",    
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
+    add(new AggregateDefinition("var_samp" ,   
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
+    add(new AggregateDefinition("var_samp" ,   
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
+    add(new AggregateDefinition("std",         
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev",      
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev_pop",  
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("std",         
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev",      
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev_pop",  
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("std",         
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev",      
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_pop",  
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   
GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
+    add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           
GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
+    add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.DECIMAL,                
GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
   }};
 
   public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)

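Besides widening several argument types to the INT_DATETIME_FAMILY / INT_TIMESTAMP_FAMILY families, the table now gives COUNT a dedicated reduce-side entry: map-side HASH mode still uses VectorUDAFCount over the raw column, while MERGEPARTIAL mode switches from VectorUDAFSumLong to the new VectorUDAFCountMerge, which combines the partial counts produced map-side. A standalone sketch of the two phases, with made-up data, showing the distinction between counting values and merging partial counts (illustrative only, not Hive code):

public class CountMergeSketch {
  static long mapSideCount(Long[] column) {      // HASH mode: count non-null values
    long c = 0;
    for (Long v : column) {
      if (v != null) c++;
    }
    return c;
  }
  static long mergePartials(long[] partialCounts) {  // MERGEPARTIAL mode: add up partials
    long total = 0;
    for (long p : partialCounts) total += p;
    return total;
  }
  public static void main(String[] args) {
    long p1 = mapSideCount(new Long[]{1L, null, 3L});  // 2
    long p2 = mapSideCount(new Long[]{null, 5L});      // 1
    System.out.println(mergePartials(new long[]{p1, p2}));  // 3
  }
}
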
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Oct 14 19:06:45 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.serde2.Col
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -131,13 +132,8 @@ public class VectorizedRowBatchCtx {
    */
   public void init(Configuration hiveConf, String fileKey,
       StructObjectInspector rowOI) {
-    MapredWork mapredWork = Utilities.getMapRedWork(hiveConf);
-    Map<String, Map<Integer, String>> scratchColumnVectorTypes;
-    if (mapredWork.getMapWork() != null) {
-      scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes();
-    } else {
-      scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes();
-    }
+    Map<String, Map<Integer, String>> scratchColumnVectorTypes =
+            Utilities.getScratchColumnVectorTypes(hiveConf);
     columnTypeMap = scratchColumnVectorTypes.get(fileKey);
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
@@ -145,6 +141,20 @@ public class VectorizedRowBatchCtx {
   
 
   /**
+   * Initializes the VectorizedRowBatch context based on an scratch column type map and
+   * object inspector.
+   * @param columnTypeMap
+   * @param rowOI
+   *          Object inspector that shapes the column types
+   */
+  public void init(Map<Integer, String> columnTypeMap,
+      StructObjectInspector rowOI) {
+    this.columnTypeMap = columnTypeMap;
+    this.rowOI= rowOI;
+    this.rawRowOI = rowOI;
+  }
+
+  /**
    * Initializes VectorizedRowBatch context based on the
    * split and Hive configuration (Job conf with hive Plan).
    *
@@ -174,7 +184,7 @@ public class VectorizedRowBatchCtx {
 
     String partitionPath = split.getPath().getParent().toString();
     columnTypeMap = Utilities
-        .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
+        .getScratchColumnVectorTypes(hiveConf)
         .get(partitionPath);
 
     Properties partProps =
@@ -476,7 +486,7 @@ public class VectorizedRowBatchCtx {
             lcv.isNull[0] = true;
             lcv.isRepeating = true;
           } else { 
-            lcv.fill(((Date) value).getTime());
+            lcv.fill(DateWritable.dateToDays((Date) value));
             lcv.isNull[0] = false;
           }          
         }
@@ -489,7 +499,7 @@ public class VectorizedRowBatchCtx {
             lcv.isNull[0] = true;
             lcv.isRepeating = true;
           } else { 
-            lcv.fill((long)(((Timestamp) value).getTime()));
+            lcv.fill(TimestampUtils.getTimeNanoSec((Timestamp) value));
             lcv.isNull[0] = false;
           }
         }

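The two fill() changes above fix the units used when a partition column is materialized as a repeating long column vector (lcv in the context above): DATE partition values are now stored as days since the epoch (via DateWritable.dateToDays) rather than milliseconds, and TIMESTAMP values as nanoseconds (via TimestampUtils.getTimeNanoSec) rather than milliseconds. A standalone sketch of the intended conversions; it ignores the time-zone handling the real helpers perform, and the class below is illustrative, not Hive code.

import java.sql.Date;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

public class PartitionValueUnitsSketch {
  // DATE column vectors hold days since 1970-01-01 (time-zone handling omitted here).
  static long dateToEpochDays(Date d) {
    return TimeUnit.MILLISECONDS.toDays(d.getTime());
  }
  // TIMESTAMP column vectors hold nanoseconds since the epoch; getNanos() carries
  // the full sub-second part, so drop the millisecond remainder before adding it.
  static long timestampToNanos(Timestamp t) {
    return (t.getTime() / 1000) * 1_000_000_000L + t.getNanos();
  }
  public static void main(String[] args) {
    System.out.println(dateToEpochDays(Date.valueOf("2014-10-14")));
    System.out.println(timestampToNanos(Timestamp.valueOf("2014-10-14 19:06:45.123456789")));
  }
}
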
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Tue Oct 14 19:06:45 2014
@@ -660,7 +660,7 @@ public final class VectorExpressionWrite
       @Override
       public Object writeValue(byte[] value, int start, int length) throws HiveException {
         this.text.set(value, start, length);
-        ((SettableStringObjectInspector) this.objectInspector).set(this.obj, this.text.toString());
+        ((SettableStringObjectInspector) this.objectInspector).set(this.obj, this.text);
         return this.obj;
       }
 
@@ -671,7 +671,7 @@ public final class VectorExpressionWrite
           field = initValue(null);
         }
         this.text.set(value, start, length);
-        ((SettableStringObjectInspector) this.objectInspector).set(field, this.text.toString());
+        ((SettableStringObjectInspector) this.objectInspector).set(field, this.text);
         return field;
       }
 
@@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite
     closure.assign(writers, oids);
   }
 
+  /**
+   * Creates the value writers for an struct object inspector.
+   * Creates an appropriate output object inspector.
+   */
+  public static void processVectorInspector(
+      StructObjectInspector structObjInspector,
+      SingleOIDClosure closure)
+      throws HiveException {
+    List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs();
+    VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()];
+    List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length);
+    ArrayList<String> columnNames = new ArrayList<String>();
+    int i = 0;
+    for(StructField field : fields) {
+      ObjectInspector fieldObjInsp = field.getFieldObjectInspector();
+      writers[i] = VectorExpressionWriterFactory.
+                genVectorExpressionWritable(fieldObjInsp);
+      columnNames.add(field.getFieldName());
+      oids.add(writers[i].getObjectInspector());
+      i++;
+    }
+    ObjectInspector objectInspector = ObjectInspectorFactory.
+        getStandardStructObjectInspector(columnNames,oids);
+    closure.assign(writers, objectInspector);
+  }
 
   /**
    * Returns {@link VectorExpressionWriter} objects for the fields in the given

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java Tue Oct 14 19:06:45 2014
@@ -46,15 +46,7 @@ public class VectorUDAFCount extends Vec
 
       private static final long serialVersionUID = 1L;
 
-      transient private long value;
-      transient private boolean isNull;
-
-      public void initIfNull() {
-        if (isNull) {
-          isNull = false;
-          value = 0;
-        }
-      }
+      transient private long count;
 
       @Override
       public int getVariableSize() {
@@ -63,8 +55,7 @@ public class VectorUDAFCount extends Vec
 
       @Override
       public void reset() {
-        isNull = true;
-        value = 0L;
+        count = 0L;
       }
     }
 
@@ -131,8 +122,7 @@ public class VectorUDAFCount extends Vec
             aggregationBufferSets,
             aggregateIndex,
             i);
-          myagg.initIfNull();
-          myagg.value++;
+          myagg.count++;
         }
     }
 
@@ -148,8 +138,7 @@ public class VectorUDAFCount extends Vec
               aggregationBufferSets,
               aggregateIndex,
               i);
-            myagg.initIfNull();
-            myagg.value++;
+            myagg.count++;
           }
         }
     }
@@ -168,8 +157,7 @@ public class VectorUDAFCount extends Vec
               aggregationBufferSets,
               aggregateIndex,
               j);
-            myagg.initIfNull();
-            myagg.value++;
+            myagg.count++;
           }
         }
     }
@@ -191,17 +179,15 @@ public class VectorUDAFCount extends Vec
 
       Aggregation myagg = (Aggregation)agg;
 
-      myagg.initIfNull();
-
       if (inputVector.isRepeating) {
         if (inputVector.noNulls || !inputVector.isNull[0]) {
-          myagg.value += batchSize;
+          myagg.count += batchSize;
         }
         return;
       }
 
       if (inputVector.noNulls) {
-        myagg.value += batchSize;
+        myagg.count += batchSize;
         return;
       }
       else if (!batch.selectedInUse) {
@@ -221,7 +207,7 @@ public class VectorUDAFCount extends Vec
       for (int j=0; j< batchSize; ++j) {
         int i = selected[j];
         if (!isNull[i]) {
-          myagg.value += 1;
+          myagg.count += 1;
         }
       }
     }
@@ -233,7 +219,7 @@ public class VectorUDAFCount extends Vec
 
       for (int i=0; i< batchSize; ++i) {
         if (!isNull[i]) {
-          myagg.value += 1;
+          myagg.count += 1;
         }
       }
     }
@@ -251,14 +237,9 @@ public class VectorUDAFCount extends Vec
 
     @Override
     public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
-    Aggregation myagg = (Aggregation) agg;
-      if (myagg.isNull) {
-        return null;
-      }
-      else {
-        result.set (myagg.value);
+      Aggregation myagg = (Aggregation) agg;
+      result.set (myagg.count);
       return result;
-      }
     }
 
     @Override

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java Tue Oct 14 19:06:45 2014
@@ -44,8 +44,7 @@ public class VectorUDAFCountStar extends
 
       private static final long serialVersionUID = 1L;
 
-      transient private long value;
-      transient private boolean isNull;
+      transient private long count;
 
       @Override
       public int getVariableSize() {
@@ -54,8 +53,7 @@ public class VectorUDAFCountStar extends
 
       @Override
       public void reset() {
-        isNull = true;
-        value = 0L;
+        count = 0L;
       }
     }
 
@@ -95,8 +93,7 @@ public class VectorUDAFCountStar extends
       for (int i=0; i < batchSize; ++i) {
         Aggregation myAgg = getCurrentAggregationBuffer(
             aggregationBufferSets, aggregateIndex, i);
-        myAgg.isNull = false;
-        ++myAgg.value;
+        ++myAgg.count;
       }
     }
 
@@ -111,8 +108,7 @@ public class VectorUDAFCountStar extends
       }
 
       Aggregation myagg = (Aggregation)agg;
-      myagg.isNull = false;
-      myagg.value += batchSize;
+      myagg.count += batchSize;
     }
 
     @Override
@@ -128,14 +124,9 @@ public class VectorUDAFCountStar extends
 
     @Override
     public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
-    Aggregation myagg = (Aggregation) agg;
-      if (myagg.isNull) {
-        return null;
-      }
-      else {
-        result.set (myagg.value);
+      Aggregation myagg = (Aggregation) agg;
+      result.set (myagg.count);
       return result;
-      }
     }
 
     @Override

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Tue Oct 14 19:06:45 2014
@@ -147,7 +147,7 @@ public class HookContext {
  }
 
   public String getOperationName() {
-    return SessionState.get().getHiveOperation().name();
+    return queryPlan.getOperationName();
   }
 
   public String getUserName() {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java Tue Oct 14 19:06:45 2014
@@ -155,6 +155,8 @@ public interface AcidInputFormat<KEY ext
   public static interface RawReader<V>
       extends RecordReader<RecordIdentifier, V> {
     public ObjectInspector getObjectInspector();
+
+    public boolean isDelete(V value);
   }
 
   /**

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Tue Oct 14 19:06:45 2014
@@ -40,6 +40,8 @@ import java.util.regex.Pattern;
  * are used by the compactor and cleaner and thus must be format agnostic.
  */
 public class AcidUtils {
+  // This key will be put in the conf file when planning an acid operation
+  public static final String CONF_ACID_KEY = "hive.doing.acid";
   public static final String BASE_PREFIX = "base_";
   public static final String DELTA_PREFIX = "delta_";
   public static final PathFilter deltaFileFilter = new PathFilter() {
@@ -305,6 +307,28 @@ public class AcidUtils {
   }
 
   /**
+   * Is the given directory in ACID format?
+   * @param directory the partition directory to check
+   * @param conf the query configuration
+   * @return true, if it is an ACID directory
+   * @throws IOException
+   */
+  public static boolean isAcid(Path directory,
+                               Configuration conf) throws IOException {
+    FileSystem fs = directory.getFileSystem(conf);
+    for(FileStatus file: fs.listStatus(directory)) {
+      String filename = file.getPath().getName();
+      if (filename.startsWith(BASE_PREFIX) ||
+          filename.startsWith(DELTA_PREFIX)) {
+        if (file.isDir()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
    * Get the ACID state of the given directory. It finds the minimal set of
    * base and diff directories. Note that because major compactions don't
    * preserve the history, we can't use a base directory that includes a
@@ -324,7 +348,7 @@ public class AcidUtils {
     long bestBaseTxn = 0;
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
-    final List<FileStatus> original = new ArrayList<FileStatus>();
+    List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
     final List<FileStatus> obsolete = new ArrayList<FileStatus>();
     List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory,
         hiddenFileFilter);
@@ -351,16 +375,26 @@ public class AcidUtils {
           working.add(delta);
         }
       } else {
-        findOriginals(fs, child, original);
+        // This is just the directory.  We need to recurse and find the actual files.  But don't
+        // do this until we have determined there is no base.  This saves time.  Plus,
+        // it is possible that the cleaner is running and removing these original files,
+        // in which case recursing through them could cause us to get an error.
+        originalDirectories.add(child);
       }
     }
 
+    final List<FileStatus> original = new ArrayList<FileStatus>();
     // if we have a base, the original files are obsolete.
     if (bestBase != null) {
-      obsolete.addAll(original);
       // remove the entries so we don't get confused later and think we should
       // use them.
       original.clear();
+    } else {
+      // Okay, we're going to need these originals.  Recurse through them and figure out what we
+      // really need.
+      for (FileStatus origDir : originalDirectories) {
+        findOriginals(fs, origDir, original);
+      }
     }
 
     Collections.sort(working);

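The new AcidUtils.isAcid treats a partition directory as ACID if it contains at least one subdirectory whose name starts with base_ or delta_, and the getAcidState change defers recursing into pre-ACID "original" directories until it knows no base exists. A standalone sketch of the naming check only; the directory names are made up, and the real method also verifies that each matching entry is a directory via the FileSystem API.

import java.util.Arrays;
import java.util.List;

public class AcidLayoutSketch {
  // Mirrors the base_/delta_ prefix test in the new isAcid; illustrative only.
  static boolean looksAcid(List<String> subdirNames) {
    for (String name : subdirNames) {
      if (name.startsWith("base_") || name.startsWith("delta_")) {
        return true;
      }
    }
    return false;
  }
  public static void main(String[] args) {
    System.out.println(looksAcid(Arrays.asList("base_0000005", "delta_0000006_0000007"))); // true
    System.out.println(looksAcid(Arrays.asList("000000_0", "000001_0")));                  // false
  }
}
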
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Tue Oct 14 19:06:45 2014
@@ -122,24 +122,7 @@ public class BucketizedHiveInputFormat<K
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     init(job);
 
-    Path[] dirs = FileInputFormat.getInputPaths(job);
-    if (dirs.length == 0) {
-      // on tez we're avoiding to duplicate the file info in FileInputFormat.
-      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-        try {
-          List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
-          dirs = paths.toArray(new Path[paths.size()]);
-          if (dirs.length == 0) {
-            // if we still don't have any files it's time to fail.
-            throw new IOException("No input paths specified in job");
-          }
-        } catch (Exception e) {
-          throw new IOException("Could not create input paths", e);
-        }
-      } else {
-        throw new IOException("No input paths specified in job");
-      }
-    }
+    Path[] dirs = getInputPaths(job);
 
     JobConf newjob = new JobConf(job);
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
 Tue Oct 14 19:06:45 2014
@@ -33,6 +33,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -264,8 +265,8 @@ public class CombineHiveInputFormat<K ex
   /**
    * Create Hive splits based on CombineFileSplit.
    */
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+  private InputSplit[] getCombineSplits(JobConf job,
+                                        int numSplits) throws IOException {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
     init(job);
@@ -274,17 +275,6 @@ public class CombineHiveInputFormat<K ex
       mrwork.getAliasToWork();
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
-    
-    // on tez we're avoiding duplicating path info since the info will go over
-    // rpc
-    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-      try {
-        List<Path> dirs = Utilities.getInputPathsTez(job, mrwork);
-        Utilities.setInputPaths(job, dirs);
-      } catch (Exception e) {
-        throw new IOException("Could not create input paths", e);
-      }
-    }
 
     InputSplit[] splits = null;
     if (combine == null) {
@@ -327,13 +317,6 @@ public class CombineHiveInputFormat<K ex
         // ignore
       }
       FileSystem inpFs = path.getFileSystem(job);
-      if (inputFormatClass.isAssignableFrom(OrcInputFormat.class)) {
-        if (inpFs.exists(new Path(path, OrcRecordUpdater.ACID_FORMAT))) {
-          throw new IOException("CombineHiveInputFormat is incompatible " +
-            " with ACID tables. Please set hive.input.format=" +
-              "org.apache.hadoop.hive.ql.io.HiveInputFormat");
-        }
-      }
 
       // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
       // we use a configuration variable for the same
@@ -461,6 +444,84 @@ public class CombineHiveInputFormat<K ex
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
+  /**
+   * Create Hive splits based on CombineFileSplit.
+   */
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    init(job);
+    Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork =
+        mrwork.getAliasToWork();
+
+    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+    Path[] paths = getInputPaths(job);
+
+    List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
+    List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
+
+    for (Path path : paths) {
+
+      PartitionDesc part =
+          HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+              pathToPartitionInfo, path,
+              IOPrepareCache.get().allocatePartitionDescMap());
+
+      // Use HiveInputFormat if any of the paths is not splittable
+      Class inputFormatClass = part.getInputFileFormatClass();
+      String inputFormatClassName = inputFormatClass.getName();
+      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      if (inputFormat instanceof AvoidSplitCombination &&
+          ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The split [" + path +
+              "] is being parked for HiveInputFormat.getSplits");
+        }
+        nonCombinablePaths.add(path);
+      } else {
+        combinablePaths.add(path);
+      }
+    }
+
+    // Store the previous value for the path specification
+    String oldPaths = job.get(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The received input paths are: [" + oldPaths +
+          "] against the property "
+          + HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
+    }
+
+    // Process the normal splits
+    if (nonCombinablePaths.size() > 0) {
+      FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray
+          (new Path[nonCombinablePaths.size()]));
+      InputSplit[] splits = super.getSplits(job, numSplits);
+      for (InputSplit split : splits) {
+        result.add(split);
+      }
+    }
+
+    // Process the combine splits
+    if (combinablePaths.size() > 0) {
+      FileInputFormat.setInputPaths(job, combinablePaths.toArray
+          (new Path[combinablePaths.size()]));
+      InputSplit[] splits = getCombineSplits(job, numSplits);
+      for (InputSplit split : splits) {
+        result.add(split);
+      }
+    }
+
+    // Restore the old path information back
+    // This is just to prevent incompatibilities with previous versions of Hive
+    // if some application depends on the original value being set.
+    if (oldPaths != null) {
+      job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
+    }
+    LOG.info("Number of all splits " + result.size());
+    return result.toArray(new InputSplit[result.size()]);
+  }
+
   private void processPaths(JobConf job, CombineFileInputFormatShim combine,
       List<InputSplitShim> iss, Path... path) throws IOException {
     JobConf currJob = new JobConf(job);
@@ -635,4 +696,12 @@ public class CombineHiveInputFormat<K ex
       return s.toString();
     }
   }
+
+  /**
+   * This is a marker interface that is used to identify the formats where
+   * combine split generation is not applicable.
+   */
+  public interface AvoidSplitCombination {
+    boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
+  }
 }
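
A minimal sketch of how an input format can opt out of split combining through the new marker interface; the class below is hypothetical (OrcInputFormat's real implementation appears later in this commit), and in practice such a class would also implement org.apache.hadoop.mapred.InputFormat:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;

    public class UncombinableInputFormatExample implements AvoidSplitCombination {
      @Override
      public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
        // Returning true parks this path for plain HiveInputFormat.getSplits();
        // returning false leaves it eligible for CombineFileSplit grouping.
        return true;
      }
    }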

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
 Tue Oct 14 19:06:45 2014
@@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe
   }
 
   public IOContext getIOContext() {
-    return IOContext.get();
+    return IOContext.get(jobConf.get(Utilities.INPUT_NAME));
   }
 
-  public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
+  private void initIOContext(long startPos, boolean isBlockPointer,
+      Path inputPath) {
     ioCxtRef = this.getIOContext();
     ioCxtRef.currentBlockStart = startPos;
     ioCxtRef.isBlockPointer = isBlockPointer;
@@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe
 
     boolean blockPointer = false;
     long blockStart = -1;
-    FileSplit fileSplit = (FileSplit) split;
+    FileSplit fileSplit = split;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(job);
     if (inputFormatClass.getName().contains("SequenceFile")) {
@@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe
       blockStart = in.getPosition();
       in.close();
     }
+    this.jobConf = job;
     this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
 
     this.initIOContextSortedProps(split, recordReader, job);
   }
 
   public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) {
+    this.jobConf = job;
+
     this.getIOContext().resetSortingValues();
     this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
 

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
 Tue Oct 14 19:06:45 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -279,7 +280,14 @@ public class HiveInputFormat<K extends W
   }
 
   protected void init(JobConf job) {
-    mrwork = Utilities.getMapWork(job);
+    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+      mrwork = (MapWork) Utilities.getMergeWork(job);
+      if (mrwork == null) {
+        mrwork = Utilities.getMapWork(job);
+      }
+    } else {
+      mrwork = Utilities.getMapWork(job);
+    }
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 
@@ -321,11 +329,7 @@ public class HiveInputFormat<K extends W
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    PerfLogger perfLogger = PerfLogger.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
-    init(job);
-
+  Path[] getInputPaths(JobConf job) throws IOException {
     Path[] dirs = FileInputFormat.getInputPaths(job);
     if (dirs.length == 0) {
       // on tez we're avoiding to duplicate the file info in FileInputFormat.
@@ -340,6 +344,14 @@ public class HiveInputFormat<K extends W
         throw new IOException("No input paths specified in job");
       }
     }
+    return dirs;
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
+    init(job);
+    Path[] dirs = getInputPaths(job);
     JobConf newjob = new JobConf(job);
     List<InputSplit> result = new ArrayList<InputSplit>();
 
@@ -442,6 +454,9 @@ public class HiveInputFormat<K extends W
 
   public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
 
+    // ensure filters are not set from previous pushFilters
+    jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
+    jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {
       return;
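
The two unset() calls matter when the same JobConf is reused across table scans: without them, a scan that has no pushable predicate would silently inherit the previous scan's filter. A standalone sketch of the pattern, with hypothetical key names standing in for TableScanDesc.FILTER_TEXT_CONF_STR and FILTER_EXPR_CONF_STR:

    import org.apache.hadoop.mapred.JobConf;

    public class FilterPropExample {
      static final String FILTER_TEXT = "example.filter.text";   // hypothetical key
      static final String FILTER_EXPR = "example.filter.expr";   // hypothetical key

      static void pushFilter(JobConf job, String filterText, String serializedExpr) {
        // Always clear stale values first, even if this scan sets nothing.
        job.unset(FILTER_TEXT);
        job.unset(FILTER_EXPR);
        if (filterText == null || serializedExpr == null) {
          return;   // no predicate to push for this scan
        }
        job.set(FILTER_TEXT, filterText);
        job.set(FILTER_EXPR, serializedExpr);
      }
    }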

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java 
(original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java 
Tue Oct 14 19:06:45 2014
@@ -18,7 +18,13 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 
@@ -32,20 +38,25 @@ import org.apache.hadoop.hive.ql.session
  */
 public class IOContext {
 
-
   private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
     @Override
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
- private static IOContext ioContext = new IOContext();
+  private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
+  private static IOContext ioContext = new IOContext();
 
-  public static IOContext get() {
-    if (SessionState.get() == null) {
-      // this happens on the backend. only one io context needed.
-      return ioContext;
+  public static Map<String, IOContext> getMap() {
+    return inputNameIOContextMap;
+  }
+
+  public static IOContext get(String inputName) {
+    if (inputNameIOContextMap.containsKey(inputName) == false) {
+      IOContext ioContext = new IOContext();
+      inputNameIOContextMap.put(inputName, ioContext);
     }
-    return IOContext.threadLocal.get();
+
+    return inputNameIOContextMap.get(inputName);
   }
 
   public static void clear() {
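
A short sketch of the new per-input lookup; the input names are made up, and the example only exercises the static accessor added above (the backing map is a plain HashMap, so the sketch assumes single-threaded access):

    import org.apache.hadoop.hive.ql.io.IOContext;

    public class IOContextExample {
      public static void main(String[] args) {
        // In a Tez task the name would come from the job conf (Utilities.INPUT_NAME);
        // here the strings are purely illustrative.
        IOContext first  = IOContext.get("Map 1");   // created on first use
        IOContext second = IOContext.get("Map 2");   // separate, independent context
        IOContext again  = IOContext.get("Map 1");   // same instance as 'first'

        System.out.println(first == again);    // true
        System.out.println(first == second);   // false
      }
    }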

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
 Tue Oct 14 19:06:45 2014
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
+import javax.annotation.Nullable;
+
 interface CompressionCodec {
 
   public enum Modifier {
@@ -62,6 +64,6 @@ interface CompressionCodec {
    * @param modifiers compression modifiers
    * @return codec for use after optional modification
    */
-  CompressionCodec modify(EnumSet<Modifier> modifiers);
+  CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers);
 
 }

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
 Tue Oct 14 19:06:45 2014
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.NavigableMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -103,7 +106,9 @@ import com.google.common.util.concurrent
  */
 public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct>, LlapWrappableInputFormatInterface {
+    AcidInputFormat<NullWritable, OrcStruct>, 
+    CombineHiveInputFormat.AvoidSplitCombination,
+    LlapWrappableInputFormatInterface {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -128,6 +133,12 @@ public class OrcInputFormat  implements 
    */
   private static final double MIN_INCLUDED_LOCATION = 0.80;
 
+  @Override
+  public boolean shouldSkipCombine(Path path,
+                                   Configuration conf) throws IOException {
+    return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf);
+  }
+
   private static class OrcRecordReader
       implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>,
       StatsProvidingRecordReader {
@@ -611,7 +622,7 @@ public class OrcInputFormat  implements 
     private final FileSystem fs;
     private final FileStatus file;
     private final long blockSize;
-    private final BlockLocation[] locations;
+    private final TreeMap<Long, BlockLocation> locations;
     private final FileInfo fileInfo;
     private List<StripeInformation> stripes;
     private ReaderImpl.FileMetaInfo fileMetaInfo;
@@ -631,7 +642,7 @@ public class OrcInputFormat  implements 
       this.file = file;
       this.blockSize = file.getBlockSize();
       this.fileInfo = fileInfo;
-      locations = SHIMS.getLocations(fs, file);
+      locations = SHIMS.getLocationsWithOffset(fs, file);
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
@@ -642,8 +653,8 @@ public class OrcInputFormat  implements 
     }
 
     void schedule() throws IOException {
-      if(locations.length == 1 && file.getLen() < context.maxSize) {
-        String[] hosts = locations[0].getHosts();
+      if(locations.size() == 1 && file.getLen() < context.maxSize) {
+        String[] hosts = locations.firstEntry().getValue().getHosts();
         synchronized (context.splits) {
           context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
                 hosts, fileMetaInfo, isOriginal, hasBase, deltas));
@@ -691,15 +702,22 @@ public class OrcInputFormat  implements 
     void createSplit(long offset, long length,
                      ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException {
       String[] hosts;
-      if ((offset % blockSize) + length <= blockSize) {
+      Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
+      BlockLocation start = startEntry.getValue();
+      if (offset + length <= start.getOffset() + start.getLength()) {
         // handle the single block case
-        hosts = locations[(int) (offset / blockSize)].getHosts();
+        hosts = start.getHosts();
       } else {
+        Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length);
+        BlockLocation end = endEntry.getValue();
+        //get the submap
+        NavigableMap<Long, BlockLocation> navigableMap = locations.subMap(startEntry.getKey(),
+                  true, endEntry.getKey(), true);
         // Calculate the number of bytes in the split that are local to each
         // host.
         Map<String, LongWritable> sizes = new HashMap<String, LongWritable>();
         long maxSize = 0;
-        for(BlockLocation block: locations) {
+        for (BlockLocation block : navigableMap.values()) {
           long overlap = getOverlap(offset, length, block.getOffset(),
               block.getLength());
           if (overlap > 0) {
@@ -712,6 +730,9 @@ public class OrcInputFormat  implements 
               val.set(val.get() + overlap);
               maxSize = Math.max(maxSize, val.get());
             }
+          } else {
+            throw new IOException("File " + file.getPath().toString() +
+                    " should have had overlap on block starting at " + 
block.getOffset());
           }
         }
         // filter the list of locations to those that have at least 80% of the
@@ -719,7 +740,7 @@ public class OrcInputFormat  implements 
         long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
         List<String> hostList = new ArrayList<String>();
         // build the locations in a predictable order to simplify testing
-        for(BlockLocation block: locations) {
+        for(BlockLocation block: navigableMap.values()) {
           for(String host: block.getHosts()) {
             if (sizes.containsKey(host)) {
               if (sizes.get(host).get() >= threshold) {
@@ -1115,7 +1136,7 @@ public class OrcInputFormat  implements 
 
       @Override
       public ObjectInspector getObjectInspector() {
-        return ((StructObjectInspector) reader.getObjectInspector())
+        return ((StructObjectInspector) records.getObjectInspector())
             .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
             .getFieldObjectInspector();
       }
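
The switch from a BlockLocation[] indexed by offset/blockSize to a TreeMap keyed by block start offset removes the assumption that every block in the file has the same size. A self-contained sketch of the lookup pattern with made-up offsets (plain longs stand in for BlockLocation):

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class BlockLookupExample {
      public static void main(String[] args) {
        // Block start offset -> block length.
        TreeMap<Long, Long> blocks = new TreeMap<Long, Long>();
        blocks.put(0L, 128L);      // covers [0, 128)
        blocks.put(128L, 64L);     // covers [128, 192)
        blocks.put(192L, 128L);    // covers [192, 320)

        long offset = 100L, length = 150L;   // split to place: [100, 250)

        // floorEntry finds the block containing the split start ...
        Map.Entry<Long, Long> start = blocks.floorEntry(offset);
        // ... and the block containing the split end.
        Map.Entry<Long, Long> end = blocks.floorEntry(offset + length);

        // All blocks overlapping the split, in offset order.
        NavigableMap<Long, Long> covered =
            blocks.subMap(start.getKey(), true, end.getKey(), true);
        System.out.println(covered.keySet());   // prints [0, 128, 192]
      }
    }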

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
 Tue Oct 14 19:06:45 2014
@@ -118,13 +118,11 @@ public class OrcNewInputFormat extends I
   public List<InputSplit> getSplits(JobContext jobContext)
       throws IOException, InterruptedException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    Configuration conf =
-        ShimLoader.getHadoopShims().getConfiguration(jobContext);
     List<OrcSplit> splits =
         OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
         .getConfiguration(jobContext));
-    List<InputSplit> result = new ArrayList<InputSplit>();
-    for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) {
+    List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
+    for(OrcSplit split: splits) {
       result.add(new OrcNewSplit(split));
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
 Tue Oct 14 19:06:45 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.Val
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -37,9 +38,10 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -448,6 +450,10 @@ public class OrcRawRecordMerger implemen
 
     // we always want to read all of the deltas
     eventOptions.range(0, Long.MAX_VALUE);
+    // Turn off the sarg before pushing it to delta.  We never want to push a sarg to a delta as
+    // it can produce wrong results (if the latest valid version of the record is filtered out by
+    // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
+    eventOptions.searchArgument(null, null);
     if (deltaDirectory != null) {
       for(Path delta: deltaDirectory) {
         ReaderKey key = new ReaderKey();
@@ -627,8 +633,16 @@ public class OrcRawRecordMerger implemen
 
     // Parse the configuration parameters
     ArrayList<String> columnNames = new ArrayList<String>();
+    Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
     if (columnNameProperty != null && columnNameProperty.length() > 0) {
-      Collections.addAll(columnNames, columnNameProperty.split(","));
+      String[] colNames = columnNameProperty.split(",");
+      for (int i = 0; i < colNames.length; i++) {
+        if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) {
+          virtualColumns.addLast(i);
+        } else {
+          columnNames.add(colNames[i]);
+        }
+      }
     }
     if (columnTypeProperty == null) {
       // Default type: all string
@@ -644,6 +658,9 @@ public class OrcRawRecordMerger implemen
 
     ArrayList<TypeInfo> fieldTypes =
         TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    while (virtualColumns.size() > 0) {
+      fieldTypes.remove(virtualColumns.removeLast());
+    }
     StructTypeInfo rowType = new StructTypeInfo();
     rowType.setAllStructFieldNames(columnNames);
     rowType.setAllStructFieldTypeInfos(fieldTypes);
@@ -651,6 +668,11 @@ public class OrcRawRecordMerger implemen
         (OrcStruct.createObjectInspector(rowType));
   }
 
+  @Override
+  public boolean isDelete(OrcStruct value) {
+    return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION;
+  }
+
   /**
    * Get the number of columns in the underlying rows.
    * @return 0 if there are no base and no deltas.
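
A self-contained sketch of the column pruning added above: virtual column positions are remembered in a deque and the matching type entries are removed back-to-front so earlier indexes stay valid. The name set here is a stand-in for VirtualColumn.VIRTUAL_COLUMN_NAMES, and plain strings stand in for TypeInfo:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class VirtualColumnFilterExample {
      public static void main(String[] args) {
        Set<String> virtualNames = new HashSet<String>(Arrays.asList("ROW__ID"));

        String[] colNames = "id,ROW__ID,value".split(",");
        List<String> colTypes =
            new ArrayList<String>(Arrays.asList("int", "struct<...>", "string"));

        List<String> columnNames = new ArrayList<String>();
        Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
        for (int i = 0; i < colNames.length; i++) {
          if (virtualNames.contains(colNames[i])) {
            virtualColumns.addLast(i);        // remember the position, drop the name
          } else {
            columnNames.add(colNames[i]);
          }
        }
        // Remove types from the back so the stored indexes remain correct.
        while (!virtualColumns.isEmpty()) {
          colTypes.remove(virtualColumns.removeLast().intValue());
        }
        System.out.println(columnNames);   // [id, value]
        System.out.println(colTypes);      // [int, string]
      }
    }

The explicit intValue() makes the call resolve to List.remove(int), i.e. removal by index, rather than remove(Object).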

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
 Tue Oct 14 19:06:45 2014
@@ -81,6 +81,7 @@ import com.google.common.collect.Compari
 class RecordReaderImpl implements RecordReader {
 
   private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
 
   private final FSDataInputStream file;
   private final long firstRow;
@@ -3326,9 +3327,9 @@ class RecordReaderImpl implements Record
     // find the next row
     rowInStripe += 1;
     advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("row from " + reader.path);
-      LOG.debug("orc row = " + result);
+    if (isLogTraceEnabled) {
+      LOG.trace("row from " + reader.path);
+      LOG.trace("orc row = " + result);
     }
     return result;
   }
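
For reference, the logging pattern introduced here in isolation: the trace check is hoisted into a static final flag so the per-row path only builds log strings when tracing is on. Class and field names below are illustrative, and the flag assumes the log level does not change while rows are being read:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class TraceFlagExample {
      private static final Log LOG = LogFactory.getLog(TraceFlagExample.class);
      // Evaluated once at class load time.
      private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

      void onRow(Object row, String path) {
        if (isLogTraceEnabled) {
          LOG.trace("row from " + path);
          LOG.trace("orc row = " + row);
        }
      }
    }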

