This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 10135b90e72 HIVE-26696: Implement row iterator for VectorizedRowBatch
(#3737) (Adam Szita, reviewed by Laszlo Pinter)
10135b90e72 is described below
commit 10135b90e72ab53a2cc761fe7ed24db8fc7924c7
Author: Adam Szita <[email protected]>
AuthorDate: Wed Nov 9 14:51:14 2022 +0100
HIVE-26696: Implement row iterator for VectorizedRowBatch (#3737) (Adam
Szita, reviewed by Laszlo Pinter)
---
.../iceberg/mr/hive/vector/HiveBatchContext.java | 85 +++++--
.../org/apache/iceberg/mr/hive/vector/HiveRow.java | 53 +++++
.../iceberg/mr/hive/vector/HiveStructLike.java | 56 +++++
.../iceberg/mr/hive/vector/HiveValueConverter.java | 140 ++++++++++++
.../org/apache/iceberg/mr/hive/TestTables.java | 2 +-
.../hive/vector/TestHiveIcebergVectorization.java | 204 +++++++++++++++++
.../hive/ql/exec/vector/VectorExtractRow.java | 248 +++++++++++++++------
7 files changed, 712 insertions(+), 76 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
index 08d2f732555..015eb9de843 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
@@ -20,14 +20,16 @@
package org.apache.iceberg.mr.hive.vector;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.io.CloseableIterator;
-/**
- * Wraps a Hive VRB and holds corresponding metadata information about it,
such as VRB context (e.g. type infos) and
- * file row offset.
- */
public class HiveBatchContext {
private final VectorizedRowBatch batch;
@@ -47,25 +49,82 @@ public class HiveBatchContext {
return batch;
}
- public RowIterator rowIterator() throws IOException {
- throw new UnsupportedOperationException("Not implemented yet");
+ public CloseableIterator<HiveRow> rowIterator() throws IOException {
+ return new RowIterator();
}
- // TODO: implement row iterator
- class RowIterator implements CloseableIterator {
+ class RowIterator implements CloseableIterator<HiveRow> {
- @Override
- public void close() throws IOException {
+ private final VectorExtractRow vectorExtractRow;
+ private final int originalSize;
+ private final int[] originalIndices;
+ private int currentPosition = 0;
+
+ RowIterator() throws IOException {
+ try {
+ this.vectorExtractRow = new VectorExtractRow();
+ this.vectorExtractRow.init(vrbCtx.getRowColumnTypeInfos());
+ this.originalSize = batch.size;
+ if (batch.isSelectedInUse()) {
+ // copy, as further operations working on this batch might change
what rows are selected
+ originalIndices = Arrays.copyOf(batch.selected, batch.size);
+ } else {
+ originalIndices = IntStream.range(0, batch.size).toArray();
+ }
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
}
@Override
public boolean hasNext() {
- return false;
+ return currentPosition < originalSize;
}
@Override
- public Object next() {
- return null;
+ public HiveRow next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ // position of the row as this batch is intended to be read (e.g. if
batch is already filtered this
+ // can be different from the physical position)
+ int logicalPosition = currentPosition++;
+ // real position of this row within the original (i.e. unfiltered) batch
+ int physicalPosition = originalIndices[logicalPosition];
+
+ HiveRow row = new HiveRow() {
+
+ @Override
+ public Object get(int rowIndex) {
+
+ if (rowIndex == MetadataColumns.ROW_POSITION.fieldId()) {
+ if (fileRowOffset == Long.MIN_VALUE) {
+ throw new UnsupportedOperationException("Can't provide row
position for batch.");
+ }
+ return fileRowOffset + physicalPosition;
+ } else {
+ return
vectorExtractRow.accessor(batch).apply(physicalPosition).apply(rowIndex);
+ }
+ }
+
+ @Override
+ public int physicalBatchIndex() {
+ return physicalPosition;
+ }
+
+ };
+ return row;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
}
}
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveRow.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveRow.java
new file mode 100644
index 00000000000..f96b19f62a6
--- /dev/null
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveRow.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+/**
+ * Hive's record representation, where data is provided by an underlying
VectorizedRowBatch instance.
+ */
+public abstract class HiveRow {
+
+ /**
+ * Whether this row is marked as deleted or not. VRB implementation
registers un-deleted rows in its 'selected' array.
+ */
+ private boolean deleted = false;
+
+ /**
+ * Returns an item/data from the row found on the provided row index.
+ * @param rowIndex row index
+ * @return data
+ */
+ public abstract Object get(int rowIndex);
+
+ /**
+ * Returns the original position of this row in the enclosing VRB instance.
+ * @return batch index.
+ */
+ public abstract int physicalBatchIndex();
+
+ public void setDeleted(boolean deleted) {
+ this.deleted = deleted;
+ }
+
+ public boolean isDeleted() {
+ return deleted;
+ }
+
+}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveStructLike.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveStructLike.java
new file mode 100644
index 00000000000..0b49e687a55
--- /dev/null
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveStructLike.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Types;
+
+/**
+ * StructLike implementation for the HiveRow record type.
+ */
+public class HiveStructLike implements StructLike {
+ private final Types.StructType type;
+ private HiveRow wrapped;
+
+ public HiveStructLike(Types.StructType type) {
+ this.type = type;
+ }
+
+ public HiveStructLike wrap(HiveRow row) {
+ this.wrapped = row;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return type.fields().size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ Types.NestedField field = type.fields().get(pos);
+ return javaClass.cast(HiveValueConverter.convert(field.type(),
wrapped.get(pos)));
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("Not implemented: set");
+ }
+}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveValueConverter.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveValueConverter.java
new file mode 100644
index 00000000000..0804a7abe9f
--- /dev/null
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveValueConverter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Converts from HiveRow to Iceberg's internal data representation format
+ * aka from POJOs used by Hive to hold record data into those of Iceberg
+ */
+public class HiveValueConverter {
+
+ private HiveValueConverter() {
+ }
+
+ public static Record convert(Schema schema, HiveRow row) {
+ return convert(schema.asStruct(), row);
+ }
+
+ public static Object convert(Type type, Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ switch (type.typeId()) {
+ case STRUCT:
+ return convert(type.asStructType(), (HiveRow) object);
+
+ case LIST:
+ List<Object> convertedList = Lists.newArrayList();
+ List<?> list = (List<?>) object;
+ for (Object element : list) {
+ convertedList.add(convert(type.asListType().elementType(), element));
+ }
+ return convertedList;
+
+ case MAP:
+ Map<Object, Object> convertedMap = Maps.newLinkedHashMap();
+ Map<?, ?> map = (Map<?, ?>) object;
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ convertedMap.put(
+ convert(type.asMapType().keyType(), entry.getKey()),
+ convert(type.asMapType().valueType(), entry.getValue()));
+ }
+ return convertedMap;
+
+ case DATE:
+ Date hiveDate = (Date) object;
+ return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(),
hiveDate.getDay());
+ case TIMESTAMP:
+ if (Types.TimestampType.withZone() == type) {
+ throw new UnsupportedOperationException("Not supported with
vectorization.");
+ } else {
+ // Hive's internal TS representation is in UTC
+ return LocalDateTime.ofInstant(((Timestamp)
object).toSqlTimestamp().toInstant(), ZoneId.of("UTC"));
+ }
+ case BINARY:
+ return ByteBuffer.wrap((byte[]) object);
+ case FIXED:
+ return (byte[]) object;
+ case DECIMAL:
+ BigDecimal bigDecimal = ((HiveDecimal) object).bigDecimalValue();
+ if (bigDecimal.longValue() == 0) {
+ // For some reason for value=0 scale information is not preserved in
Hive
+ return bigDecimal.setScale(((Types.DecimalType) type).scale());
+ }
+ return bigDecimal;
+ case INTEGER:
+ case BOOLEAN:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ return object;
+ default:
+ throw new UnsupportedOperationException("Not a supported type: " +
type);
+ }
+ }
+
+ private static Record convert(Types.StructType struct, HiveRow row) {
+ if (row == null) {
+ return null;
+ }
+
+ Record record = GenericRecord.create(struct);
+ List<Types.NestedField> fields = struct.fields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Types.NestedField field = fields.get(i);
+
+ Type fieldType = field.type();
+
+ switch (fieldType.typeId()) {
+ case STRUCT:
+ record.set(i, convert(fieldType.asStructType(), row.get(i)));
+ break;
+ case LIST:
+ record.set(i, convert(fieldType.asListType(), row.get(i)));
+ break;
+ case MAP:
+ record.set(i, convert(fieldType.asMapType(), row.get(i)));
+ break;
+ default:
+ record.set(i, convert(fieldType, row.get(i)));
+ }
+ }
+ return record;
+ }
+}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index efaea9a8a01..0f0f48911e5 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -65,7 +65,7 @@ import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
// Helper class for setting up and testing various catalog implementations
-abstract class TestTables {
+public abstract class TestTables {
public static final TestTableType[] ALL_TABLE_TYPES = new TestTableType[] {
TestTableType.HADOOP_TABLE,
TestTableType.HADOOP_CATALOG,
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
new file mode 100644
index 00000000000..6bc7074a87c
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerWithEngineBase;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.Assume.assumeTrue;
+
+public class TestHiveIcebergVectorization extends
HiveIcebergStorageHandlerWithEngineBase {
+
+ /**
+ * Tests the row iterator implementation (HiveRow,
HiveBatchContext.RowIterator) along with HiveValueConverter by
+ * reading in values from all supported types via VRBs, and iterating on its
records 1-by-1 while comparing with the
+ * expected Iceberg record instances.
+ * @throws Exception any test failure
+ */
+ @Test
+ public void testRowIterator() throws Exception {
+ assumeTrue("Tests a format-independent feature", isVectorized &&
FileFormat.ORC.equals(fileFormat));
+
+ // Create a table with sample data with all supported types, those
unsupported for vectorization are commented out
+ Schema allSchema = new Schema(
+ optional(1, "binary_col", Types.BinaryType.get()),
+ optional(2, "boolean_col", Types.BooleanType.get()),
+ optional(3, "date_col", Types.DateType.get()),
+ optional(4, "decimal_col", Types.DecimalType.of(6, 4)),
+ optional(5, "double_col", Types.DoubleType.get()),
+ optional(6, "fixed_col", Types.FixedType.ofLength(4)),
+ optional(7, "float_col", Types.FloatType.get()),
+ optional(8, "int_col", Types.IntegerType.get()),
+ optional(9, "long_col", Types.LongType.get()),
+ optional(10, "string_col", Types.StringType.get()),
+// optional(11, "uuid_col", Types.UUIDType.get()),
+ optional(12, "timestamp_col", Types.TimestampType.withoutZone())
+// optional(13, "timestamp_with_tz_col",
Types.TimestampType.withZone()),
+// optional(14, "time_col", Types.TimeType.get())
+ );
+
+ // Generate 10 records for all column types into our test table
+ List<Record> records = TestHelper.generateRandomRecords(allSchema, 10, 0L);
+ Table table = testTables.createTable(shell, "temptable", allSchema,
fileFormat, records);
+
+ // Identify data file location - expected to be 1 file exactly
+ Path dataFilePath = new
Path(Lists.newArrayList(Lists.newArrayList(table.newScan().planTasks().iterator()).get(0)
+ .files().iterator()).get(0).file().path().toString());
+
+ // Generate a mock vectorized read job
+ JobConf jobConf = prepareMockJob(allSchema, dataFilePath);
+
+ // Simulates HiveVectorizedReader creating an ORC record reader
(implementation inside Hive QL code)
+ VectorizedOrcInputFormat inputFormat = new VectorizedOrcInputFormat();
+ RecordReader<NullWritable, VectorizedRowBatch>
internalVectorizedRecordReader =
+ inputFormat.getRecordReader(new FileSplit(dataFilePath, 0L,
Long.MAX_VALUE, new String[]{}), jobConf,
+ new MockReporter());
+ HiveBatchIterator hiveBatchIterator = new
HiveBatchIterator(internalVectorizedRecordReader, jobConf, null, null);
+
+ // Expected to be one batch exactly
+ HiveBatchContext hiveBatchContext = hiveBatchIterator.next();
+ CloseableIterator<HiveRow> hiveRowIterator =
hiveBatchContext.rowIterator();
+
+ // Iterator for the expected records
+ Iterator<Record> genericRowIterator = records.iterator();
+
+ // Compare record data provided by Hive with those provided by
GenericRecord implementation of Iceberg
+ while (hiveRowIterator.hasNext() && genericRowIterator.hasNext()) {
+ HiveRow hiveRow = hiveRowIterator.next();
+ Record hiveRecord = HiveValueConverter.convert(allSchema, hiveRow);
+ Record genericRecord = genericRowIterator.next();
+
+ // Will do a deep comparison on values
+ Assert.assertEquals(genericRecord, hiveRecord);
+ }
+
+ // The two iterators both should be at the end by now
+ Assert.assertEquals(genericRowIterator.hasNext(),
hiveRowIterator.hasNext());
+ }
+
+ /**
+ * Creates a mock vectorized ORC read job for a particular data file and a
read schema (projecting on all columns)
+ * @param schema readSchema
+ * @param dataFilePath data file path
+ * @return JobConf instance
+ * @throws HiveException any failure during job creation
+ */
+ private JobConf prepareMockJob(Schema schema, Path dataFilePath) throws
HiveException {
+ StructObjectInspector oi = (StructObjectInspector)
IcebergObjectInspector.create(schema);
+ String hiveColumnNames = String.join(",",
oi.getAllStructFieldRefs().stream()
+ .map(sf -> sf.getFieldName()).collect(Collectors.toList()));
+ String hiveTypeInfoNames = String.join(",",
oi.getAllStructFieldRefs().stream()
+ .map(sf ->
sf.getFieldObjectInspector().getTypeName()).collect(Collectors.toList()));
+
+ // Note: getTypeName returns parameterized info for decimal types (e.g. "decimal(6,4)"),
+ // so strip the precision/scale suffix to get the plain type name.
+ hiveTypeInfoNames = hiveTypeInfoNames.replaceAll("decimal\\(\\d+,\\d+\\)",
"decimal");
+
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, hiveColumnNames);
+ conf.set(IOConstants.COLUMNS_TYPES, hiveTypeInfoNames);
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, true);
+
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED,
true);
+ HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+ JobConf vectorJob = new JobConf(conf);
+
+ VectorizedOrcInputFormat.setInputPaths(vectorJob, dataFilePath);
+
+ MapWork mapWork = new MapWork();
+ VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+ rbCtx.init(oi, new String[0]);
+ mapWork.setVectorMode(true);
+ mapWork.setVectorizedRowBatchCtx(rbCtx);
+ Utilities.setMapWork(vectorJob, mapWork);
+ return vectorJob;
+ }
+
+ private static class MockReporter implements Reporter {
+
+ @Override
+ public void setStatus(String s) {
+ }
+
+ @Override
+ public Counters.Counter getCounter(Enum<?> anEnum) {
+ return null;
+ }
+
+ @Override
+ public Counters.Counter getCounter(String s, String s1) {
+ return null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> anEnum, long l) {
+ }
+
+ @Override
+ public void incrCounter(String s, String s1, long l) {
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
+ public void progress() {
+ }
+ }
+}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index e1482e077d6..b3be81646c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
@@ -93,6 +98,8 @@ public class VectorExtractRow {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final String EMPTY_STRING = "";
+ public enum OutputType { WRITABLES, POJO };
+
/*
* Allocate the various arrays.
*/
@@ -184,14 +191,13 @@ public class VectorExtractRow {
* @return
*/
private Object extractRowColumn(VectorizedRowBatch batch, int batchIndex,
- int logicalColumnIndex) {
+ int logicalColumnIndex, OutputType outputType) {
final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
final ColumnVector colVector = batch.cols[projectionColumnNum];
final TypeInfo typeInfo = typeInfos[logicalColumnIndex];
// try {
- return extractRowColumn(
- colVector, typeInfo, objectInspectors[logicalColumnIndex],
batchIndex);
+ return extractRowColumn(colVector, typeInfo,
objectInspectors[logicalColumnIndex], batchIndex, outputType);
// } catch (Exception e){
// throw new RuntimeException("Error evaluating column number " +
projectionColumnNum +
// ", typeInfo " + typeInfo.toString() + ", batchIndex " +
batchIndex);
@@ -200,6 +206,12 @@ public class VectorExtractRow {
public Object extractRowColumn(
ColumnVector colVector, TypeInfo typeInfo, ObjectInspector
objectInspector, int batchIndex) {
+ return extractRowColumn(colVector, typeInfo, objectInspector, batchIndex,
OutputType.WRITABLES);
+ }
+
+ public Object extractRowColumn(
+ ColumnVector colVector, TypeInfo typeInfo, ObjectInspector
objectInspector, int batchIndex,
+ OutputType outputType) {
if (colVector == null) {
// The planner will not include unneeded columns for reading but other
parts of execution
@@ -217,51 +229,89 @@ public class VectorExtractRow {
{
final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo)
typeInfo;
final PrimitiveCategory primitiveCategory =
primitiveTypeInfo.getPrimitiveCategory();
- final Writable primitiveWritable =
- VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
+ final Writable primitiveWritable = outputType == OutputType.WRITABLES ?
+ VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory) :
+ null;
switch (primitiveCategory) {
case VOID:
return null;
case BOOLEAN:
- ((BooleanWritable) primitiveWritable).set(
- ((LongColumnVector) colVector).vector[adjustedIndex] == 0 ?
- false : true);
- return primitiveWritable;
+ boolean boolValue = ((LongColumnVector)
colVector).vector[adjustedIndex] == 0 ? false : true;
+ if (outputType == OutputType.WRITABLES) {
+ ((BooleanWritable) primitiveWritable).set(boolValue);
+ return primitiveWritable;
+ } else {
+ return boolValue;
+ }
case BYTE:
- ((ByteWritable) primitiveWritable).set(
- (byte) ((LongColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ byte byteValue = (byte) ((LongColumnVector)
colVector).vector[adjustedIndex];
+ if (outputType == OutputType.WRITABLES) {
+ ((ByteWritable) primitiveWritable).set(byteValue);
+ return primitiveWritable;
+ } else {
+ return byteValue;
+ }
case SHORT:
- ((ShortWritable) primitiveWritable).set(
- (short) ((LongColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ short shortValue = (short) ((LongColumnVector)
colVector).vector[adjustedIndex];
+ if (outputType == OutputType.WRITABLES) {
+ ((ShortWritable) primitiveWritable).set(shortValue);
+ return primitiveWritable;
+ } else {
+ return shortValue;
+ }
case INT:
- ((IntWritable) primitiveWritable).set(
- (int) ((LongColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ int intValue = (int) ((LongColumnVector)
colVector).vector[adjustedIndex];
+ if (outputType == OutputType.WRITABLES) {
+ ((IntWritable) primitiveWritable).set(intValue);
+ return primitiveWritable;
+ } else {
+ return intValue;
+ }
case LONG:
- ((LongWritable) primitiveWritable).set(
- ((LongColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ long longValue = ((LongColumnVector)
colVector).vector[adjustedIndex];
+ if (outputType == OutputType.WRITABLES) {
+ ((LongWritable) primitiveWritable).set(longValue);
+ return primitiveWritable;
+ } else {
+ return longValue;
+ }
case TIMESTAMP:
// From java.sql.Timestamp used by vectorization to serializable
org.apache.hadoop.hive.common.type.Timestamp
java.sql.Timestamp ts =
((TimestampColumnVector)
colVector).asScratchTimestamp(adjustedIndex);
Timestamp serializableTS = Timestamp.ofEpochMilli(ts.getTime(),
ts.getNanos());
- ((TimestampWritableV2) primitiveWritable).set(serializableTS);
- return primitiveWritable;
+ if (outputType == OutputType.WRITABLES) {
+ ((TimestampWritableV2) primitiveWritable).set(serializableTS);
+ return primitiveWritable;
+ } else {
+ // return Hive Timestamp object
+ return serializableTS;
+ }
case DATE:
- ((DateWritableV2) primitiveWritable).set(
- (int) ((LongColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ Date dateValue = Date.ofEpochDay((int) ((LongColumnVector)
colVector).vector[adjustedIndex]);
+ if (outputType == OutputType.WRITABLES) {
+ ((DateWritableV2) primitiveWritable).set(dateValue);
+ return primitiveWritable;
+ } else {
+ // return Hive Date object
+ return dateValue;
+ }
case FLOAT:
- ((FloatWritable) primitiveWritable).set(
- (float) ((DoubleColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ float floatValue = (float) ((DoubleColumnVector)
colVector).vector[adjustedIndex];
+ if (outputType == OutputType.WRITABLES) {
+ ((FloatWritable) primitiveWritable).set(floatValue);
+ return primitiveWritable;
+ } else {
+ return floatValue;
+ }
case DOUBLE:
- ((DoubleWritable) primitiveWritable).set(
- ((DoubleColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ double doubleValue = ((DoubleColumnVector)
colVector).vector[adjustedIndex];
+ if (outputType == OutputType.WRITABLES) {
+ ((DoubleWritable) primitiveWritable).set(doubleValue);
+ return primitiveWritable;
+ } else {
+ return doubleValue;
+ }
case BINARY:
{
final BytesColumnVector bytesColVector =
@@ -270,16 +320,22 @@ public class VectorExtractRow {
final int start = bytesColVector.start[adjustedIndex];
final int length = bytesColVector.length[adjustedIndex];
- BytesWritable bytesWritable = (BytesWritable) primitiveWritable;
- if (bytes == null || length == 0) {
- if (length > 0) {
- nullBytesReadError(primitiveCategory, batchIndex);
+ if (outputType == OutputType.WRITABLES) {
+ BytesWritable bytesWritable = (BytesWritable) primitiveWritable;
+ if (bytes == null || length == 0) {
+ if (length > 0) {
+ nullBytesReadError(primitiveCategory, batchIndex);
+ }
+ bytesWritable.set(EMPTY_BYTES, 0, 0);
+ } else {
+ bytesWritable.set(bytes, start, length);
}
- bytesWritable.set(EMPTY_BYTES, 0, 0);
+ return bytesWritable;
} else {
- bytesWritable.set(bytes, start, length);
+ byte[] ret = new byte[length];
+ System.arraycopy(bytes, start, ret, 0, length);
+ return ret;
}
- return primitiveWritable;
}
case STRING:
{
@@ -289,17 +345,32 @@ public class VectorExtractRow {
final int start = bytesColVector.start[adjustedIndex];
final int length = bytesColVector.length[adjustedIndex];
+ String result = null;
if (bytes == null || length == 0) {
if (length > 0) {
nullBytesReadError(primitiveCategory, batchIndex);
}
- ((Text) primitiveWritable).set(EMPTY_BYTES, 0, 0);
+ result = EMPTY_STRING;
+ if (outputType == OutputType.WRITABLES) {
+ ((Text) primitiveWritable).set(EMPTY_BYTES, 0, 0);
+ }
} else {
-
// Use org.apache.hadoop.io.Text as our helper to go from byte[]
to String.
- ((Text) primitiveWritable).set(bytes, start, length);
+ if (outputType == OutputType.WRITABLES) {
+ ((Text) primitiveWritable).set(bytes, start, length);
+ } else {
+ try {
+ result = Text.decode(bytes, start, length);
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException("Could not decode to String
object.", e);
+ }
+ }
+ }
+ if (outputType == OutputType.WRITABLES) {
+ return primitiveWritable;
+ } else {
+ return result;
}
- return primitiveWritable;
}
case VARCHAR:
{
@@ -309,7 +380,9 @@ public class VectorExtractRow {
final int start = bytesColVector.start[adjustedIndex];
final int length = bytesColVector.length[adjustedIndex];
- final HiveVarcharWritable hiveVarcharWritable =
(HiveVarcharWritable) primitiveWritable;
+ // TODO: maybe not create writable for POJO case
+ final HiveVarcharWritable hiveVarcharWritable =
+ (HiveVarcharWritable)
VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
if (bytes == null || length == 0) {
if (length > 0) {
nullBytesReadError(primitiveCategory, batchIndex);
@@ -326,7 +399,11 @@ public class VectorExtractRow {
new String(bytes, start, adjustedLength, Charsets.UTF_8),
-1);
}
}
- return primitiveWritable;
+ if (outputType == OutputType.WRITABLES) {
+ return hiveVarcharWritable;
+ } else {
+ return hiveVarcharWritable.getHiveVarchar();
+ }
}
case CHAR:
{
@@ -336,7 +413,9 @@ public class VectorExtractRow {
final int start = bytesColVector.start[adjustedIndex];
final int length = bytesColVector.length[adjustedIndex];
- final HiveCharWritable hiveCharWritable = (HiveCharWritable)
primitiveWritable;
+ // TODO: maybe not create writable for POJO case
+ final HiveCharWritable hiveCharWritable =
+ (HiveCharWritable)
VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
final int maxLength = ((CharTypeInfo)
primitiveTypeInfo).getLength();
if (bytes == null || length == 0) {
if (length > 0) {
@@ -354,27 +433,46 @@ public class VectorExtractRow {
new String(bytes, start, adjustedLength, Charsets.UTF_8),
maxLength);
}
}
- return primitiveWritable;
+ if (outputType == OutputType.WRITABLES) {
+ return hiveCharWritable;
+ } else {
+ return hiveCharWritable.getHiveChar();
+ }
}
case DECIMAL:
+ // decimal code is deep within HiveDecimalWritable, probably can't
avoid creating it
+ HiveDecimalWritable decimalWritable =
+ (HiveDecimalWritable)
VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
if (colVector instanceof Decimal64ColumnVector) {
Decimal64ColumnVector dec32ColVector = (Decimal64ColumnVector)
colVector;
- ((HiveDecimalWritable) primitiveWritable).deserialize64(
- dec32ColVector.vector[adjustedIndex], dec32ColVector.scale);
+
decimalWritable.deserialize64(dec32ColVector.vector[adjustedIndex],
dec32ColVector.scale);
} else {
// The HiveDecimalWritable set method will quickly copy the
deserialized decimal writable fields.
- ((HiveDecimalWritable) primitiveWritable).set(
- ((DecimalColumnVector) colVector).vector[adjustedIndex]);
+ decimalWritable.set(((DecimalColumnVector)
colVector).vector[adjustedIndex]);
+ }
+ if (outputType == OutputType.WRITABLES) {
+ return decimalWritable;
+ } else {
+ return decimalWritable.getHiveDecimal();
}
- return primitiveWritable;
case INTERVAL_YEAR_MONTH:
- ((HiveIntervalYearMonthWritable) primitiveWritable).set(
- (int) ((LongColumnVector) colVector).vector[adjustedIndex]);
- return primitiveWritable;
+ HiveIntervalYearMonth hiveIntervalYearMonthValue =
+ new HiveIntervalYearMonth((int) ((LongColumnVector)
colVector).vector[adjustedIndex]);
+ if (outputType == OutputType.WRITABLES) {
+ ((HiveIntervalYearMonthWritable)
primitiveWritable).set(hiveIntervalYearMonthValue);
+ return primitiveWritable;
+ } else {
+ return hiveIntervalYearMonthValue;
+ }
case INTERVAL_DAY_TIME:
- ((HiveIntervalDayTimeWritable) primitiveWritable).set(
- ((IntervalDayTimeColumnVector)
colVector).asScratchIntervalDayTime(adjustedIndex));
- return primitiveWritable;
+ HiveIntervalDayTime hiveIntervalDayTimeValue =
+ ((IntervalDayTimeColumnVector)
colVector).asScratchIntervalDayTime(adjustedIndex);
+ if (outputType == OutputType.WRITABLES) {
+ ((HiveIntervalDayTimeWritable)
primitiveWritable).set(hiveIntervalDayTimeValue);
+ return primitiveWritable;
+ } else {
+ return hiveIntervalDayTimeValue;
+ }
default:
throw new RuntimeException("Primitive category " +
primitiveCategory.name() +
" not supported");
@@ -474,16 +572,42 @@ public class VectorExtractRow {
/**
* Extract an row object from a VectorizedRowBatch at batchIndex.
*
- * @param batch
- * @param batchIndex
- * @param objects
+ * @param batch VRB instance to lookup the row from
+ * @param batchIndex index of the row within this batch
+ * @param objects output
*/
public void extractRow(VectorizedRowBatch batch, int batchIndex, Object[]
objects) {
for (int i = 0; i < projectionColumnNums.length; i++) {
- objects[i] = extractRowColumn(batch, batchIndex, i);
+ objects[i] = extractRowColumn(batch, batchIndex, i,
OutputType.WRITABLES);
+ }
+ }
+
+
+ /**
+ * Extract a row object from a VectorizedRowBatch at batchIndex.
+ *
+ * @param batch VRB instance to lookup the row from
+ * @param batchIndex index of the row within this batch
+ * @param objects output
+ */
+ public void extractRow(VectorizedRowBatch batch, int batchIndex, Object[]
objects, OutputType outputType) {
+ for (int i = 0; i < projectionColumnNums.length; i++) {
+ objects[i] = extractRowColumn(batch, batchIndex, i, outputType);
}
}
+ /**
+ * Returns an accessor function construct that can return data from a VRB
batch instance.
+ * The outer function would take a batch index to select the row inside the
batch, and an inner function
+ * would select the data/item inside the row based on the provided row index.
+ *
+ * @param batch VRB instance
+ * @return function construct
+ */
+ public Function<Integer, Function<Integer, Object>>
accessor(VectorizedRowBatch batch) {
+ return batchIndex -> rowIndex -> extractRowColumn(batch, batchIndex,
rowIndex, OutputType.POJO);
+ }
+
private void nullBytesReadError(PrimitiveCategory primitiveCategory, int
batchIndex) {
throw new RuntimeException("null " + primitiveCategory.name() +
" entry: batchIndex " + batchIndex);