HIVE-13617 : LLAP: support non-vectorized execution in IO (Sergey Shelukhin, 
reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b772fed0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b772fed0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b772fed0

Branch: refs/heads/master
Commit: b772fed09ec39e883ff034b806038fd49812d398
Parents: af378c0
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Jun 22 18:04:02 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Jun 22 18:08:20 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +
 .../test/resources/testconfiguration.properties |    3 +-
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  121 +-
 .../apache/hadoop/hive/ql/exec/MapOperator.java |    7 +-
 .../hive/ql/io/BatchToRowInputFormat.java       |   31 +
 .../hadoop/hive/ql/io/BatchToRowReader.java     |  623 +++++++++
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   17 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   14 +-
 .../hive/ql/io/orc/OrcOiBatchToRowReader.java   |   71 +
 .../hive/ql/optimizer/GenMapRedUtils.java       |    6 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java |   20 +-
 .../queries/clientpositive/orc_llap_nonvector.q |   42 +
 .../llap/dynamic_partition_pruning.q.out        |    6 +-
 .../llap/hybridgrace_hashjoin_1.q.out           |   24 +-
 .../results/clientpositive/llap/llap_udf.q.out  |    4 +-
 .../clientpositive/llap/llapdecider.q.out       |   24 +-
 .../clientpositive/llap/mapjoin_decimal.q.out   |    4 +-
 .../llap/orc_llap_nonvector.q.out               | 1310 ++++++++++++++++++
 .../clientpositive/llap/orc_ppd_basic.q.out     |   28 +-
 .../llap/tez_bmj_schema_evolution.q.out         |    2 +-
 .../llap/tez_dynpart_hashjoin_1.q.out           |   24 +-
 .../llap/tez_dynpart_hashjoin_2.q.out           |    6 +-
 .../llap/tez_union_group_by.q.out               |   10 +-
 .../llap/tez_vector_dynpart_hashjoin_1.q.out    |   12 +-
 .../llap/tez_vector_dynpart_hashjoin_2.q.out    |    2 +-
 .../vectorized_dynamic_partition_pruning.q.out  |    4 +-
 .../clientpositive/orc_llap_nonvector.q.out     | 1302 +++++++++++++++++
 .../clientpositive/tez/llapdecider.q.out        |   24 +-
 28 files changed, 3620 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3783dc4..08ee23f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2628,7 +2628,11 @@ public class HiveConf extends Configuration {
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
+    // The default is different on the client and server, so it's null here.
     LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer 
is enabled."),
+    
LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", 
true,
+        "Whether the LLAP IO layer is enabled for non-vectorized queries that 
read inputs\n" +
+        "that can be vectorized"),
     LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
         new StringSet("cache", "allocator", "none"),
         "LLAP IO memory usage; 'cache' (the default) uses data and metadata 
cache with a\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index 2b40cd9..2f04881 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -524,7 +524,8 @@ minillap.shared.query.files=bucket_map_join_tez1.q,\
   tez_union_multiinsert.q
 
 
-minillap.query.files=llap_udf.q
+minillap.query.files=llap_udf.q,\
+  orc_llap_nonvector.q
 
 
 encrypted.query.files=encryption_join_unencrypted_tbl.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 298f788..b945de3 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,19 +19,33 @@
 
 package org.apache.hadoop.hive.llap.io.api.impl;
 
+import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -40,11 +54,17 @@ import 
org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOiBatchToRowReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
@@ -57,16 +77,12 @@ import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 public class LlapInputFormat implements InputFormat<NullWritable, 
VectorizedRowBatch>,
     VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
     AvoidSplitCombination {
+  private static final String NONVECTOR_SETTING_MESSAGE = "disable "
+      + ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED.varname + " to work around 
this error";
+
   @SuppressWarnings("rawtypes")
   private final InputFormat sourceInputFormat;
   private final AvoidSplitCombination sourceASC;
@@ -95,29 +111,35 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
     if (split instanceof LlapAwareSplit) {
       useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
     }
+    boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
     if (!useLlapIo) {
       LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + 
split);
-      @SuppressWarnings("unchecked")
-      RecordReader<NullWritable, VectorizedRowBatch> rr =
-          sourceInputFormat.getRecordReader(split, job, reporter);
-      return rr;
-    }
-    boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(job);
-    if (!isVectorMode) {
-      LlapIoImpl.LOG.error("No LLAP IO in non-vectorized mode");
-      throw new UnsupportedOperationException("No LLAP IO in non-vectorized 
mode");
+      return sourceInputFormat.getRecordReader(split, job, reporter);
     }
     FileSplit fileSplit = (FileSplit)split;
     reporter.setStatus(fileSplit.toString());
     try {
       List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
           ? null : ColumnProjectionUtils.getReadColumnIDs(job);
-      return new LlapRecordReader(job, fileSplit, includedCols, hostName);
+      LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, 
hostName);
+      if (isVectorized) return rr;
+      if (sourceInputFormat instanceof BatchToRowInputFormat) {
+        return bogusCast(((BatchToRowInputFormat)sourceInputFormat).getWrapper(
+            rr, rr.getVectorizedRowBatchCtx(), includedCols));
+      }
+      return sourceInputFormat.getRecordReader(split, job, reporter);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
   }
 
+  // Returning either a vectorized or non-vectorized reader from the same call 
requires breaking
+  // generics... this is how vectorization currently works.
+  @SuppressWarnings("unchecked")
+  private static <A, B, C, D> RecordReader<A, B> bogusCast(RecordReader<C, D> 
rr) {
+    return (RecordReader<A, B>)rr;
+  }
+
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
     return sourceInputFormat.getSplits(job, numSplits);
@@ -146,9 +168,8 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
     private final QueryFragmentCounters counters;
     private long firstReturnTime;
 
-    public LlapRecordReader(
-        JobConf job, FileSplit split, List<Integer> includedCols, String 
hostName)
-            throws IOException {
+    public LlapRecordReader(JobConf job, FileSplit split, List<Integer> 
includedCols,
+        String hostName) throws IOException, HiveException {
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
@@ -171,7 +192,8 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
       this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
 
       MapWork mapWork = Utilities.getMapWork(job);
-      rbCtx = mapWork.getVectorizedRowBatchCtx();
+      VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
+      rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork);
 
       columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(job);
 
@@ -232,6 +254,9 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
       return true;
     }
 
+    public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
+      return rbCtx;
+    }
 
     private final class UncaughtErrorHandler implements FutureCallback<Void> {
       @Override
@@ -371,4 +396,54 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
   public boolean shouldSkipCombine(Path path, Configuration conf) throws 
IOException {
     return sourceASC == null ? false : sourceASC.shouldSkipCombine(path, conf);
   }
+
+  private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) 
throws HiveException {
+    // This is based on Vectorizer code, minus the validation.
+
+    // Add all non-virtual columns from the TableScan operator.
+    RowSchema rowSchema = findTsOp(mapWork).getSchema();
+    final List<String> colNames = new 
ArrayList<String>(rowSchema.getSignature().size());
+    final List<TypeInfo> colTypes = new 
ArrayList<TypeInfo>(rowSchema.getSignature().size());
+    for (ColumnInfo c : rowSchema.getSignature()) {
+      String columnName = c.getInternalName();
+      if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnName)) continue;
+      colNames.add(columnName);
+      colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName()));
+    }
+
+    // Determine the partition columns using the first partition descriptor.
+    // Note - like vectorizer, this assumes partition columns go after data 
columns.
+    int partitionColumnCount = 0;
+    Iterator<String> paths = mapWork.getPathToAliases().keySet().iterator();
+    if (paths.hasNext()) {
+      PartitionDesc partDesc = 
mapWork.getPathToPartitionInfo().get(paths.next());
+      if (partDesc != null) {
+        LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+        if (partSpec != null && partSpec.isEmpty()) {
+          partitionColumnCount = partSpec.size();
+        }
+      }
+    }
+    return new VectorizedRowBatchCtx(colNames.toArray(new 
String[colNames.size()]),
+        colTypes.toArray(new TypeInfo[colTypes.size()]), partitionColumnCount, 
new String[0]);
+  }
+
+  static TableScanOperator findTsOp(MapWork mapWork) throws HiveException {
+    if (mapWork.getAliasToWork() == null) {
+      throw new HiveException("Unexpected - aliasToWork is missing; " + 
NONVECTOR_SETTING_MESSAGE);
+    }
+    Iterator<Operator<?>> ops = mapWork.getAliasToWork().values().iterator();
+    TableScanOperator tableScanOperator = null;
+    while (ops.hasNext()) {
+      Operator<?> op = ops.next();
+      if (op instanceof TableScanOperator) {
+        if (tableScanOperator != null) {
+          throw new HiveException("Unexpected - more than one TSOP; " + 
NONVECTOR_SETTING_MESSAGE);
+        }
+        tableScanOperator = (TableScanOperator)op;
+      }
+    }
+    return tableScanOperator;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index afe5ee2..fabfbc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -491,7 +491,12 @@ public class MapOperator extends AbstractMapOperator {
         }
       } catch (Exception e) {
         // TODO: policy on deserialization errors
-        String message = toErrorMessage(value, row, 
current.rowObjectInspector);
+        String message = null;
+        try {
+          message = toErrorMessage(value, row, current.rowObjectInspector);
+        } catch (Throwable t) {
+          message = "[" + row + ", " + value + "]: cannot get error message " 
+ t.getMessage();
+        }
         if (row == null) {
           deserialize_error_count.set(deserialize_error_count.get() + 1);
           throw new HiveException("Hive Runtime Error while processing 
writable " + message, e);

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java
new file mode 100644
index 0000000..f958db5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.io.NullWritable;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.mapred.RecordReader;
+
+public interface BatchToRowInputFormat {
+  BatchToRowReader<?, ?> getWrapper(RecordReader<NullWritable, 
VectorizedRowBatch> vrr, 
+      VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
new file mode 100644
index 0000000..f350302
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A record reader wrapper that converts VRB reader into an OI-based reader.
+ * Due to the fact that changing table OIs in the plan after compilation is 
nearly impossible,
+ * this is made an abstract class where type-specific implementations can plug 
in certain details,
+ * so that the data produced after wrapping a vectorized reader would conform 
to the original OIs.
+ */
+public abstract class BatchToRowReader<StructType, UnionType>
+    implements RecordReader<NullWritable, Object> {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(BatchToRowReader.class);
+
+  private final NullWritable key;
+  private final VectorizedRowBatch batch;
+  private final RecordReader<NullWritable, VectorizedRowBatch> vrbReader;
+
+  private final List<TypeInfo> schema;
+  private final boolean[] included;
+  private int rowInBatch = 0;
+
+  public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> 
vrbReader,
+      VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
+    this.vrbReader = vrbReader;
+    this.key = vrbReader.createKey();
+    this.batch = vrbReader.createValue();
+    this.schema = Lists.<TypeInfo>newArrayList(vrbCtx.getRowColumnTypeInfos());
+    // TODO: does this include partition columns?
+    boolean[] included = new boolean[schema.size()];
+    if (includedCols != null) {
+      for (int colIx : includedCols) {
+        included[colIx] = true;
+      }
+    } else {
+      Arrays.fill(included, true);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Including the columns " + DebugUtils.toString(included));
+    }
+    this.included = included;
+  }
+
+  protected abstract StructType createStructObject(Object previous, 
List<TypeInfo> childrenTypes);
+  protected abstract void setStructCol(StructType structObj, int i, Object 
value);
+  protected abstract Object getStructCol(StructType structObj, int i);
+  protected abstract UnionType createUnionObject(List<TypeInfo> childrenTypes, 
Object previous);
+  protected abstract void setUnion(UnionType unionObj, byte tag, Object 
object);
+  protected abstract Object getUnionField(UnionType unionObj);
+
+  @Override
+  public NullWritable createKey() {
+    return key;
+  }
+
+  @Override
+  public Object createValue() {
+    return createStructObject(null, schema);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return -1;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean next(NullWritable key, Object previous) throws IOException {
+    if (!ensureBatch()) {
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    StructType value = (StructType)previous;
+    for (int i = 0; i < schema.size(); ++i) {
+      if (!included[i]) continue; // TODO: shortcut for last col below length?
+      try {
+        setStructCol(value, i,
+            nextValue(batch.cols[i], rowInBatch, schema.get(i), 
getStructCol(value, i)));
+      } catch (Throwable t) {
+        LOG.error("Error at row " + rowInBatch + "/" + batch.size + ", column 
" + i
+            + "/" + schema.size() + " " + batch.cols[i], t);
+        throw (t instanceof IOException) ? (IOException)t : new IOException(t);
+      }
+    }
+    ++rowInBatch;
+    return true;
+  }
+
+  /**
+   * If the current batch is empty, get a new one.
+   * @return true if we have rows available.
+   */
+  private boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      rowInBatch = 0;
+      return vrbReader.next(key, batch) && batch.size > 0;
+    }
+    return true;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    vrbReader.close();
+    batch.cols = null;
+  }
+
+  /* Routines for stubbing into Writables */
+
+  public static BooleanWritable nextBoolean(ColumnVector vector,
+                                     int row,
+                                     Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BooleanWritable result;
+      if (previous == null || previous.getClass() != BooleanWritable.class) {
+        result = new BooleanWritable();
+      } else {
+        result = (BooleanWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row] != 0);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static ByteWritable nextByte(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ByteWritable result;
+      if (previous == null || previous.getClass() != ByteWritable.class) {
+        result = new ByteWritable();
+      } else {
+        result = (ByteWritable) previous;
+      }
+      result.set((byte) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static ShortWritable nextShort(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ShortWritable result;
+      if (previous == null || previous.getClass() != ShortWritable.class) {
+        result = new ShortWritable();
+      } else {
+        result = (ShortWritable) previous;
+      }
+      result.set((short) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static IntWritable nextInt(ColumnVector vector,
+                             int row,
+                             Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      IntWritable result;
+      if (previous == null || previous.getClass() != IntWritable.class) {
+        result = new IntWritable();
+      } else {
+        result = (IntWritable) previous;
+      }
+      result.set((int) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static LongWritable nextLong(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      LongWritable result;
+      if (previous == null || previous.getClass() != LongWritable.class) {
+        result = new LongWritable();
+      } else {
+        result = (LongWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static FloatWritable nextFloat(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      FloatWritable result;
+      if (previous == null || previous.getClass() != FloatWritable.class) {
+        result = new FloatWritable();
+      } else {
+        result = (FloatWritable) previous;
+      }
+      result.set((float) ((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static DoubleWritable nextDouble(ColumnVector vector,
+                                   int row,
+                                   Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      DoubleWritable result;
+      if (previous == null || previous.getClass() != DoubleWritable.class) {
+        result = new DoubleWritable();
+      } else {
+        result = (DoubleWritable) previous;
+      }
+      result.set(((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static Text nextString(ColumnVector vector,
+                         int row,
+                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      Text result;
+      if (previous == null || previous.getClass() != Text.class) {
+        result = new Text();
+      } else {
+        result = (Text) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static HiveCharWritable nextChar(ColumnVector vector,
+                                   int row,
+                                   int size,
+                                   Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveCharWritable result;
+      if (previous == null || previous.getClass() != HiveCharWritable.class) {
+        result = new HiveCharWritable();
+      } else {
+        result = (HiveCharWritable) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.toString(row), size);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static HiveVarcharWritable nextVarchar(
+      ColumnVector vector, int row, int size, Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveVarcharWritable result;
+      if (previous == null || previous.getClass() != 
HiveVarcharWritable.class) {
+        result = new HiveVarcharWritable();
+      } else {
+        result = (HiveVarcharWritable) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.toString(row), size);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static BytesWritable nextBinary(ColumnVector vector,
+                                  int row,
+                                  Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BytesWritable result;
+      if (previous == null || previous.getClass() != BytesWritable.class) {
+        result = new BytesWritable();
+      } else {
+        result = (BytesWritable) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static HiveDecimalWritable nextDecimal(ColumnVector vector,
+                                         int row,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveDecimalWritable result;
+      if (previous == null || previous.getClass() != 
HiveDecimalWritable.class) {
+        result = new HiveDecimalWritable();
+      } else {
+        result = (HiveDecimalWritable) previous;
+      }
+      result.set(((DecimalColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static DateWritable nextDate(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      DateWritable result;
+      if (previous == null || previous.getClass() != DateWritable.class) {
+        result = new DateWritable();
+      } else {
+        result = (DateWritable) previous;
+      }
+      int date = (int) ((LongColumnVector) vector).vector[row];
+      result.set(date);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static TimestampWritable nextTimestamp(ColumnVector vector,
+                                         int row,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      TimestampWritable result;
+      if (previous == null || previous.getClass() != TimestampWritable.class) {
+        result = new TimestampWritable();
+      } else {
+        result = (TimestampWritable) previous;
+      }
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      result.setInternal(tcv.time[row], tcv.nanos[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public StructType nextStruct(
+      ColumnVector vector, int row, StructTypeInfo schema, Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      List<TypeInfo> childrenTypes = schema.getAllStructFieldTypeInfos();
+      StructType result = createStructObject(previous, childrenTypes);
+      StructColumnVector struct = (StructColumnVector) vector;
+      for (int f = 0; f < childrenTypes.size(); ++f) {
+        setStructCol(result, f, nextValue(struct.fields[f], row,
+            childrenTypes.get(f), getStructCol(result, f)));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  private UnionType nextUnion(
+      ColumnVector vector, int row, UnionTypeInfo schema, Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      List<TypeInfo> childrenTypes = schema.getAllUnionObjectTypeInfos();
+      UnionType result = createUnionObject(childrenTypes, previous);
+      UnionColumnVector union = (UnionColumnVector) vector;
+      byte tag = (byte) union.tags[row];
+      setUnion(result, tag, nextValue(union.fields[tag], row, 
childrenTypes.get(tag),
+          getUnionField(result)));
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  private ArrayList<Object> nextList(
+      ColumnVector vector, int row, ListTypeInfo schema, Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ArrayList<Object> result;
+      if (previous == null || previous.getClass() != ArrayList.class) {
+        result = new ArrayList<>();
+      } else {
+        result = (ArrayList<Object>) previous;
+      }
+      ListColumnVector list = (ListColumnVector) vector;
+      int length = (int) list.lengths[row];
+      int offset = (int) list.offsets[row];
+      result.ensureCapacity(length);
+      int oldLength = result.size();
+      int idx = 0;
+      TypeInfo childType = schema.getListElementTypeInfo();
+      while (idx < length && idx < oldLength) {
+        result.set(idx, nextValue(list.child, offset + idx, childType,
+            result.get(idx)));
+        idx += 1;
+      }
+      if (length < oldLength) {
+        for(int i= oldLength - 1; i >= length; --i) {
+          result.remove(i);
+        }
+      } else if (oldLength < length) {
+        while (idx < length) {
+          result.add(nextValue(list.child, offset + idx, childType, null));
+          idx += 1;
+        }
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  private HashMap<Object,Object> nextMap(
+      ColumnVector vector, int row, MapTypeInfo schema, Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      MapColumnVector map = (MapColumnVector) vector;
+      int length = (int) map.lengths[row];
+      int offset = (int) map.offsets[row];
+      TypeInfo keyType = schema.getMapKeyTypeInfo();
+      TypeInfo valueType = schema.getMapValueTypeInfo();
+      HashMap<Object,Object> result;
+      if (previous == null || previous.getClass() != HashMap.class) {
+        result = new HashMap<Object,Object>(length);
+      } else {
+        result = (HashMap<Object,Object>) previous;
+        // I couldn't think of a good way to reuse the keys and value objects
+        // without even more allocations, so take the easy and safe approach.
+        result.clear();
+      }
+      for(int e=0; e < length; ++e) {
+        result.put(nextValue(map.keys, e + offset, keyType, null),
+                   nextValue(map.values, e + offset, valueType, null));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  private Object nextValue(ColumnVector vector, int row, TypeInfo schema, 
Object previous) {
+    switch (schema.getCategory()) {
+      case STRUCT:
+        return nextStruct(vector, row, (StructTypeInfo)schema, previous);
+      case UNION:
+        return nextUnion(vector, row, (UnionTypeInfo)schema, previous);
+      case LIST:
+        return nextList(vector, row, (ListTypeInfo)schema, previous);
+      case MAP:
+        return nextMap(vector, row, (MapTypeInfo)schema, previous);
+      case PRIMITIVE: {
+        PrimitiveTypeInfo pschema = (PrimitiveTypeInfo)schema;
+        switch (pschema.getPrimitiveCategory()) {
+        case BOOLEAN:
+          return nextBoolean(vector, row, previous);
+        case BYTE:
+          return nextByte(vector, row, previous);
+        case SHORT:
+          return nextShort(vector, row, previous);
+        case INT:
+          return nextInt(vector, row, previous);
+        case LONG:
+          return nextLong(vector, row, previous);
+        case FLOAT:
+          return nextFloat(vector, row, previous);
+        case DOUBLE:
+          return nextDouble(vector, row, previous);
+        case STRING:
+          return nextString(vector, row, previous);
+        case CHAR:
+          return nextChar(vector, row, ((CharTypeInfo)pschema).getLength(), 
previous);
+        case VARCHAR:
+          return nextVarchar(vector, row, 
((VarcharTypeInfo)pschema).getLength(), previous);
+        case BINARY:
+          return nextBinary(vector, row, previous);
+        case DECIMAL:
+          return nextDecimal(vector, row, previous);
+        case DATE:
+          return nextDate(vector, row, previous);
+        case TIMESTAMP:
+          return nextTimestamp(vector, row, previous);
+        default:
+          throw new IllegalArgumentException("Unknown type " + schema);
+        }
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index cbacc25..227a051 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -207,6 +210,11 @@ public class HiveInputFormat<K extends WritableComparable, 
V extends Writable>
     }
     boolean isSupported = inputFormat instanceof 
LlapWrappableInputFormatInterface;
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf);
+    if (!isVectorized) {
+      // Pretend it's vectorized.
+      isVectorized = HiveConf.getBoolVar(conf, 
ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED)
+          && (Utilities.getPlanPath(conf) != null);
+    }
     if (!isSupported || !isVectorized) {
       LOG.info("Not using llap for " + inputFormat + ": supported = " + 
isSupported
           + ", vectorized = " + isVectorized);
@@ -224,12 +232,11 @@ public class HiveInputFormat<K extends 
WritableComparable, V extends Writable>
     return castInputFormat(llapIo.getInputFormat(inputFormat));
   }
 
-  public static boolean canWrapAnyForLlap(Configuration conf, MapWork mapWork) 
{
-    return Utilities.getUseVectorizedInputFileFormat(conf, mapWork);
-  }
 
-  public static boolean canWrapForLlap(Class<? extends InputFormat> 
inputFormatClass) {
-    return 
LlapWrappableInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+
+  public static boolean canWrapForLlap(Class<? extends InputFormat> clazz, 
boolean checkVector) {
+    return LlapWrappableInputFormatInterface.class.isAssignableFrom(clazz) &&
+        (!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 070950c..69d58d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -74,10 +74,14 @@ import 
org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
+import org.apache.hadoop.hive.ql.io.BatchToRowReader;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -141,7 +145,7 @@ import com.google.protobuf.CodedInputStream;
 public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface, 
LlapWrappableInputFormatInterface,
   SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>,
-  CombineHiveInputFormat.AvoidSplitCombination {
+  CombineHiveInputFormat.AvoidSplitCombination, BatchToRowInputFormat {
 
   static enum SplitStrategyKind {
     HYBRID,
@@ -2188,4 +2192,12 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
   protected ExternalFooterCachesByConf createExternalCaches() {
     return null; // The default ones are created in case of null; tests 
override this.
   }
+
+
+  @Override
+  public BatchToRowReader<?, ?> getWrapper(
+      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> 
vrr,
+      VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
+    return new OrcOiBatchToRowReader(vrr, vrbCtx, includedCols);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
new file mode 100644
index 0000000..600455e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.ql.io.BatchToRowReader;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+/** BatchToRowReader that returns the rows readable by ORC IOs. */
+public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, 
OrcUnion> {
+  public OrcOiBatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> 
vrbReader,
+      VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
+    super(vrbReader, vrbCtx, includedCols);
+  }
+
+  @Override
+  protected OrcStruct createStructObject(Object previous, List<TypeInfo> 
childrenTypes) {
+    int numChildren = childrenTypes.size();
+    if (!(previous instanceof OrcStruct)) {
+      return new OrcStruct(numChildren);
+    }
+    OrcStruct result = (OrcStruct) previous;
+    result.setNumFields(numChildren);
+    return result;
+  }
+
+  @Override
+  protected OrcUnion createUnionObject(List<TypeInfo> childrenTypes, Object 
previous) {
+    return (previous instanceof OrcUnion) ? (OrcUnion)previous : new 
OrcUnion();
+  }
+
+  @Override
+  protected void setStructCol(OrcStruct structObj, int i, Object value) {
+    structObj.setFieldValue(i, value);
+  }
+
+  @Override
+  protected Object getStructCol(OrcStruct structObj, int i) {
+    return structObj.getFieldValue(i);
+  }
+
+  @Override
+  protected Object getUnionField(OrcUnion unionObj) {
+    return unionObj.getObject();
+  }
+
+  @Override
+  protected void setUnion(OrcUnion unionObj, byte tag, Object object) {
+    unionObj.set(tag, object);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 7595065..f555741 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -907,19 +907,19 @@ public final class GenMapRedUtils {
       }
     } else if (task instanceof ExecDriver) {
       MapredWork work = (MapredWork) task.getWork();
-      work.getMapWork().deriveLlap(conf);
+      work.getMapWork().deriveLlap(conf, true);
     } else if (task != null && (task.getWork() instanceof TezWork)) {
       TezWork work = (TezWork)task.getWork();
       for (BaseWork w : work.getAllWorkUnsorted()) {
         if (w instanceof MapWork) {
-          ((MapWork)w).deriveLlap(conf);
+          ((MapWork)w).deriveLlap(conf, false);
         }
       }
     } else if (task instanceof SparkTask) {
       SparkWork work = (SparkWork) task.getWork();
       for (BaseWork w : work.getAllWorkUnsorted()) {
         if (w instanceof MapWork) {
-          ((MapWork) w).deriveLlap(conf);
+          ((MapWork) w).deriveLlap(conf, false);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index f034812..d908d48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -209,16 +211,26 @@ public class MapWork extends BaseWork {
     }
   }
 
-  public void deriveLlap(Configuration conf) {
+  public void deriveLlap(Configuration conf, boolean isExecDriver) {
     boolean hasLlap = false, hasNonLlap = false, hasAcid = false;
     // Assume the IO is enabled on the daemon by default. We cannot reasonably 
check it here.
-    boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, 
llapMode),
-        canWrapAny = isLlapOn && HiveInputFormat.canWrapAnyForLlap(conf, this);
+    boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, 
llapMode);
+    boolean canWrapAny = false, doCheckIfs = false;
+    if (isLlapOn) {
+      // We can wrap inputs if the execution is vectorized, or if we use a 
wrapper.
+      canWrapAny = Utilities.getUseVectorizedInputFileFormat(conf, this);
+      // ExecDriver has no plan path, so we cannot derive VRB stuff for the 
wrapper.
+      if (!canWrapAny && !isExecDriver) {
+        canWrapAny = HiveConf.getBoolVar(conf, 
ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED);
+        doCheckIfs = true;
+      }
+    }
     boolean hasPathToPartInfo = (pathToPartitionInfo != null && 
!pathToPartitionInfo.isEmpty());
     if (canWrapAny && hasPathToPartInfo) {
       assert isLlapOn;
       for (PartitionDesc part : pathToPartitionInfo.values()) {
-        boolean isUsingLlapIo = 
HiveInputFormat.canWrapForLlap(part.getInputFileFormatClass());
+        boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap(
+            part.getInputFileFormatClass(), doCheckIfs);
         if (isUsingLlapIo) {
           if (part.getTableDesc() != null &&
               
AcidUtils.isTablePropertyTransactional(part.getTableDesc().getProperties())) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q 
b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
new file mode 100644
index 0000000..1fc60b2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
@@ -0,0 +1,42 @@
+set hive.mapred.mode=nonstrict;
+SET hive.vectorized.execution.enabled=true;
+
+SET hive.llap.io.enabled=false;
+
+SET hive.exec.orc.default.buffer.size=32768;
+SET hive.exec.orc.default.row.index.stride=1000;
+SET hive.optimize.index.filter=true;
+set hive.auto.convert.join=false;
+set hive.fetch.task.conversion=none;
+
+DROP TABLE orc_create_staging;
+DROP TABLE orc_create_complex;
+DROP TABLE orc_llap_nonvector;
+
+
+CREATE TABLE orc_create_staging (
+  str STRING,
+  mp  MAP<STRING,STRING>,
+  lst ARRAY<STRING>,
+  strct STRUCT<A:STRING,B:STRING>
+) ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY '|'
+    COLLECTION ITEMS TERMINATED BY ','
+    MAP KEYS TERMINATED BY ':';
+LOAD DATA LOCAL INPATH '../../data/files/orc_create.txt' OVERWRITE INTO TABLE 
orc_create_staging;
+
+create table orc_llap_nonvector stored as orc as select *, rand(1234) rdm from 
alltypesorc order by rdm;
+
+SET hive.llap.io.enabled=true;
+set hive.auto.convert.join=true;
+SET hive.vectorized.execution.enabled=false;
+
+explain 
+select * from orc_llap_nonvector limit 100;
+select * from orc_llap_nonvector limit 100;
+explain 
+select cint, cstring1 from orc_llap_nonvector limit 1025;
+select cint, cstring1 from orc_llap_nonvector limit 1025;
+
+DROP TABLE orc_create_staging;
+DROP TABLE orc_llap_nonvector;

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out 
b/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
index 912cd7f..e75bec6 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
@@ -3169,7 +3169,7 @@ STAGE PLANS:
                         Statistics: Num rows: 1 Data size: 0 Basic stats: 
PARTIAL Column stats: NONE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: unknown
         Map 5 
             Map Operator Tree:
                 TableScan
@@ -5557,7 +5557,7 @@ STAGE PLANS:
                         Statistics: Num rows: 1 Data size: 0 Basic stats: 
PARTIAL Column stats: NONE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: unknown
         Map 2 
             Map Operator Tree:
                 TableScan
@@ -5940,7 +5940,7 @@ STAGE PLANS:
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                           value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out 
b/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out
index ad3a358..3e96268 100644
--- a/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out
@@ -83,7 +83,7 @@ STAGE PLANS:
                             Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                             value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -102,7 +102,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 4096 Data size: 880654 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -208,7 +208,7 @@ STAGE PLANS:
                             Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                             value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -227,7 +227,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 4096 Data size: 880654 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -329,7 +329,7 @@ STAGE PLANS:
                             Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                             value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -348,7 +348,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 12288 Data size: 2641964 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -450,7 +450,7 @@ STAGE PLANS:
                             Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                             value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -469,7 +469,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 12288 Data size: 2641964 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -566,7 +566,7 @@ STAGE PLANS:
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                           value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -582,7 +582,7 @@ STAGE PLANS:
                       Map-reduce partition columns: _col0 (type: int)
                       Statistics: Num rows: 12288 Data size: 2641964 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -679,7 +679,7 @@ STAGE PLANS:
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
                           value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -695,7 +695,7 @@ STAGE PLANS:
                       Map-reduce partition columns: _col0 (type: int)
                       Statistics: Num rows: 12288 Data size: 2641964 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/llap_udf.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/llap_udf.q.out 
b/ql/src/test/results/clientpositive/llap/llap_udf.q.out
index e517942..53801d8 100644
--- a/ql/src/test/results/clientpositive/llap/llap_udf.q.out
+++ b/ql/src/test/results/clientpositive/llap/llap_udf.q.out
@@ -116,7 +116,7 @@ STAGE PLANS:
                           output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                           serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator
@@ -206,7 +206,7 @@ STAGE PLANS:
                           output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                           serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/llapdecider.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/llapdecider.q.out 
b/ql/src/test/results/clientpositive/llap/llapdecider.q.out
index 27ab92a..3ebaafb 100644
--- a/ql/src/test/results/clientpositive/llap/llapdecider.q.out
+++ b/ql/src/test/results/clientpositive/llap/llapdecider.q.out
@@ -193,7 +193,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 88000 Basic 
stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -276,7 +276,7 @@ STAGE PLANS:
                         Statistics: Num rows: 205 Data size: 19475 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -348,7 +348,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -438,7 +438,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -458,7 +458,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -532,7 +532,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -800,7 +800,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -820,7 +820,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Reduce Operator Tree:
               Merge Join Operator
@@ -980,7 +980,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -1000,7 +1000,7 @@ STAGE PLANS:
                         Statistics: Num rows: 500 Data size: 89000 Basic 
stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -1082,7 +1082,7 @@ STAGE PLANS:
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
                           value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -1259,7 +1259,7 @@ STAGE PLANS:
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
                           value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out 
b/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out
index 2d1b818..01f1f80 100644
--- a/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out
+++ b/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out
@@ -123,7 +123,7 @@ STAGE PLANS:
                           Statistics: Num rows: 1153 Data size: 129236 Basic 
stats: COMPLETE Column stats: NONE
                           value expressions: _col1 (type: decimal(4,0))
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -142,7 +142,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: 
decimal(6,2))
                         Statistics: Num rows: 1049 Data size: 117488 Basic 
stats: COMPLETE Column stats: NONE
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:

Reply via email to