This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 282b6f9  Hive: Vectorized ORC reads for Hive (#2613)
282b6f9 is described below

commit 282b6f9f1cae8d4fd5ff7c73de513ca91f01fddc
Author: Adam Szita <[email protected]>
AuthorDate: Mon Jun 7 10:53:59 2021 +0200

    Hive: Vectorized ORC reads for Hive (#2613)
---
 build.gradle                                       |  84 ++++++--
 .../hive/vector/CompatibilityHiveVectorUtils.java  | 217 +++++++++++++++++++++
 .../vector/HiveIcebergVectorizedRecordReader.java  |  73 +++++++
 .../mr/hive/vector/HiveVectorizedReader.java       | 138 +++++++++++++
 .../mr/hive/vector/VectorizedRowBatchIterator.java |  91 +++++++++
 .../hive/ql/exec/vector/VectorizedSupport.java     |  45 +++++
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |  50 ++++-
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |  12 --
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |   6 +
 .../mapred/AbstractMapredIcebergRecordReader.java  |  70 +++++++
 .../mr/mapred/MapredIcebergInputFormat.java        |  62 +++---
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  52 +++--
 .../TestHiveIcebergStorageHandlerWithEngine.java   |  24 ++-
 settings.gradle                                    |   2 +
 14 files changed, 847 insertions(+), 79 deletions(-)

diff --git a/build.gradle b/build.gradle
index 855a9f5..bacb2f1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -551,6 +551,60 @@ project(':iceberg-mr') {
 }
 
 if (jdkVersion == '8') {
+  // The purpose of this module is to re-shade org.apache.orc.storage to the 
original org.apache.hadoop.hive package
+  // name. This is to be used by Hive3 for features including e.g. 
vectorization.
+  project(':iceberg-hive3-orc-bundle') {
+
+    apply plugin: 'com.github.johnrengelman.shadow'
+
+    tasks.jar.dependsOn tasks.shadowJar
+
+    dependencies {
+      compile project(':iceberg-data')
+      compile project(':iceberg-orc')
+
+      testCompileOnly project(path: ':iceberg-data', configuration: 
'testArtifacts')
+      testCompileOnly project(path: ':iceberg-orc', configuration: 
'testArtifacts')
+    }
+
+    shadowJar {
+      configurations = [project.configurations.compile, 
project.configurations.compileOnly, project.configurations.testCompileOnly]
+
+      zip64 true
+
+      // include the LICENSE and NOTICE files for the shaded Jar
+      from(projectDir) {
+        include 'LICENSE'
+        include 'NOTICE'
+      }
+
+      // Relocate dependencies to avoid conflicts
+      relocate 'org.apache.orc.storage', 'org.apache.hadoop.hive'
+
+      // We really only need Iceberg and Orc classes, but with relocated 
references for storage-api classes (see above)
+      // Unfortunately the include list feature of this shader plugin doesn't 
work as expected
+      exclude 'com/**/*'
+      exclude 'edu/**/*'
+      exclude 'io/**'
+      exclude 'javax/**'
+      exclude 'org/apache/avro/**/*'
+      exclude 'org/apache/commons/**/*'
+      exclude 'org/checkerframework/**/*'
+      exclude 'org/codehaus/**/*'
+      exclude 'org/intellij/**/*'
+      exclude 'org/jetbrains/**/*'
+      exclude 'org/slf4j/**/*'
+      exclude 'org/threeten/**/*'
+
+      classifier null
+    }
+
+    jar {
+      enabled = false
+    }
+
+  }
+
   project(':iceberg-hive3') {
 
     // run the tests in iceberg-mr with Hive3 dependencies
@@ -569,13 +623,13 @@ if (jdkVersion == '8') {
     }
 
     dependencies {
-      compile project(':iceberg-api')
-      compile project(':iceberg-core')
-      compile project(':iceberg-data')
-      compile project(':iceberg-hive-metastore')
-      compile project(':iceberg-orc')
-      compile project(':iceberg-parquet')
-      compile project(':iceberg-mr')
+      compileOnly project(':iceberg-api')
+      compileOnly project(':iceberg-core')
+      compileOnly project(':iceberg-hive-metastore')
+      compileOnly project(':iceberg-parquet')
+      compileOnly project(':iceberg-hive3-orc-bundle')
+      compileOnly project(':iceberg-mr')
+
 
       compileOnly("org.apache.hadoop:hadoop-client:3.1.0") {
         exclude group: 'org.apache.avro', module: 'avro'
@@ -592,10 +646,13 @@ if (jdkVersion == '8') {
         exclude group: 'org.pentaho' // missing dependency
         exclude group: 'org.slf4j', module: 'slf4j-log4j12'
       }
-      compileOnly("org.apache.hive:hive-metastore:3.1.2")
-      compileOnly("org.apache.hive:hive-serde:3.1.2")
+      compileOnly("org.apache.hive:hive-metastore:3.1.2") {
+        exclude group: 'org.apache.orc'
+      }
+      compileOnly("org.apache.hive:hive-serde:3.1.2") {
+        exclude group: 'org.apache.orc'
+      }
 
-      testCompile project(path: ':iceberg-data', configuration: 
'testArtifacts')
       testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
       testCompile project(path: ':iceberg-core', configuration: 
'testArtifacts')
       testCompile project(path: ':iceberg-hive-metastore', configuration: 
'testArtifacts')
@@ -606,6 +663,7 @@ if (jdkVersion == '8') {
       testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
       testCompile("org.apache.hive:hive-service:3.1.2") {
         exclude group: 'org.apache.hive', module: 'hive-exec'
+        exclude group: 'org.apache.orc'
       }
       testCompile("org.apache.tez:tez-dag:0.9.1")
       testCompile("org.apache.tez:tez-mapreduce:0.9.1")
@@ -628,7 +686,7 @@ project(':iceberg-hive-runtime') {
       exclude group: 'com.github.stephenc.findbugs'
       exclude group: 'commons-pool'
       exclude group: 'javax.annotation'
-      exclude group: 'javax.xml.bind'      
+      exclude group: 'javax.xml.bind'
       exclude group: 'org.apache.commons'
       exclude group: 'org.slf4j'
       exclude group: 'org.xerial.snappy'
@@ -645,7 +703,7 @@ project(':iceberg-hive-runtime') {
     }
     compile project(':iceberg-aws')
   }
-  
+
   shadowJar {
     configurations = [project.configurations.compile]
 
@@ -659,7 +717,7 @@ project(':iceberg-hive-runtime') {
 
     // Relocate dependencies to avoid conflicts
     relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
-    relocate 'org.apache.parquet', 
'org.apache.iceberg.shaded.org.apache.parquet'  
+    relocate 'org.apache.parquet', 
'org.apache.iceberg.shaded.org.apache.parquet'
     relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
     relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
     relocate 'com.github.benmanes', 
'org.apache.iceberg.shaded.com.github.benmanes'
diff --git 
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/CompatibilityHiveVectorUtils.java
 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/CompatibilityHiveVectorUtils.java
new file mode 100644
index 0000000..d53fdad
--- /dev/null
+++ 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/CompatibilityHiveVectorUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains ported code snippets from later Hive sources. We should get rid of 
this class as soon as Hive 4 is released
+ * and Iceberg depends on that version.
+ */
+public class CompatibilityHiveVectorUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompatibilityHiveVectorUtils.class);
+
+  private CompatibilityHiveVectorUtils() {
+
+  }
+
+  /**
+   * Returns serialized mapwork instance from a job conf - ported from Hive 
source code LlapHiveUtils#findMapWork
+   *
+   * @param job JobConf instance
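+   * @return the MapWork found for the job, or null if non-blank Tez merge work file prefixes are set (HIVE-16985)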
+   */
+  public static MapWork findMapWork(JobConf job) {
+    String inputName = job.get(Utilities.INPUT_NAME, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing for input {}", inputName);
+    }
+    String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+    if (prefixes != null && !StringUtils.isBlank(prefixes)) {
+      // Currently SMB is broken, so we cannot check if it's compatible with 
IO elevator.
+      // So, we don't use the below code that would get the correct MapWork. 
See HIVE-16985.
+      return null;
+    }
+
+    BaseWork work = null;
+    // HIVE-16985: try to find the fake merge work for SMB join, that is 
really another MapWork.
+    if (inputName != null) {
+      if (prefixes == null || 
!Lists.newArrayList(prefixes.split(",")).contains(inputName)) {
+        inputName = null;
+      }
+    }
+    if (inputName != null) {
+      work = Utilities.getMergeWork(job, inputName);
+    }
+
+    if (!(work instanceof MapWork)) {
+      work = Utilities.getMapWork(job);
+    }
+    return (MapWork) work;
+  }
+
+
+  /**
+   * Ported from Hive source code VectorizedRowBatchCtx#addPartitionColsToBatch
+   *
+   * @param col ColumnVector to write the partition value into
+   * @param value partition value
+   * @param partitionColumnName partition key
+   * @param rowColumnTypeInfo column type description
+   */
+//  @SuppressWarnings({"AvoidNestedBlocks", "FallThrough", "MethodLength", 
"CyclomaticComplexity", "Indentation"})
+  public static void addPartitionColsToBatch(ColumnVector col, Object value, 
String partitionColumnName,
+      TypeInfo rowColumnTypeInfo) {
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) 
rowColumnTypeInfo;
+
+    if (value == null) {
+      col.noNulls = false;
+      col.isNull[0] = true;
+      col.isRepeating = true;
+      return;
+    }
+
+    switch (primitiveTypeInfo.getPrimitiveCategory()) {
+      case BOOLEAN:
+        LongColumnVector booleanColumnVector = (LongColumnVector) col;
+        booleanColumnVector.fill((Boolean) value ? 1 : 0);
+        booleanColumnVector.isNull[0] = false;
+        break;
+
+      case BYTE:
+        LongColumnVector byteColumnVector = (LongColumnVector) col;
+        byteColumnVector.fill((Byte) value);
+        byteColumnVector.isNull[0] = false;
+        break;
+
+      case SHORT:
+        LongColumnVector shortColumnVector = (LongColumnVector) col;
+        shortColumnVector.fill((Short) value);
+        shortColumnVector.isNull[0] = false;
+        break;
+
+      case INT:
+        LongColumnVector intColumnVector = (LongColumnVector) col;
+        intColumnVector.fill((Integer) value);
+        intColumnVector.isNull[0] = false;
+        break;
+
+      case LONG:
+        LongColumnVector longColumnVector = (LongColumnVector) col;
+        longColumnVector.fill((Long) value);
+        longColumnVector.isNull[0] = false;
+        break;
+
+      case DATE:
+        LongColumnVector dateColumnVector = (LongColumnVector) col;
+        dateColumnVector.fill(DateWritable.dateToDays((Date) value));
+        dateColumnVector.isNull[0] = false;
+        break;
+
+      case TIMESTAMP:
+        TimestampColumnVector timeStampColumnVector = (TimestampColumnVector) 
col;
+        timeStampColumnVector.fill((Timestamp) value);
+        timeStampColumnVector.isNull[0] = false;
+        break;
+
+      case INTERVAL_YEAR_MONTH:
+        LongColumnVector intervalYearMonthColumnVector = (LongColumnVector) 
col;
+        intervalYearMonthColumnVector.fill(((HiveIntervalYearMonth) 
value).getTotalMonths());
+        intervalYearMonthColumnVector.isNull[0] = false;
+        break;
+
+      case INTERVAL_DAY_TIME:
+        IntervalDayTimeColumnVector intervalDayTimeColumnVector = 
(IntervalDayTimeColumnVector) col;
+        intervalDayTimeColumnVector.fill((HiveIntervalDayTime) value);
+        intervalDayTimeColumnVector.isNull[0] = false;
+        break;
+
+      case FLOAT:
+        DoubleColumnVector floatColumnVector = (DoubleColumnVector) col;
+        floatColumnVector.fill((Float) value);
+        floatColumnVector.isNull[0] = false;
+        break;
+
+      case DOUBLE:
+        DoubleColumnVector doubleColumnVector = (DoubleColumnVector) col;
+        doubleColumnVector.fill((Double) value);
+        doubleColumnVector.isNull[0] = false;
+        break;
+
+      case DECIMAL:
+        DecimalColumnVector decimalColumnVector = (DecimalColumnVector) col;
+        HiveDecimal hd = (HiveDecimal) value;
+        decimalColumnVector.set(0, hd);
+        decimalColumnVector.isRepeating = true;
+        decimalColumnVector.isNull[0] = false;
+        break;
+
+      case BINARY:
+        BytesColumnVector binaryColumnVector = (BytesColumnVector) col;
+        byte[] bytes = (byte[]) value;
+        binaryColumnVector.fill(bytes);
+        binaryColumnVector.isNull[0] = false;
+        break;
+
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) col;
+        String sVal = value.toString();
+        if (sVal == null) {
+          bytesColumnVector.noNulls = false;
+          bytesColumnVector.isNull[0] = true;
+          bytesColumnVector.isRepeating = true;
+        } else {
+          bytesColumnVector.setVal(0, sVal.getBytes());
+          bytesColumnVector.isRepeating = true;
+        }
+        break;
+
+      default:
+        throw new RuntimeException("Unable to recognize the partition type " +
+            primitiveTypeInfo.getPrimitiveCategory() + " for column " + 
partitionColumnName);
+    }
+
+  }
+}
diff --git 
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
new file mode 100644
index 0000000..98507dd
--- /dev/null
+++ 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
+/**
+ * An MR1 API wrapper that transfers the VectorizedRowBatch instances 
produced by
+ * IcebergInputFormat$IcebergRecordReader, which relies on the MR2 API format.
+ */
+public final class HiveIcebergVectorizedRecordReader extends 
AbstractMapredIcebergRecordReader<VectorizedRowBatch> {
+
+  private final JobConf job;
+
+  public HiveIcebergVectorizedRecordReader(
+      org.apache.iceberg.mr.mapreduce.IcebergInputFormat<VectorizedRowBatch> 
mapreduceInputFormat, IcebergSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+    super(mapreduceInputFormat, split, job, reporter);
+    this.job = job;
+  }
+
+  @Override
+  public boolean next(Void key, VectorizedRowBatch value) throws IOException {
+    try {
+      if (innerReader.nextKeyValue()) {
+        VectorizedRowBatch newBatch = (VectorizedRowBatch) 
innerReader.getCurrentValue();
+        value.cols = newBatch.cols;
+        value.endOfFile = newBatch.endOfFile;
+        value.selectedInUse = newBatch.selectedInUse;
+        value.size = newBatch.size;
+        return true;
+      } else {
+        return false;
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ie);
+    }
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return 
CompatibilityHiveVectorUtils.findMapWork(job).getVectorizedRowBatchCtx().createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() {
+    return -1;
+  }
+
+}
diff --git 
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
new file mode 100644
index 0000000..5c831e5
--- /dev/null
+++ 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Utility class to create vectorized readers for Hive.
+ * As per the file format of the task, it will create a matching vectorized 
record reader that is already implemented
+ * in Hive. It will also do some tweaks on the produced vectors for Iceberg's 
use e.g. partition column handling.
+ */
+public class HiveVectorizedReader {
+
+
+  private HiveVectorizedReader() {
+
+  }
+
+  public static <D> CloseableIterable<D> reader(InputFile inputFile, 
FileScanTask task, Map<Integer, ?> idToConstant,
+      TaskAttemptContext context) {
+    JobConf job = (JobConf) context.getConfiguration();
+    Path path = new Path(inputFile.location());
+    FileFormat format = task.file().format();
+    Reporter reporter = 
((MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl) 
context).getLegacyReporter();
+
+    // Hive by default requires partition columns to be read too. This is not 
required for identity partition
+    // columns, as we will add these as constants later.
+
+    int[] partitionColIndices = null;
+    Object[] partitionValues = null;
+    PartitionSpec partitionSpec = task.spec();
+
+    if (!partitionSpec.isUnpartitioned()) {
+      List<Integer> readColumnIds = 
ColumnProjectionUtils.getReadColumnIDs(job);
+
+      List<PartitionField> fields = partitionSpec.fields();
+      List<Integer> partitionColIndicesList = Lists.newLinkedList();
+      List<Object> partitionValuesList = Lists.newLinkedList();
+
+      for (PartitionField field : fields) {
+        if (field.transform().isIdentity()) {
+          // Skip reading identity partition columns from source file...
+          int hiveColIndex = field.sourceId() - 1;
+          readColumnIds.remove((Integer) hiveColIndex);
+
+          // ...and use the corresponding constant value instead
+          partitionColIndicesList.add(hiveColIndex);
+          partitionValuesList.add(idToConstant.get(field.sourceId()));
+        }
+      }
+
+      partitionColIndices = 
ArrayUtils.toPrimitive(partitionColIndicesList.toArray(new Integer[0]));
+      partitionValues = partitionValuesList.toArray(new Object[0]);
+
+      ColumnProjectionUtils.setReadColumns(job, readColumnIds);
+    }
+
+    try {
+      switch (format) {
+        case ORC:
+          InputSplit split = new OrcSplit(path, null, task.start(), 
task.length(), (String[]) null, null,
+              false, false, Lists.newArrayList(), 0, task.length(), 
path.getParent());
+          RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
+
+          recordReader = new VectorizedOrcInputFormat().getRecordReader(split, 
job, reporter);
+          return createVectorizedRowBatchIterable(recordReader, job, 
partitionColIndices, partitionValues);
+
+        default:
+          throw new UnsupportedOperationException("Vectorized Hive reading 
unimplemented for format: " + format);
+      }
+
+    } catch (IOException ioe) {
+      throw new RuntimeException("Error creating vectorized record reader for 
" + inputFile, ioe);
+    }
+  }
+
+  private static <D> CloseableIterable<D> createVectorizedRowBatchIterable(
+      RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf 
job, int[] partitionColIndices,
+      Object[] partitionValues) {
+
+    VectorizedRowBatchIterator iterator =
+        new VectorizedRowBatchIterator(hiveRecordReader, job, 
partitionColIndices, partitionValues);
+
+    return new CloseableIterable<D>() {
+
+      @Override
+      public CloseableIterator iterator() {
+        return iterator;
+      }
+
+      @Override
+      public void close() throws IOException {
+        iterator.close();
+      }
+    };
+  }
+
+}
diff --git 
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
new file mode 100644
index 0000000..e29b135
--- /dev/null
+++ 
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Iterator wrapper around Hive's record readers that produce 
VectorizedRowBatch instances through the MRv1 API.
+ */
+public final class VectorizedRowBatchIterator implements 
CloseableIterator<VectorizedRowBatch> {
+
+  private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
+  private final NullWritable key;
+  private final VectorizedRowBatch batch;
+  private final VectorizedRowBatchCtx vrbCtx;
+  private final int[] partitionColIndices;
+  private final Object[] partitionValues;
+  private boolean advanced = false;
+
+  VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> 
recordReader, JobConf job,
+                             int[] partitionColIndices, Object[] 
partitionValues) {
+    this.recordReader = recordReader;
+    this.key = recordReader.createKey();
+    this.batch = recordReader.createValue();
+    this.vrbCtx = 
CompatibilityHiveVectorUtils.findMapWork(job).getVectorizedRowBatchCtx();
+    this.partitionColIndices = partitionColIndices;
+    this.partitionValues = partitionValues;
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.recordReader.close();
+  }
+
+  private void advance() {
+    if (!advanced) {
+      try {
+
+        if (!recordReader.next(key, batch)) {
+          batch.size = 0;
+        }
+        // Fill partition values
+        if (partitionColIndices != null) {
+          for (int i = 0; i < partitionColIndices.length; ++i) {
+            int colIdx = partitionColIndices[i];
+            
CompatibilityHiveVectorUtils.addPartitionColsToBatch(batch.cols[colIdx], 
partitionValues[i],
+                    vrbCtx.getRowColumnNames()[colIdx], 
vrbCtx.getRowColumnTypeInfos()[colIdx]);
+          }
+        }
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+      advanced = true;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    advance();
+    return batch.size > 0;
+  }
+
+  @Override
+  public VectorizedRowBatch next() {
+    advance();
+    advanced = false;
+    return batch;
+  }
+}
diff --git 
a/mr/src/main/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSupport.java 
b/mr/src/main/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSupport.java
new file mode 100644
index 0000000..0d3310c
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSupport.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copied here from Hive for compatibility
+ */
+@SuppressWarnings("VisibilityModifier")
+public class VectorizedSupport {
+  public enum Support {
+    DECIMAL_64;
+
+    final String lowerCaseName;
+    Support() {
+      this.lowerCaseName = name().toLowerCase();
+    }
+
+    public static final Map<String, Support> nameToSupportMap = new 
HashMap<>();
+    static {
+      for (Support support : values()) {
+        nameToSupportMap.put(support.lowerCaseName, support);
+      }
+    }
+  }
+}
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 5974e7f..50b1dac 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -23,7 +23,10 @@ import java.io.IOException;
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -34,20 +37,43 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
 import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.SerializationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
-                                    implements 
CombineHiveInputFormat.AvoidSplitCombination {
+        implements CombineHiveInputFormat.AvoidSplitCombination, 
VectorizedInputFormatInterface {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergInputFormat.class);
+  private static final String HIVE_VECTORIZED_RECORDREADER_CLASS =
+          
"org.apache.iceberg.mr.hive.vector.HiveIcebergVectorizedRecordReader";
+  private static final DynConstructors.Ctor<AbstractMapredIcebergRecordReader> 
HIVE_VECTORIZED_RECORDREADER_CTOR;
+
+  // The vectorized record reader constructor is resolved by name, and only when Hive 3 is on the classpath;
+  // in every other case the constant is left as null.
+  static {
+    DynConstructors.Ctor<AbstractMapredIcebergRecordReader> vectorizedCtor = null;
+    if (MetastoreUtil.hive3PresentOnClasspath()) {
+      vectorizedCtor = DynConstructors.builder(AbstractMapredIcebergRecordReader.class)
+          .impl(HIVE_VECTORIZED_RECORDREADER_CLASS, IcebergInputFormat.class, IcebergSplit.class, JobConf.class,
+              Reporter.class)
+          .build();
+    }
+    HIVE_VECTORIZED_RECORDREADER_CTOR = vectorizedCtor;
+  }
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
@@ -79,11 +105,31 @@ public class HiveIcebergInputFormat extends 
MapredIcebergInputFormat<Record>
                                                                Reporter 
reporter) throws IOException {
     String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(job);
     job.setStrings(InputFormatConfig.SELECTED_COLUMNS, selectedColumns);
-    return super.getRecordReader(split, job, reporter);
+
+    if (HiveConf.getBoolVar(job, 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(), 
"Vectorization only supported for Hive 3+");
+
+      IcebergSplit icebergSplit = ((IcebergSplitContainer) 
split).icebergSplit();
+      // bogus cast for favouring code reuse over syntax
+      return (RecordReader) HIVE_VECTORIZED_RECORDREADER_CTOR.newInstance(
+              new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>(),
+              icebergSplit,
+              job,
+              reporter);
+    } else {
+      return super.getRecordReader(split, job, reporter);
+    }
   }
 
+  // Always answers true, whatever the path and configuration are.
+  // NOTE(review): presumably this tells CombineHiveInputFormat never to combine splits of this format - confirm.
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
     return true;
   }
+
+  // Reports the vectorization features of this input format: always an empty array.
+  // Override annotation commented out, since this interface method has been introduced only in Hive 3
+  // @Override
+  public VectorizedSupport.Support[] getSupportedFeatures() {
+    return new VectorizedSupport.Support[0];
+  }
+
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 0dd3c7f..707db6f 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -70,10 +70,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     // executor, but serDeProperties are populated by 
HiveIcebergStorageHandler.configureInputJobProperties() and
     // the resulting properties are serialized and distributed to the executors
 
-    // temporarily disabling vectorization in Tez, since it doesn't work with 
projection pruning (fix: TEZ-4248)
-    // TODO: remove this once TEZ-4248 has been released and the Tez 
dependencies updated here
-    assertNotVectorizedTez(configuration);
-
     if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       this.tableSchema = SchemaParser.fromJson((String) 
serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
     } else {
@@ -115,14 +111,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     }
   }
 
-  private void assertNotVectorizedTez(Configuration configuration) {
-    if ("tez".equals(configuration.get("hive.execution.engine")) &&
-        "true".equals(configuration.get("hive.vectorized.execution.enabled"))) 
{
-      throw new UnsupportedOperationException("Vectorized execution on Tez is 
currently not supported when using " +
-          "Iceberg tables. Please set hive.vectorized.execution.enabled=false 
and rerun the query.");
-    }
-  }
-
   @Override
   public Class<? extends Writable> getSerializedClass() {
     return Container.class;
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index f51259e..83004c7 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
@@ -126,6 +127,11 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
         jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, 
catalogName);
       }
     }
+
+    if (HiveConf.getBoolVar(jobConf, 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      jobConf.setEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, 
InputFormatConfig.InMemoryDataModel.HIVE);
+      conf.setBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, true);
+    }
   }
 
   @Override
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
new file mode 100644
index 0000000..022dc51
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
+/**
+ * Base class for {@code mapred} record readers that delegate to a {@code mapreduce} record reader obtained from
+ * {@link org.apache.iceberg.mr.mapreduce.IcebergInputFormat}.
+ *
+ * <p>Key creation, progress reporting and closing are implemented here; the remaining {@link RecordReader}
+ * methods are left to subclasses.
+ *
+ * @param <T> type of the values handed out by this reader
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public abstract class AbstractMapredIcebergRecordReader<T> implements RecordReader<Void, T> {
+
+  protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> innerReader;
+
+  /**
+   * Creates the wrapped {@code mapreduce} reader for the given split and initializes it.
+   *
+   * @param mapreduceInputFormat input format used to create the wrapped reader
+   * @param split split to read
+   * @param job job configuration the task attempt context is built from
+   * @param reporter reporter the task attempt context is built from
+   * @throws IOException if creating or initializing the wrapped reader fails
+   */
+  public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat,
+                                           IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+    TaskAttemptContext taskContext = MapredIcebergInputFormat.newTaskAttemptContext(job, reporter);
+
+    try {
+      innerReader = mapreduceInputFormat.createRecordReader(split, taskContext);
+      innerReader.initialize(split, taskContext);
+    } catch (InterruptedException interrupted) {
+      // restore the interrupt flag before surfacing the failure as an unchecked exception
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(interrupted);
+    }
+
+  }
+
+  /**
+   * This reader has no keys.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public Void createKey() {
+    return null;
+  }
+
+  /**
+   * Reports the progress of the wrapped reader.
+   *
+   * @return the value returned by the wrapped reader's {@code getProgress()}
+   * @throws IOException if the wrapped reader fails to report its progress
+   */
+  @Override
+  public float getProgress() throws IOException {
+    try {
+      return innerReader.getProgress();
+    } catch (InterruptedException interrupted) {
+      // restore the interrupt flag before surfacing the failure as an unchecked exception
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(interrupted);
+    }
+  }
+
+  /**
+   * Closes the wrapped reader, if there is one.
+   *
+   * @throws IOException if closing the wrapped reader fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (innerReader == null) {
+      return;
+    }
+
+    innerReader.close();
+  }
+
+}
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
index 073edd8..60db53b 100644
--- 
a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
+++ 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.mapred;
 
 import java.io.IOException;
 import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -78,23 +79,14 @@ public class MapredIcebergInputFormat<T> implements 
InputFormat<Void, Container<
     return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, 
job, reporter);
   }
 
-  private static final class MapredIcebergRecordReader<T> implements 
RecordReader<Void, Container<T>> {
 
-    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> 
innerReader;
+  private static final class MapredIcebergRecordReader<T> extends 
AbstractMapredIcebergRecordReader<Container<T>> {
+
     private final long splitLength; // for getPos()
 
     
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> 
mapreduceInputFormat,
                               IcebergSplit split, JobConf job, Reporter 
reporter) throws IOException {
-      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
-
-      try {
-        innerReader = mapreduceInputFormat.createRecordReader(split, context);
-        innerReader.initialize(split, context);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-
+      super(mapreduceInputFormat, split, job, reporter);
       splitLength = split.getLength();
     }
 
@@ -102,7 +94,7 @@ public class MapredIcebergInputFormat<T> implements 
InputFormat<Void, Container<
     public boolean next(Void key, Container<T> value) throws IOException {
       try {
         if (innerReader.nextKeyValue()) {
-          value.set(innerReader.getCurrentValue());
+          value.set((T) innerReader.getCurrentValue());
           return true;
         }
       } catch (InterruptedException ie) {
@@ -114,11 +106,6 @@ public class MapredIcebergInputFormat<T> implements 
InputFormat<Void, Container<
     }
 
     @Override
-    public Void createKey() {
-      return null;
-    }
-
-    @Override
     public Container<T> createValue() {
       return new Container<>();
     }
@@ -128,36 +115,35 @@ public class MapredIcebergInputFormat<T> implements 
InputFormat<Void, Container<
       return (long) (splitLength * getProgress());
     }
 
-    @Override
-    public float getProgress() throws IOException {
-      try {
-        return innerReader.getProgress();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (innerReader != null) {
-        innerReader.close();
-      }
-    }
   }
 
   private static JobContext newJobContext(JobConf job) {
     JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))
-                          .orElseGet(JobID::new);
+            .orElseGet(JobID::new);
 
     return new JobContextImpl(job, jobID);
   }
 
-  private static TaskAttemptContext newTaskAttemptContext(JobConf job, 
Reporter reporter) {
+  public static TaskAttemptContext newTaskAttemptContext(JobConf job, Reporter 
reporter) {
     TaskAttemptID taskAttemptID = 
Optional.ofNullable(TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID)))
-                                          .orElseGet(TaskAttemptID::new);
+            .orElseGet(TaskAttemptID::new);
+
+    return new CompatibilityTaskAttemptContextImpl(job, taskAttemptID, 
reporter);
+  }
+
+  // Saving the Reporter instance here as it is required for Hive vectorized 
readers.
+  public static class CompatibilityTaskAttemptContextImpl extends 
TaskAttemptContextImpl {
+
+    private final Reporter legacyReporter;
 
-    return new TaskAttemptContextImpl(job, taskAttemptID, 
toStatusReporter(reporter));
+    public CompatibilityTaskAttemptContextImpl(Configuration conf, 
TaskAttemptID taskId, Reporter reporter) {
+      super(conf, taskId, toStatusReporter(reporter));
+      this.legacyReporter = reporter;
+    }
+
+    public Reporter getLegacyReporter() {
+      return legacyReporter;
+    }
   }
 
   private static StatusReporter toStatusReporter(Reporter reporter) {
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 0a43189..c526ede 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.data.GenericDeleteFilter;
 import org.apache.iceberg.data.IdentityPartitionConverters;
@@ -59,6 +60,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
@@ -166,6 +168,24 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
   }
 
   private static final class IcebergRecordReader<T> extends RecordReader<Void, 
T> {
+
+    private static final String HIVE_VECTORIZED_READER_CLASS = 
"org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
+    private static final DynMethods.StaticMethod 
HIVE_VECTORIZED_READER_BUILDER;
+
+    // The static "reader" factory of the Hive vectorized reader is resolved by name, and only when Hive 3 is
+    // on the classpath; in every other case the constant is left as null.
+    static {
+      DynMethods.StaticMethod vectorizedReaderBuilder = null;
+      if (MetastoreUtil.hive3PresentOnClasspath()) {
+        vectorizedReaderBuilder = DynMethods.builder("reader")
+            .impl(HIVE_VECTORIZED_READER_CLASS, InputFile.class, FileScanTask.class, Map.class,
+                TaskAttemptContext.class)
+            .buildStatic();
+      }
+      HIVE_VECTORIZED_READER_BUILDER = vectorizedReaderBuilder;
+    }
+
     private TaskAttemptContext context;
     private Schema tableSchema;
     private Schema expectedSchema;
@@ -173,7 +193,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     private boolean caseSensitive;
     private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
     private Iterator<FileScanTask> tasks;
-    private T currentRow;
+    private T current;
     private CloseableIterator<T> currentIterator;
     private FileIO io;
     private EncryptionManager encryptionManager;
@@ -200,7 +220,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     public boolean nextKeyValue() throws IOException {
       while (true) {
         if (currentIterator.hasNext()) {
-          currentRow = currentIterator.next();
+          current = currentIterator.next();
           return true;
         } else if (tasks.hasNext()) {
           currentIterator.close();
@@ -219,7 +239,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
 
     @Override
     public T getCurrentValue() {
-      return currentRow;
+      return current;
     }
 
     @Override
@@ -267,9 +287,10 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     private CloseableIterable<T> open(FileScanTask currentTask, Schema 
readSchema) {
       switch (inMemoryDataModel) {
         case PIG:
-        case HIVE:
           // TODO: Support Pig and Hive object models for IcebergInputFormat
           throw new UnsupportedOperationException("Pig and Hive object models 
are not supported.");
+        case HIVE:
+          return openTask(currentTask, readSchema);
         case GENERIC:
           DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, 
tableSchema, readSchema);
           Schema requiredSchema = deletes.requiredSchema();
@@ -344,24 +365,33 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       Map<Integer, ?> idToConstant = constantsMap(task, 
IdentityPartitionConverters::convertConstant);
       Schema readSchemaWithoutConstantAndMetadataFields = 
TypeUtil.selectNot(readSchema,
           Sets.union(idToConstant.keySet(), 
MetadataColumns.metadataFieldIds()));
-      ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
-          .project(readSchemaWithoutConstantAndMetadataFields)
-          .filter(task.residual())
-          .caseSensitive(caseSensitive)
-          .split(task.start(), task.length());
+
+      CloseableIterable<T> orcIterator = null;
       // ORC does not support reuse containers yet
       switch (inMemoryDataModel) {
         case PIG:
-        case HIVE:
           // TODO: implement value readers for Pig and Hive
           throw new UnsupportedOperationException("ORC support not yet 
supported for Pig and Hive");
+        case HIVE:
+          if (MetastoreUtil.hive3PresentOnClasspath()) {
+            orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, 
task, idToConstant, context);
+          } else {
+            throw new UnsupportedOperationException("Vectorized read is 
unsupported for Hive 2 integration.");
+          }
+          break;
         case GENERIC:
+          ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+              .project(readSchemaWithoutConstantAndMetadataFields)
+              .filter(task.residual())
+              .caseSensitive(caseSensitive)
+              .split(task.start(), task.length());
           orcReadBuilder.createReaderFunc(
               fileSchema -> GenericOrcReader.buildReader(
                   readSchema, fileSchema, idToConstant));
+          orcIterator = orcReadBuilder.build();
       }
 
-      return applyResidualFiltering(orcReadBuilder.build(), task.residual(), 
readSchema);
+      return applyResidualFiltering(orcIterator, task.residual(), readSchema);
     }
 
     private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, 
Object, Object> converter) {
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index bb47860..72dc588 100644
--- 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -35,6 +36,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -95,7 +97,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
                   Types.DecimalType.of(3, 1), Types.UUIDType.get(), 
Types.FixedType.ofLength(5),
                   Types.TimeType.get());
 
-  @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}")
+  @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}, 
isVectorized={3}")
   public static Collection<Object[]> parameters() {
     Collection<Object[]> testParams = new ArrayList<>();
     String javaVersion = System.getProperty("java.specification.version");
@@ -105,7 +107,11 @@ public class TestHiveIcebergStorageHandlerWithEngine {
       for (String engine : EXECUTION_ENGINES) {
         // include Tez tests only for Java 8
         if (javaVersion.equals("1.8") || "mr".equals(engine)) {
-          testParams.add(new Object[] {fileFormat, engine, 
TestTables.TestTableType.HIVE_CATALOG});
+          testParams.add(new Object[] {fileFormat, engine, 
TestTables.TestTableType.HIVE_CATALOG, false});
+          // test for vectorization=ON in case of ORC format and Tez engine
+          if (fileFormat == FileFormat.ORC && "tez".equals(engine) && 
MetastoreUtil.hive3PresentOnClasspath()) {
+            testParams.add(new Object[] {fileFormat, engine, 
TestTables.TestTableType.HIVE_CATALOG, true});
+          }
         }
       }
     }
@@ -114,7 +120,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     // skip HiveCatalog tests as they are added before
     for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) {
       if (!TestTables.TestTableType.HIVE_CATALOG.equals(testTableType)) {
-        testParams.add(new Object[]{FileFormat.PARQUET, "mr", testTableType});
+        testParams.add(new Object[]{FileFormat.PARQUET, "mr", testTableType, 
false});
       }
     }
 
@@ -134,6 +140,9 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   @Parameter(2)
   public TestTables.TestTableType testTableType;
 
+  @Parameter(3)
+  public boolean isVectorized;
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
@@ -154,6 +163,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   public void before() throws IOException {
     testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, 
testTableType, temp);
     HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, 
executionEngine);
+    HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
   }
 
   @After
@@ -253,6 +263,10 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   public void testJoinTablesSupportedTypes() throws IOException {
     for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
       Type type = SUPPORTED_TYPES.get(i);
+      if (type == Types.TimestampType.withZone() && isVectorized) {
+        // ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive
+        continue;
+      }
       // TODO: remove this filter when issue #1881 is resolved
       if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
         continue;
@@ -276,6 +290,10 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   public void testSelectDistinctFromTable() throws IOException {
     for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
       Type type = SUPPORTED_TYPES.get(i);
+      if (type == Types.TimestampType.withZone() && isVectorized) {
+        // ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive
+        continue;
+      }
       // TODO: remove this filter when issue #1881 is resolved
       if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
         continue;
diff --git a/settings.gradle b/settings.gradle
index 037bdf8..ba00916 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -64,8 +64,10 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
   include 'spark2'
   include 'spark-runtime'
   include 'hive3'
+  include 'hive3-orc-bundle'
 
   project(':spark2').name = 'iceberg-spark2'
   project(':spark-runtime').name = 'iceberg-spark-runtime'
   project(':hive3').name = 'iceberg-hive3'
+  project(':hive3-orc-bundle').name = 'iceberg-hive3-orc-bundle'
 }

Reply via email to