This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 282b6f9 Hive: Vectorized ORC reads for Hive (#2613)
282b6f9 is described below
commit 282b6f9f1cae8d4fd5ff7c73de513ca91f01fddc
Author: Adam Szita <[email protected]>
AuthorDate: Mon Jun 7 10:53:59 2021 +0200
Hive: Vectorized ORC reads for Hive (#2613)
---
build.gradle | 84 ++++++--
.../hive/vector/CompatibilityHiveVectorUtils.java | 217 +++++++++++++++++++++
.../vector/HiveIcebergVectorizedRecordReader.java | 73 +++++++
.../mr/hive/vector/HiveVectorizedReader.java | 138 +++++++++++++
.../mr/hive/vector/VectorizedRowBatchIterator.java | 91 +++++++++
.../hive/ql/exec/vector/VectorizedSupport.java | 45 +++++
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 50 ++++-
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 12 --
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 6 +
.../mapred/AbstractMapredIcebergRecordReader.java | 70 +++++++
.../mr/mapred/MapredIcebergInputFormat.java | 62 +++---
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 52 +++--
.../TestHiveIcebergStorageHandlerWithEngine.java | 24 ++-
settings.gradle | 2 +
14 files changed, 847 insertions(+), 79 deletions(-)
diff --git a/build.gradle b/build.gradle
index 855a9f5..bacb2f1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -551,6 +551,60 @@ project(':iceberg-mr') {
}
if (jdkVersion == '8') {
+ // The purpose of this module is to re-shade org.apache.orc.storage to the
original org.apache.hadoop.hive package
+ // name. This is to be used by Hive3 for features including e.g.
vectorization.
+ project(':iceberg-hive3-orc-bundle') {
+
+ apply plugin: 'com.github.johnrengelman.shadow'
+
+ tasks.jar.dependsOn tasks.shadowJar
+
+ dependencies {
+ compile project(':iceberg-data')
+ compile project(':iceberg-orc')
+
+ testCompileOnly project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testCompileOnly project(path: ':iceberg-orc', configuration:
'testArtifacts')
+ }
+
+ shadowJar {
+ configurations = [project.configurations.compile,
project.configurations.compileOnly, project.configurations.testCompileOnly]
+
+ zip64 true
+
+ // include the LICENSE and NOTICE files for the shaded Jar
+ from(projectDir) {
+ include 'LICENSE'
+ include 'NOTICE'
+ }
+
+ // Relocate dependencies to avoid conflicts
+ relocate 'org.apache.orc.storage', 'org.apache.hadoop.hive'
+
+ // We really only need Iceberg and Orc classes, but with relocated
references for storage-api classes (see above)
+ // Unfortunately the include list feature of this shader plugin doesn't
work as expected
+ exclude 'com/**/*'
+ exclude 'edu/**/*'
+ exclude 'io/**'
+ exclude 'javax/**'
+ exclude 'org/apache/avro/**/*'
+ exclude 'org/apache/commons/**/*'
+ exclude 'org/checkerframework/**/*'
+ exclude 'org/codehaus/**/*'
+ exclude 'org/intellij/**/*'
+ exclude 'org/jetbrains/**/*'
+ exclude 'org/slf4j/**/*'
+ exclude 'org/threeten/**/*'
+
+ classifier null
+ }
+
+ jar {
+ enabled = false
+ }
+
+ }
+
project(':iceberg-hive3') {
// run the tests in iceberg-mr with Hive3 dependencies
@@ -569,13 +623,13 @@ if (jdkVersion == '8') {
}
dependencies {
- compile project(':iceberg-api')
- compile project(':iceberg-core')
- compile project(':iceberg-data')
- compile project(':iceberg-hive-metastore')
- compile project(':iceberg-orc')
- compile project(':iceberg-parquet')
- compile project(':iceberg-mr')
+ compileOnly project(':iceberg-api')
+ compileOnly project(':iceberg-core')
+ compileOnly project(':iceberg-hive-metastore')
+ compileOnly project(':iceberg-parquet')
+ compileOnly project(':iceberg-hive3-orc-bundle')
+ compileOnly project(':iceberg-mr')
+
compileOnly("org.apache.hadoop:hadoop-client:3.1.0") {
exclude group: 'org.apache.avro', module: 'avro'
@@ -592,10 +646,13 @@ if (jdkVersion == '8') {
exclude group: 'org.pentaho' // missing dependency
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
- compileOnly("org.apache.hive:hive-metastore:3.1.2")
- compileOnly("org.apache.hive:hive-serde:3.1.2")
+ compileOnly("org.apache.hive:hive-metastore:3.1.2") {
+ exclude group: 'org.apache.orc'
+ }
+ compileOnly("org.apache.hive:hive-serde:3.1.2") {
+ exclude group: 'org.apache.orc'
+ }
- testCompile project(path: ':iceberg-data', configuration:
'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration:
'testArtifacts')
testCompile project(path: ':iceberg-hive-metastore', configuration:
'testArtifacts')
@@ -606,6 +663,7 @@ if (jdkVersion == '8') {
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
testCompile("org.apache.hive:hive-service:3.1.2") {
exclude group: 'org.apache.hive', module: 'hive-exec'
+ exclude group: 'org.apache.orc'
}
testCompile("org.apache.tez:tez-dag:0.9.1")
testCompile("org.apache.tez:tez-mapreduce:0.9.1")
@@ -628,7 +686,7 @@ project(':iceberg-hive-runtime') {
exclude group: 'com.github.stephenc.findbugs'
exclude group: 'commons-pool'
exclude group: 'javax.annotation'
- exclude group: 'javax.xml.bind'
+ exclude group: 'javax.xml.bind'
exclude group: 'org.apache.commons'
exclude group: 'org.slf4j'
exclude group: 'org.xerial.snappy'
@@ -645,7 +703,7 @@ project(':iceberg-hive-runtime') {
}
compile project(':iceberg-aws')
}
-
+
shadowJar {
configurations = [project.configurations.compile]
@@ -659,7 +717,7 @@ project(':iceberg-hive-runtime') {
// Relocate dependencies to avoid conflicts
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
- relocate 'org.apache.parquet',
'org.apache.iceberg.shaded.org.apache.parquet'
+ relocate 'org.apache.parquet',
'org.apache.iceberg.shaded.org.apache.parquet'
relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
relocate 'com.github.benmanes',
'org.apache.iceberg.shaded.com.github.benmanes'
diff --git
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/CompatibilityHiveVectorUtils.java
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/CompatibilityHiveVectorUtils.java
new file mode 100644
index 0000000..d53fdad
--- /dev/null
+++
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/CompatibilityHiveVectorUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains ported code snippets from later Hive sources. We should get rid of
this class as soon as Hive 4 is released
+ * and Iceberg depends on that version.
+ */
+public class CompatibilityHiveVectorUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompatibilityHiveVectorUtils.class);
+
+ private CompatibilityHiveVectorUtils() {
+
+ }
+
+ /**
+ * Returns serialized mapwork instance from a job conf - ported from Hive
source code LlapHiveUtils#findMapWork
+ *
+ * @param job JobConf instance
+ * @return the MapWork instance found for this input, or null if none applies
+ */
+ public static MapWork findMapWork(JobConf job) {
+ String inputName = job.get(Utilities.INPUT_NAME, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing for input {}", inputName);
+ }
+ String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+ if (prefixes != null && !StringUtils.isBlank(prefixes)) {
+ // Currently SMB is broken, so we cannot check if it's compatible with
IO elevator.
+ // So, we don't use the below code that would get the correct MapWork.
See HIVE-16985.
+ return null;
+ }
+
+ BaseWork work = null;
+ // HIVE-16985: try to find the fake merge work for SMB join, that is
really another MapWork.
+ if (inputName != null) {
+ if (prefixes == null ||
!Lists.newArrayList(prefixes.split(",")).contains(inputName)) {
+ inputName = null;
+ }
+ }
+ if (inputName != null) {
+ work = Utilities.getMergeWork(job, inputName);
+ }
+
+ if (!(work instanceof MapWork)) {
+ work = Utilities.getMapWork(job);
+ }
+ return (MapWork) work;
+ }
+
+
+ /**
+ * Ported from Hive source code VectorizedRowBatchCtx#addPartitionColsToBatch
+ *
+ * @param col ColumnVector to write the partition value into
+ * @param value partition value
+ * @param partitionColumnName partition key
+ * @param rowColumnTypeInfo column type description
+ */
+// @SuppressWarnings({"AvoidNestedBlocks", "FallThrough", "MethodLength",
"CyclomaticComplexity", "Indentation"})
+ public static void addPartitionColsToBatch(ColumnVector col, Object value,
String partitionColumnName,
+ TypeInfo rowColumnTypeInfo) {
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo)
rowColumnTypeInfo;
+
+ if (value == null) {
+ col.noNulls = false;
+ col.isNull[0] = true;
+ col.isRepeating = true;
+ return;
+ }
+
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ LongColumnVector booleanColumnVector = (LongColumnVector) col;
+ booleanColumnVector.fill((Boolean) value ? 1 : 0);
+ booleanColumnVector.isNull[0] = false;
+ break;
+
+ case BYTE:
+ LongColumnVector byteColumnVector = (LongColumnVector) col;
+ byteColumnVector.fill((Byte) value);
+ byteColumnVector.isNull[0] = false;
+ break;
+
+ case SHORT:
+ LongColumnVector shortColumnVector = (LongColumnVector) col;
+ shortColumnVector.fill((Short) value);
+ shortColumnVector.isNull[0] = false;
+ break;
+
+ case INT:
+ LongColumnVector intColumnVector = (LongColumnVector) col;
+ intColumnVector.fill((Integer) value);
+ intColumnVector.isNull[0] = false;
+ break;
+
+ case LONG:
+ LongColumnVector longColumnVector = (LongColumnVector) col;
+ longColumnVector.fill((Long) value);
+ longColumnVector.isNull[0] = false;
+ break;
+
+ case DATE:
+ LongColumnVector dateColumnVector = (LongColumnVector) col;
+ dateColumnVector.fill(DateWritable.dateToDays((Date) value));
+ dateColumnVector.isNull[0] = false;
+ break;
+
+ case TIMESTAMP:
+ TimestampColumnVector timeStampColumnVector = (TimestampColumnVector)
col;
+ timeStampColumnVector.fill((Timestamp) value);
+ timeStampColumnVector.isNull[0] = false;
+ break;
+
+ case INTERVAL_YEAR_MONTH:
+ LongColumnVector intervalYearMonthColumnVector = (LongColumnVector)
col;
+ intervalYearMonthColumnVector.fill(((HiveIntervalYearMonth)
value).getTotalMonths());
+ intervalYearMonthColumnVector.isNull[0] = false;
+ break;
+
+ case INTERVAL_DAY_TIME:
+ IntervalDayTimeColumnVector intervalDayTimeColumnVector =
(IntervalDayTimeColumnVector) col;
+ intervalDayTimeColumnVector.fill((HiveIntervalDayTime) value);
+ intervalDayTimeColumnVector.isNull[0] = false;
+ break;
+
+ case FLOAT:
+ DoubleColumnVector floatColumnVector = (DoubleColumnVector) col;
+ floatColumnVector.fill((Float) value);
+ floatColumnVector.isNull[0] = false;
+ break;
+
+ case DOUBLE:
+ DoubleColumnVector doubleColumnVector = (DoubleColumnVector) col;
+ doubleColumnVector.fill((Double) value);
+ doubleColumnVector.isNull[0] = false;
+ break;
+
+ case DECIMAL:
+ DecimalColumnVector decimalColumnVector = (DecimalColumnVector) col;
+ HiveDecimal hd = (HiveDecimal) value;
+ decimalColumnVector.set(0, hd);
+ decimalColumnVector.isRepeating = true;
+ decimalColumnVector.isNull[0] = false;
+ break;
+
+ case BINARY:
+ BytesColumnVector binaryColumnVector = (BytesColumnVector) col;
+ byte[] bytes = (byte[]) value;
+ binaryColumnVector.fill(bytes);
+ binaryColumnVector.isNull[0] = false;
+ break;
+
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) col;
+ String sVal = value.toString();
+ if (sVal == null) {
+ bytesColumnVector.noNulls = false;
+ bytesColumnVector.isNull[0] = true;
+ bytesColumnVector.isRepeating = true;
+ } else {
+ bytesColumnVector.setVal(0, sVal.getBytes());
+ bytesColumnVector.isRepeating = true;
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unable to recognize the partition type " +
+ primitiveTypeInfo.getPrimitiveCategory() + " for column " +
partitionColumnName);
+ }
+
+ }
+}
diff --git
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
new file mode 100644
index 0000000..98507dd
--- /dev/null
+++
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
+/**
+ * Basically an MR1 API-implementing wrapper for transferring
VectorizedRowBatches produced by
+ * IcebergInputFormat$IcebergRecordReader, which relies on the MR2 API format.
+ */
+public final class HiveIcebergVectorizedRecordReader extends
AbstractMapredIcebergRecordReader<VectorizedRowBatch> {
+
+ private final JobConf job;
+
+ public HiveIcebergVectorizedRecordReader(
+ org.apache.iceberg.mr.mapreduce.IcebergInputFormat<VectorizedRowBatch>
mapreduceInputFormat, IcebergSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ super(mapreduceInputFormat, split, job, reporter);
+ this.job = job;
+ }
+
+ @Override
+ public boolean next(Void key, VectorizedRowBatch value) throws IOException {
+ try {
+ if (innerReader.nextKeyValue()) {
+ VectorizedRowBatch newBatch = (VectorizedRowBatch)
innerReader.getCurrentValue();
+ value.cols = newBatch.cols;
+ value.endOfFile = newBatch.endOfFile;
+ value.selectedInUse = newBatch.selectedInUse;
+ value.size = newBatch.size;
+ return true;
+ } else {
+ return false;
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+ }
+
+ @Override
+ public VectorizedRowBatch createValue() {
+ return
CompatibilityHiveVectorUtils.findMapWork(job).getVectorizedRowBatchCtx().createVectorizedRowBatch();
+ }
+
+ @Override
+ public long getPos() {
+ return -1;
+ }
+
+}
diff --git
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
new file mode 100644
index 0000000..5c831e5
--- /dev/null
+++
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Utility class to create vectorized readers for Hive.
+ * Based on the file format of the task, it creates a matching vectorized
record reader that is already implemented
+ * in Hive. It also applies some tweaks to the produced vectors for Iceberg's
use, e.g. partition column handling.
+ */
+public class HiveVectorizedReader {
+
+
+ private HiveVectorizedReader() {
+
+ }
+
+ public static <D> CloseableIterable<D> reader(InputFile inputFile,
FileScanTask task, Map<Integer, ?> idToConstant,
+ TaskAttemptContext context) {
+ JobConf job = (JobConf) context.getConfiguration();
+ Path path = new Path(inputFile.location());
+ FileFormat format = task.file().format();
+ Reporter reporter =
((MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl)
context).getLegacyReporter();
+
+ // Hive by default requires partition columns to be read too. This is not
required for identity partition
+ // columns, as we will add these as constants later.
+
+ int[] partitionColIndices = null;
+ Object[] partitionValues = null;
+ PartitionSpec partitionSpec = task.spec();
+
+ if (!partitionSpec.isUnpartitioned()) {
+ List<Integer> readColumnIds =
ColumnProjectionUtils.getReadColumnIDs(job);
+
+ List<PartitionField> fields = partitionSpec.fields();
+ List<Integer> partitionColIndicesList = Lists.newLinkedList();
+ List<Object> partitionValuesList = Lists.newLinkedList();
+
+ for (PartitionField field : fields) {
+ if (field.transform().isIdentity()) {
+ // Skip reading identity partition columns from source file...
+ int hiveColIndex = field.sourceId() - 1;
+ readColumnIds.remove((Integer) hiveColIndex);
+
+ // ...and use the corresponding constant value instead
+ partitionColIndicesList.add(hiveColIndex);
+ partitionValuesList.add(idToConstant.get(field.sourceId()));
+ }
+ }
+
+ partitionColIndices =
ArrayUtils.toPrimitive(partitionColIndicesList.toArray(new Integer[0]));
+ partitionValues = partitionValuesList.toArray(new Object[0]);
+
+ ColumnProjectionUtils.setReadColumns(job, readColumnIds);
+ }
+
+ try {
+ switch (format) {
+ case ORC:
+ InputSplit split = new OrcSplit(path, null, task.start(),
task.length(), (String[]) null, null,
+ false, false, Lists.newArrayList(), 0, task.length(),
path.getParent());
+ RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
+
+ recordReader = new VectorizedOrcInputFormat().getRecordReader(split,
job, reporter);
+ return createVectorizedRowBatchIterable(recordReader, job,
partitionColIndices, partitionValues);
+
+ default:
+ throw new UnsupportedOperationException("Vectorized Hive reading
unimplemented for format: " + format);
+ }
+
+ } catch (IOException ioe) {
+ throw new RuntimeException("Error creating vectorized record reader for
" + inputFile, ioe);
+ }
+ }
+
+ private static <D> CloseableIterable<D> createVectorizedRowBatchIterable(
+ RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf
job, int[] partitionColIndices,
+ Object[] partitionValues) {
+
+ VectorizedRowBatchIterator iterator =
+ new VectorizedRowBatchIterator(hiveRecordReader, job,
partitionColIndices, partitionValues);
+
+ return new CloseableIterable<D>() {
+
+ @Override
+ public CloseableIterator iterator() {
+ return iterator;
+ }
+
+ @Override
+ public void close() throws IOException {
+ iterator.close();
+ }
+ };
+ }
+
+}
diff --git
a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
new file mode 100644
index 0000000..e29b135
--- /dev/null
+++
b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Iterator wrapper around Hive's VectorizedRowBatch-producing
(MRv1-implementing) record readers.
+ */
+public final class VectorizedRowBatchIterator implements
CloseableIterator<VectorizedRowBatch> {
+
+ private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
+ private final NullWritable key;
+ private final VectorizedRowBatch batch;
+ private final VectorizedRowBatchCtx vrbCtx;
+ private final int[] partitionColIndices;
+ private final Object[] partitionValues;
+ private boolean advanced = false;
+
+ VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch>
recordReader, JobConf job,
+ int[] partitionColIndices, Object[]
partitionValues) {
+ this.recordReader = recordReader;
+ this.key = recordReader.createKey();
+ this.batch = recordReader.createValue();
+ this.vrbCtx =
CompatibilityHiveVectorUtils.findMapWork(job).getVectorizedRowBatchCtx();
+ this.partitionColIndices = partitionColIndices;
+ this.partitionValues = partitionValues;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+
+ private void advance() {
+ if (!advanced) {
+ try {
+
+ if (!recordReader.next(key, batch)) {
+ batch.size = 0;
+ }
+ // Fill partition values
+ if (partitionColIndices != null) {
+ for (int i = 0; i < partitionColIndices.length; ++i) {
+ int colIdx = partitionColIndices[i];
+
CompatibilityHiveVectorUtils.addPartitionColsToBatch(batch.cols[colIdx],
partitionValues[i],
+ vrbCtx.getRowColumnNames()[colIdx],
vrbCtx.getRowColumnTypeInfos()[colIdx]);
+ }
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ advanced = true;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ advance();
+ return batch.size > 0;
+ }
+
+ @Override
+ public VectorizedRowBatch next() {
+ advance();
+ advanced = false;
+ return batch;
+ }
+}
diff --git
a/mr/src/main/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSupport.java
b/mr/src/main/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSupport.java
new file mode 100644
index 0000000..0d3310c
--- /dev/null
+++
b/mr/src/main/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSupport.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copied here from Hive for compatibility
+ */
+@SuppressWarnings("VisibilityModifier")
+public class VectorizedSupport {
+ public enum Support {
+ DECIMAL_64;
+
+ final String lowerCaseName;
+ Support() {
+ this.lowerCaseName = name().toLowerCase();
+ }
+
+ public static final Map<String, Support> nameToSupportMap = new
HashMap<>();
+ static {
+ for (Support support : values()) {
+ nameToSupportMap.put(support.lowerCaseName, support);
+ }
+ }
+ }
+}
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 5974e7f..50b1dac 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -23,7 +23,10 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -34,20 +37,43 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
- implements
CombineHiveInputFormat.AvoidSplitCombination {
+ implements CombineHiveInputFormat.AvoidSplitCombination,
VectorizedInputFormatInterface {
private static final Logger LOG =
LoggerFactory.getLogger(HiveIcebergInputFormat.class);
+ private static final String HIVE_VECTORIZED_RECORDREADER_CLASS =
+
"org.apache.iceberg.mr.hive.vector.HiveIcebergVectorizedRecordReader";
+ private static final DynConstructors.Ctor<AbstractMapredIcebergRecordReader>
HIVE_VECTORIZED_RECORDREADER_CTOR;
+
+ static {
+ if (MetastoreUtil.hive3PresentOnClasspath()) {
+ HIVE_VECTORIZED_RECORDREADER_CTOR =
DynConstructors.builder(AbstractMapredIcebergRecordReader.class)
+ .impl(HIVE_VECTORIZED_RECORDREADER_CLASS,
+ IcebergInputFormat.class,
+ IcebergSplit.class,
+ JobConf.class,
+ Reporter.class)
+ .build();
+ } else {
+ HIVE_VECTORIZED_RECORDREADER_CTOR = null;
+ }
+ }
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
@@ -79,11 +105,31 @@ public class HiveIcebergInputFormat extends
MapredIcebergInputFormat<Record>
Reporter
reporter) throws IOException {
String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(job);
job.setStrings(InputFormatConfig.SELECTED_COLUMNS, selectedColumns);
- return super.getRecordReader(split, job, reporter);
+
+ if (HiveConf.getBoolVar(job,
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(),
"Vectorization only supported for Hive 3+");
+
+ IcebergSplit icebergSplit = ((IcebergSplitContainer)
split).icebergSplit();
+ // raw-typed cast, favouring code reuse over type safety
+ return (RecordReader) HIVE_VECTORIZED_RECORDREADER_CTOR.newInstance(
+ new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>(),
+ icebergSplit,
+ job,
+ reporter);
+ } else {
+ return super.getRecordReader(split, job, reporter);
+ }
}
@Override
public boolean shouldSkipCombine(Path path, Configuration conf) {
return true;
}
+
+ // Override annotation commented out, since this interface method has been
introduced only in Hive 3
+ // @Override
+ public VectorizedSupport.Support[] getSupportedFeatures() {
+ return new VectorizedSupport.Support[0];
+ }
+
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 0dd3c7f..707db6f 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -70,10 +70,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
// executor, but serDeProperties are populated by
HiveIcebergStorageHandler.configureInputJobProperties() and
// the resulting properties are serialized and distributed to the executors
- // temporarily disabling vectorization in Tez, since it doesn't work with
projection pruning (fix: TEZ-4248)
- // TODO: remove this once TEZ-4248 has been released and the Tez
dependencies updated here
- assertNotVectorizedTez(configuration);
-
if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
this.tableSchema = SchemaParser.fromJson((String)
serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
} else {
@@ -115,14 +111,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
}
- private void assertNotVectorizedTez(Configuration configuration) {
- if ("tez".equals(configuration.get("hive.execution.engine")) &&
- "true".equals(configuration.get("hive.vectorized.execution.enabled")))
{
- throw new UnsupportedOperationException("Vectorized execution on Tez is
currently not supported when using " +
- "Iceberg tables. Please set hive.vectorized.execution.enabled=false
and rerun the query.");
- }
- }
-
@Override
public Class<? extends Writable> getSerializedClass() {
return Container.class;
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index f51259e..83004c7 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
@@ -126,6 +127,11 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName,
catalogName);
}
}
+
+ if (HiveConf.getBoolVar(jobConf,
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ jobConf.setEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.HIVE);
+ conf.setBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, true);
+ }
}
@Override
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
b/mr/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
new file mode 100644
index 0000000..022dc51
--- /dev/null
+++
b/mr/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
/**
 * Base adapter that exposes the mapreduce-API Iceberg record reader through the legacy
 * mapred-API {@link RecordReader} interface. Subclasses supply the value-specific parts
 * (next(), createValue(), getPos()) by reading from {@link #innerReader}.
 *
 * @param <T> the value type handed to the mapred consumer
 */
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class AbstractMapredIcebergRecordReader<T> implements RecordReader<Void, T> {

  // The wrapped new-API reader; protected (not private) so subclasses can pull values from it.
  protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> innerReader;

  /**
   * Creates and initializes the underlying mapreduce record reader for the given split.
   *
   * @param mapreduceInputFormat factory used to create the inner reader
   * @param split the Iceberg split to read
   * @param job the job configuration
   * @param reporter progress reporter, carried into the task attempt context
   * @throws IOException if the inner reader cannot be created or initialized
   */
  public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat,
                                           IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
    TaskAttemptContext context = MapredIcebergInputFormat.newTaskAttemptContext(job, reporter);

    try {
      innerReader = mapreduceInputFormat.createRecordReader(split, context);
      innerReader.initialize(split, context);
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure as unchecked.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }

  }

  @Override
  public Void createKey() {
    // Keys are unused in this integration; all data travels in the value.
    return null;
  }

  @Override
  public float getProgress() throws IOException {
    try {
      return innerReader.getProgress();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() throws IOException {
    // Null-safe: innerReader is final and assigned in the constructor, but guard anyway.
    if (innerReader != null) {
      innerReader.close();
    }
  }

}
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
index 073edd8..60db53b 100644
---
a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
+++
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.mapred;
import java.io.IOException;
import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -78,23 +79,14 @@ public class MapredIcebergInputFormat<T> implements
InputFormat<Void, Container<
return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit,
job, reporter);
}
- private static final class MapredIcebergRecordReader<T> implements
RecordReader<Void, Container<T>> {
- private final org.apache.hadoop.mapreduce.RecordReader<Void, T>
innerReader;
+ private static final class MapredIcebergRecordReader<T> extends
AbstractMapredIcebergRecordReader<Container<T>> {
+
private final long splitLength; // for getPos()
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T>
mapreduceInputFormat,
IcebergSplit split, JobConf job, Reporter
reporter) throws IOException {
- TaskAttemptContext context = newTaskAttemptContext(job, reporter);
-
- try {
- innerReader = mapreduceInputFormat.createRecordReader(split, context);
- innerReader.initialize(split, context);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
-
+ super(mapreduceInputFormat, split, job, reporter);
splitLength = split.getLength();
}
@@ -102,7 +94,7 @@ public class MapredIcebergInputFormat<T> implements
InputFormat<Void, Container<
public boolean next(Void key, Container<T> value) throws IOException {
try {
if (innerReader.nextKeyValue()) {
- value.set(innerReader.getCurrentValue());
+ value.set((T) innerReader.getCurrentValue());
return true;
}
} catch (InterruptedException ie) {
@@ -114,11 +106,6 @@ public class MapredIcebergInputFormat<T> implements
InputFormat<Void, Container<
}
@Override
- public Void createKey() {
- return null;
- }
-
- @Override
public Container<T> createValue() {
return new Container<>();
}
@@ -128,36 +115,35 @@ public class MapredIcebergInputFormat<T> implements
InputFormat<Void, Container<
return (long) (splitLength * getProgress());
}
- @Override
- public float getProgress() throws IOException {
- try {
- return innerReader.getProgress();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (innerReader != null) {
- innerReader.close();
- }
- }
}
private static JobContext newJobContext(JobConf job) {
JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))
- .orElseGet(JobID::new);
+ .orElseGet(JobID::new);
return new JobContextImpl(job, jobID);
}
- private static TaskAttemptContext newTaskAttemptContext(JobConf job,
Reporter reporter) {
+ public static TaskAttemptContext newTaskAttemptContext(JobConf job, Reporter
reporter) {
TaskAttemptID taskAttemptID =
Optional.ofNullable(TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID)))
- .orElseGet(TaskAttemptID::new);
+ .orElseGet(TaskAttemptID::new);
+
+ return new CompatibilityTaskAttemptContextImpl(job, taskAttemptID,
reporter);
+ }
+
+ // Saving the Reporter instance here as it is required for Hive vectorized
readers.
+ public static class CompatibilityTaskAttemptContextImpl extends
TaskAttemptContextImpl {
+
+ private final Reporter legacyReporter;
- return new TaskAttemptContextImpl(job, taskAttemptID,
toStatusReporter(reporter));
+ public CompatibilityTaskAttemptContextImpl(Configuration conf,
TaskAttemptID taskId, Reporter reporter) {
+ super(conf, taskId, toStatusReporter(reporter));
+ this.legacyReporter = reporter;
+ }
+
+ public Reporter getLegacyReporter() {
+ return legacyReporter;
+ }
}
private static StatusReporter toStatusReporter(Reporter reporter) {
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 0a43189..c526ede 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
@@ -59,6 +60,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
@@ -166,6 +168,24 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
}
private static final class IcebergRecordReader<T> extends RecordReader<Void,
T> {
+
+ private static final String HIVE_VECTORIZED_READER_CLASS =
"org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
+ private static final DynMethods.StaticMethod
HIVE_VECTORIZED_READER_BUILDER;
+
+ static {
+ if (MetastoreUtil.hive3PresentOnClasspath()) {
+ HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
+ .impl(HIVE_VECTORIZED_READER_CLASS,
+ InputFile.class,
+ FileScanTask.class,
+ Map.class,
+ TaskAttemptContext.class)
+ .buildStatic();
+ } else {
+ HIVE_VECTORIZED_READER_BUILDER = null;
+ }
+ }
+
private TaskAttemptContext context;
private Schema tableSchema;
private Schema expectedSchema;
@@ -173,7 +193,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
private boolean caseSensitive;
private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
private Iterator<FileScanTask> tasks;
- private T currentRow;
+ private T current;
private CloseableIterator<T> currentIterator;
private FileIO io;
private EncryptionManager encryptionManager;
@@ -200,7 +220,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
public boolean nextKeyValue() throws IOException {
while (true) {
if (currentIterator.hasNext()) {
- currentRow = currentIterator.next();
+ current = currentIterator.next();
return true;
} else if (tasks.hasNext()) {
currentIterator.close();
@@ -219,7 +239,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
@Override
public T getCurrentValue() {
- return currentRow;
+ return current;
}
@Override
@@ -267,9 +287,10 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
private CloseableIterable<T> open(FileScanTask currentTask, Schema
readSchema) {
switch (inMemoryDataModel) {
case PIG:
- case HIVE:
// TODO: Support Pig and Hive object models for IcebergInputFormat
throw new UnsupportedOperationException("Pig and Hive object models
are not supported.");
+ case HIVE:
+ return openTask(currentTask, readSchema);
case GENERIC:
DeleteFilter deletes = new GenericDeleteFilter(io, currentTask,
tableSchema, readSchema);
Schema requiredSchema = deletes.requiredSchema();
@@ -344,24 +365,33 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
Map<Integer, ?> idToConstant = constantsMap(task,
IdentityPartitionConverters::convertConstant);
Schema readSchemaWithoutConstantAndMetadataFields =
TypeUtil.selectNot(readSchema,
Sets.union(idToConstant.keySet(),
MetadataColumns.metadataFieldIds()));
- ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
- .project(readSchemaWithoutConstantAndMetadataFields)
- .filter(task.residual())
- .caseSensitive(caseSensitive)
- .split(task.start(), task.length());
+
+ CloseableIterable<T> orcIterator = null;
// ORC does not support reuse containers yet
switch (inMemoryDataModel) {
case PIG:
- case HIVE:
// TODO: implement value readers for Pig and Hive
throw new UnsupportedOperationException("ORC support not yet
supported for Pig and Hive");
+ case HIVE:
+ if (MetastoreUtil.hive3PresentOnClasspath()) {
+ orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile,
task, idToConstant, context);
+ } else {
+ throw new UnsupportedOperationException("Vectorized read is
unsupported for Hive 2 integration.");
+ }
+ break;
case GENERIC:
+ ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+ .project(readSchemaWithoutConstantAndMetadataFields)
+ .filter(task.residual())
+ .caseSensitive(caseSensitive)
+ .split(task.start(), task.length());
orcReadBuilder.createReaderFunc(
fileSchema -> GenericOrcReader.buildReader(
readSchema, fileSchema, idToConstant));
+ orcIterator = orcReadBuilder.build();
}
- return applyResidualFiltering(orcReadBuilder.build(), task.residual(),
readSchema);
+ return applyResidualFiltering(orcIterator, task.residual(), readSchema);
}
private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type,
Object, Object> converter) {
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index bb47860..72dc588 100644
---
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -35,6 +36,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -95,7 +97,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
Types.DecimalType.of(3, 1), Types.UUIDType.get(),
Types.FixedType.ofLength(5),
Types.TimeType.get());
- @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}")
+ @Parameters(name = "fileFormat={0}, engine={1}, catalog={2},
isVectorized={3}")
public static Collection<Object[]> parameters() {
Collection<Object[]> testParams = new ArrayList<>();
String javaVersion = System.getProperty("java.specification.version");
@@ -105,7 +107,11 @@ public class TestHiveIcebergStorageHandlerWithEngine {
for (String engine : EXECUTION_ENGINES) {
// include Tez tests only for Java 8
if (javaVersion.equals("1.8") || "mr".equals(engine)) {
- testParams.add(new Object[] {fileFormat, engine,
TestTables.TestTableType.HIVE_CATALOG});
+ testParams.add(new Object[] {fileFormat, engine,
TestTables.TestTableType.HIVE_CATALOG, false});
+ // test for vectorization=ON in case of ORC format and Tez engine
+ if (fileFormat == FileFormat.ORC && "tez".equals(engine) &&
MetastoreUtil.hive3PresentOnClasspath()) {
+ testParams.add(new Object[] {fileFormat, engine,
TestTables.TestTableType.HIVE_CATALOG, true});
+ }
}
}
}
@@ -114,7 +120,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
// skip HiveCatalog tests as they are added before
for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) {
if (!TestTables.TestTableType.HIVE_CATALOG.equals(testTableType)) {
- testParams.add(new Object[]{FileFormat.PARQUET, "mr", testTableType});
+ testParams.add(new Object[]{FileFormat.PARQUET, "mr", testTableType,
false});
}
}
@@ -134,6 +140,9 @@ public class TestHiveIcebergStorageHandlerWithEngine {
@Parameter(2)
public TestTables.TestTableType testTableType;
+ @Parameter(3)
+ public boolean isVectorized;
+
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@@ -154,6 +163,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
public void before() throws IOException {
testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell,
testTableType, temp);
HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp,
executionEngine);
+ HiveConf.setBoolVar(shell.getHiveConf(),
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
}
@After
@@ -253,6 +263,10 @@ public class TestHiveIcebergStorageHandlerWithEngine {
public void testJoinTablesSupportedTypes() throws IOException {
for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
Type type = SUPPORTED_TYPES.get(i);
+ if (type == Types.TimestampType.withZone() && isVectorized) {
+ // ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive
+ continue;
+ }
// TODO: remove this filter when issue #1881 is resolved
if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
continue;
@@ -276,6 +290,10 @@ public class TestHiveIcebergStorageHandlerWithEngine {
public void testSelectDistinctFromTable() throws IOException {
for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
Type type = SUPPORTED_TYPES.get(i);
+ if (type == Types.TimestampType.withZone() && isVectorized) {
+ // ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive
+ continue;
+ }
// TODO: remove this filter when issue #1881 is resolved
if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
continue;
diff --git a/settings.gradle b/settings.gradle
index 037bdf8..ba00916 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -64,8 +64,10 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
include 'spark2'
include 'spark-runtime'
include 'hive3'
+ include 'hive3-orc-bundle'
project(':spark2').name = 'iceberg-spark2'
project(':spark-runtime').name = 'iceberg-spark-runtime'
project(':hive3').name = 'iceberg-hive3'
+ project(':hive3-orc-bundle').name = 'iceberg-hive3-orc-bundle'
}