This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 10135b90e72 HIVE-26696: Implement row iterator for VectorizedRowBatch 
(#3737) (Adam Szita, reviewed by Laszlo Pinter)
10135b90e72 is described below

commit 10135b90e72ab53a2cc761fe7ed24db8fc7924c7
Author: Adam Szita <[email protected]>
AuthorDate: Wed Nov 9 14:51:14 2022 +0100

    HIVE-26696: Implement row iterator for VectorizedRowBatch (#3737) (Adam 
Szita, reviewed by Laszlo Pinter)
---
 .../iceberg/mr/hive/vector/HiveBatchContext.java   |  85 +++++--
 .../org/apache/iceberg/mr/hive/vector/HiveRow.java |  53 +++++
 .../iceberg/mr/hive/vector/HiveStructLike.java     |  56 +++++
 .../iceberg/mr/hive/vector/HiveValueConverter.java | 140 ++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     |   2 +-
 .../hive/vector/TestHiveIcebergVectorization.java  | 204 +++++++++++++++++
 .../hive/ql/exec/vector/VectorExtractRow.java      | 248 +++++++++++++++------
 7 files changed, 712 insertions(+), 76 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
index 08d2f732555..015eb9de843 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
@@ -20,14 +20,16 @@
 package org.apache.iceberg.mr.hive.vector;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.io.CloseableIterator;
 
-/**
- * Wraps a Hive VRB and holds corresponding metadata information about it, 
such as VRB context (e.g. type infos) and
- * file row offset.
- */
 public class HiveBatchContext {
 
   private final VectorizedRowBatch batch;
@@ -47,25 +49,82 @@ public class HiveBatchContext {
     return batch;
   }
 
-  public RowIterator rowIterator() throws IOException {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public CloseableIterator<HiveRow> rowIterator() throws IOException {
+    return new RowIterator();
   }
 
-  // TODO: implement row iterator
-  class RowIterator implements CloseableIterator {
+  class RowIterator implements CloseableIterator<HiveRow> {
 
-    @Override
-    public void close() throws IOException {
+    private final VectorExtractRow vectorExtractRow;
+    private final int originalSize;
+    private final int[] originalIndices;
+    private int currentPosition = 0;
+
+    RowIterator() throws IOException {
+      try {
+        this.vectorExtractRow = new VectorExtractRow();
+        this.vectorExtractRow.init(vrbCtx.getRowColumnTypeInfos());
+        this.originalSize = batch.size;
+        if (batch.isSelectedInUse()) {
+          // copy, as further operations working on this batch might change 
what rows are selected
+          originalIndices = Arrays.copyOf(batch.selected, batch.size);
+        } else {
+          originalIndices = IntStream.range(0, batch.size).toArray();
+        }
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
     }
 
     @Override
     public boolean hasNext() {
-      return false;
+      return currentPosition < originalSize;
     }
 
     @Override
-    public Object next() {
-      return null;
+    public HiveRow next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      // position of the row as this batch is intended to be read (e.g. if 
batch is already filtered this
+      // can be different from the physical position)
+      int logicalPosition = currentPosition++;
+      // real position of this row within the original (i.e. unfiltered) batch
+      int physicalPosition = originalIndices[logicalPosition];
+
+      HiveRow row = new HiveRow() {
+
+        @Override
+        public Object get(int rowIndex) {
+
+          if (rowIndex == MetadataColumns.ROW_POSITION.fieldId()) {
+            if (fileRowOffset == Long.MIN_VALUE) {
+              throw new UnsupportedOperationException("Can't provide row 
position for batch.");
+            }
+            return fileRowOffset + physicalPosition;
+          } else {
+            return 
vectorExtractRow.accessor(batch).apply(physicalPosition).apply(rowIndex);
+          }
+        }
+
+        @Override
+        public int physicalBatchIndex() {
+          return physicalPosition;
+        }
+
+      };
+      return row;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
     }
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveRow.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveRow.java
new file mode 100644
index 00000000000..f96b19f62a6
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveRow.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+/**
+ * Hive's record representation, where data is provided by an underlying 
VectorizedRowBatch instance.
+ */
+public abstract class HiveRow {
+
+  /**
+   * Whether this row is marked as deleted or not. VRB implementation 
registers un-deleted rows in its 'selected' array.
+   */
+  private boolean deleted = false;
+
+  /**
+   * Returns an item/data from the row found on the provided row index.
+   * @param rowIndex row index
+   * @return data
+   */
+  public abstract Object get(int rowIndex);
+
+  /**
+   * Returns the original position of this row in the enclosing VRB instance.
+   * @return batch index.
+   */
+  public abstract int physicalBatchIndex();
+
+  public void setDeleted(boolean deleted) {
+    this.deleted = deleted;
+  }
+
+  public boolean isDeleted() {
+    return deleted;
+  }
+
+}
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveStructLike.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveStructLike.java
new file mode 100644
index 00000000000..0b49e687a55
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveStructLike.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Types;
+
+/**
+ * StructLike implementation for the HiveRow record type.
+ */
+public class HiveStructLike implements StructLike {
+  private final Types.StructType type;
+  private HiveRow wrapped;
+
+  public HiveStructLike(Types.StructType type) {
+    this.type = type;
+  }
+
+  public HiveStructLike wrap(HiveRow row) {
+    this.wrapped = row;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return type.fields().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    Types.NestedField field = type.fields().get(pos);
+    return javaClass.cast(HiveValueConverter.convert(field.type(), 
wrapped.get(pos)));
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Not implemented: set");
+  }
+}
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveValueConverter.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveValueConverter.java
new file mode 100644
index 00000000000..0804a7abe9f
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveValueConverter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Converts from HiveRow to Iceberg's internal data representation format
+ * aka from POJOs used by Hive to hold record data into those of Iceberg
+ */
+public class HiveValueConverter {
+
+  private HiveValueConverter() {
+  }
+
+  public static Record convert(Schema schema, HiveRow row) {
+    return convert(schema.asStruct(), row);
+  }
+
+  public static Object convert(Type type, Object object) {
+    if (object == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case STRUCT:
+        return convert(type.asStructType(), (HiveRow) object);
+
+      case LIST:
+        List<Object> convertedList = Lists.newArrayList();
+        List<?> list = (List<?>) object;
+        for (Object element : list) {
+          convertedList.add(convert(type.asListType().elementType(), element));
+        }
+        return convertedList;
+
+      case MAP:
+        Map<Object, Object> convertedMap = Maps.newLinkedHashMap();
+        Map<?, ?> map = (Map<?, ?>) object;
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+          convertedMap.put(
+              convert(type.asMapType().keyType(), entry.getKey()),
+              convert(type.asMapType().valueType(), entry.getValue()));
+        }
+        return convertedMap;
+
+      case DATE:
+        Date hiveDate = (Date) object;
+        return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), 
hiveDate.getDay());
+      case TIMESTAMP:
+        if (Types.TimestampType.withZone() == type) {
+          throw new UnsupportedOperationException("Not supported with 
vectorization.");
+        } else {
+          // Hive's internal TS representation is in UTC
+          return LocalDateTime.ofInstant(((Timestamp) 
object).toSqlTimestamp().toInstant(), ZoneId.of("UTC"));
+        }
+      case BINARY:
+        return ByteBuffer.wrap((byte[]) object);
+      case FIXED:
+        return (byte[]) object;
+      case DECIMAL:
+        BigDecimal bigDecimal = ((HiveDecimal) object).bigDecimalValue();
+        if (bigDecimal.longValue() == 0) {
+          // For some reason for value=0 scale information is not preserved in 
Hive
+          return bigDecimal.setScale(((Types.DecimalType) type).scale());
+        }
+        return bigDecimal;
+      case INTEGER:
+      case BOOLEAN:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+        return object;
+      default:
+        throw new UnsupportedOperationException("Not a supported type: " + 
type);
+    }
+  }
+
+  private static Record convert(Types.StructType struct, HiveRow row) {
+    if (row == null) {
+      return null;
+    }
+
+    Record record = GenericRecord.create(struct);
+    List<Types.NestedField> fields = struct.fields();
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+
+      Type fieldType = field.type();
+
+      switch (fieldType.typeId()) {
+        case STRUCT:
+          record.set(i, convert(fieldType.asStructType(), row.get(i)));
+          break;
+        case LIST:
+          record.set(i, convert(fieldType.asListType(), row.get(i)));
+          break;
+        case MAP:
+          record.set(i, convert(fieldType.asMapType(), row.get(i)));
+          break;
+        default:
+          record.set(i, convert(fieldType, row.get(i)));
+      }
+    }
+    return record;
+  }
+}
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index efaea9a8a01..0f0f48911e5 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -65,7 +65,7 @@ import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
 // Helper class for setting up and testing various catalog implementations
-abstract class TestTables {
+public abstract class TestTables {
   public static final TestTableType[] ALL_TABLE_TYPES = new TestTableType[] {
       TestTableType.HADOOP_TABLE,
       TestTableType.HADOOP_CATALOG,
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
new file mode 100644
index 00000000000..6bc7074a87c
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerWithEngineBase;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.Assume.assumeTrue;
+
+public class TestHiveIcebergVectorization extends 
HiveIcebergStorageHandlerWithEngineBase {
+
+  /**
+   * Tests the row iterator implementation (HiveRow, 
HiveBatchContext.RowIterator) along with HiveValueConverter by
+   * reading in values from all supported types via VRBs, and iterating on its 
records 1-by-1 while comparing with the
+   * expected Iceberg record instances.
+   * @throws Exception any test failure
+   */
+  @Test
+  public void testRowIterator() throws Exception {
+    assumeTrue("Tests a format-independent feature", isVectorized && 
FileFormat.ORC.equals(fileFormat));
+
+    // Create a table with sample data with all supported types, those 
unsupported for vectorization are commented out
+    Schema allSchema = new Schema(
+        optional(1, "binary_col", Types.BinaryType.get()),
+        optional(2, "boolean_col", Types.BooleanType.get()),
+        optional(3, "date_col", Types.DateType.get()),
+        optional(4, "decimal_col", Types.DecimalType.of(6, 4)),
+        optional(5, "double_col", Types.DoubleType.get()),
+        optional(6, "fixed_col", Types.FixedType.ofLength(4)),
+        optional(7, "float_col", Types.FloatType.get()),
+        optional(8, "int_col", Types.IntegerType.get()),
+        optional(9, "long_col", Types.LongType.get()),
+        optional(10, "string_col", Types.StringType.get()),
+//        optional(11, "uuid_col", Types.UUIDType.get()),
+        optional(12, "timestamp_col", Types.TimestampType.withoutZone())
+//        optional(13, "timestamp_with_tz_col", 
Types.TimestampType.withZone()),
+//        optional(14, "time_col", Types.TimeType.get())
+    );
+
+    // Generate 10 records for all column types into our test table
+    List<Record> records = TestHelper.generateRandomRecords(allSchema, 10, 0L);
+    Table table = testTables.createTable(shell, "temptable", allSchema, 
fileFormat, records);
+
+    // Identify data file location - expected to be 1 file exactly
+    Path dataFilePath = new 
Path(Lists.newArrayList(Lists.newArrayList(table.newScan().planTasks().iterator()).get(0)
+        .files().iterator()).get(0).file().path().toString());
+
+    // Generate a mock vectorized read job
+    JobConf jobConf = prepareMockJob(allSchema, dataFilePath);
+
+    // Simulates HiveVectorizedReader creating an ORC record reader 
(implementation inside Hive QL code)
+    VectorizedOrcInputFormat inputFormat = new VectorizedOrcInputFormat();
+    RecordReader<NullWritable, VectorizedRowBatch> 
internalVectorizedRecordReader =
+        inputFormat.getRecordReader(new FileSplit(dataFilePath, 0L, 
Long.MAX_VALUE, new String[]{}), jobConf,
+            new MockReporter());
+    HiveBatchIterator hiveBatchIterator = new 
HiveBatchIterator(internalVectorizedRecordReader, jobConf, null, null);
+
+    // Expected to be one batch exactly
+    HiveBatchContext hiveBatchContext = hiveBatchIterator.next();
+    CloseableIterator<HiveRow> hiveRowIterator = 
hiveBatchContext.rowIterator();
+
+    // Iterator for the expected records
+    Iterator<Record> genericRowIterator = records.iterator();
+
+    // Compare record data provided by Hive with those provided by 
GenericRecord implementation of Iceberg
+    while (hiveRowIterator.hasNext() && genericRowIterator.hasNext()) {
+      HiveRow hiveRow = hiveRowIterator.next();
+      Record hiveRecord = HiveValueConverter.convert(allSchema, hiveRow);
+      Record genericRecord = genericRowIterator.next();
+
+      // Will do a deep comparison on values
+      Assert.assertEquals(genericRecord, hiveRecord);
+    }
+
+    // The two iterators both should be at the end by now
+    Assert.assertEquals(genericRowIterator.hasNext(), 
hiveRowIterator.hasNext());
+  }
+
+  /**
+   * Creates a mock vectorized ORC read job for a particular data file and a 
read schema (projecting on all columns)
+   * @param schema readSchema
+   * @param dataFilePath data file path
+   * @return JobConf instance
+   * @throws HiveException any failure during job creation
+   */
+  private JobConf prepareMockJob(Schema schema, Path dataFilePath) throws 
HiveException {
+    StructObjectInspector oi = (StructObjectInspector) 
IcebergObjectInspector.create(schema);
+    String hiveColumnNames = String.join(",", 
oi.getAllStructFieldRefs().stream()
+        .map(sf -> sf.getFieldName()).collect(Collectors.toList()));
+    String hiveTypeInfoNames = String.join(",", 
oi.getAllStructFieldRefs().stream()
+        .map(sf -> 
sf.getFieldObjectInspector().getTypeName()).collect(Collectors.toList()));
+
+    // facepalm: getTypeName returns detailed info for decimal type.. :/
+    hiveTypeInfoNames = hiveTypeInfoNames.replaceAll("decimal\\(\\d+,\\d+\\)", 
"decimal");
+
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, hiveColumnNames);
+    conf.set(IOConstants.COLUMNS_TYPES, hiveTypeInfoNames);
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, true);
+
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, 
true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+    JobConf vectorJob = new JobConf(conf);
+
+    VectorizedOrcInputFormat.setInputPaths(vectorJob, dataFilePath);
+
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(oi, new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(vectorJob, mapWork);
+    return vectorJob;
+  }
+
+  private static class MockReporter implements Reporter {
+
+    @Override
+    public void setStatus(String s) {
+    }
+
+    @Override
+    public Counters.Counter getCounter(Enum<?> anEnum) {
+      return null;
+    }
+
+    @Override
+    public Counters.Counter getCounter(String s, String s1) {
+      return null;
+    }
+
+    @Override
+    public void incrCounter(Enum<?> anEnum, long l) {
+    }
+
+    @Override
+    public void incrCounter(String s, String s1, long l) {
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      return null;
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void progress() {
+    }
+  }
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index e1482e077d6..b3be81646c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
@@ -93,6 +98,8 @@ public class VectorExtractRow {
   private static final byte[] EMPTY_BYTES = new byte[0];
   private static final String EMPTY_STRING = "";
 
+  public enum OutputType { WRITABLES, POJO };
+
   /*
    * Allocate the various arrays.
    */
@@ -184,14 +191,13 @@ public class VectorExtractRow {
    * @return
    */
   private Object extractRowColumn(VectorizedRowBatch batch, int batchIndex,
-      int logicalColumnIndex) {
+      int logicalColumnIndex, OutputType outputType) {
 
     final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
     final ColumnVector colVector = batch.cols[projectionColumnNum];
     final TypeInfo typeInfo = typeInfos[logicalColumnIndex];
     // try {
-      return extractRowColumn(
-          colVector, typeInfo, objectInspectors[logicalColumnIndex], 
batchIndex);
+      return extractRowColumn(colVector, typeInfo, 
objectInspectors[logicalColumnIndex], batchIndex, outputType);
     // } catch (Exception e){
     //   throw new RuntimeException("Error evaluating column number " + 
projectionColumnNum +
     //       ", typeInfo " + typeInfo.toString() + ", batchIndex " + 
batchIndex);
@@ -200,6 +206,12 @@ public class VectorExtractRow {
 
   public Object extractRowColumn(
       ColumnVector colVector, TypeInfo typeInfo, ObjectInspector 
objectInspector, int batchIndex) {
+    return extractRowColumn(colVector, typeInfo, objectInspector, batchIndex, 
OutputType.WRITABLES);
+  }
+
+  public Object extractRowColumn(
+      ColumnVector colVector, TypeInfo typeInfo, ObjectInspector 
objectInspector, int batchIndex,
+      OutputType outputType) {
 
     if (colVector == null) {
       // The planner will not include unneeded columns for reading but other 
parts of execution
@@ -217,51 +229,89 @@ public class VectorExtractRow {
       {
         final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) 
typeInfo;
         final PrimitiveCategory primitiveCategory = 
primitiveTypeInfo.getPrimitiveCategory();
-        final Writable primitiveWritable =
-            VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
+        final Writable primitiveWritable = outputType == OutputType.WRITABLES ?
+            VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory) :
+            null;
         switch (primitiveCategory) {
         case VOID:
           return null;
         case BOOLEAN:
-          ((BooleanWritable) primitiveWritable).set(
-              ((LongColumnVector) colVector).vector[adjustedIndex] == 0 ?
-                  false : true);
-          return primitiveWritable;
+          boolean boolValue = ((LongColumnVector) 
colVector).vector[adjustedIndex] == 0 ? false : true;
+          if (outputType == OutputType.WRITABLES) {
+            ((BooleanWritable) primitiveWritable).set(boolValue);
+            return primitiveWritable;
+          } else {
+            return boolValue;
+          }
         case BYTE:
-          ((ByteWritable) primitiveWritable).set(
-              (byte) ((LongColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          byte byteValue = (byte) ((LongColumnVector) 
colVector).vector[adjustedIndex];
+          if (outputType == OutputType.WRITABLES) {
+            ((ByteWritable) primitiveWritable).set(byteValue);
+            return primitiveWritable;
+          } else {
+            return byteValue;
+          }
         case SHORT:
-          ((ShortWritable) primitiveWritable).set(
-              (short) ((LongColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          short shortValue = (short) ((LongColumnVector) 
colVector).vector[adjustedIndex];
+          if (outputType == OutputType.WRITABLES) {
+            ((ShortWritable) primitiveWritable).set(shortValue);
+            return primitiveWritable;
+          } else {
+            return shortValue;
+          }
         case INT:
-          ((IntWritable) primitiveWritable).set(
-              (int) ((LongColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          int intValue = (int) ((LongColumnVector) 
colVector).vector[adjustedIndex];
+          if (outputType == OutputType.WRITABLES) {
+            ((IntWritable) primitiveWritable).set(intValue);
+            return primitiveWritable;
+          } else {
+            return intValue;
+          }
         case LONG:
-          ((LongWritable) primitiveWritable).set(
-              ((LongColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          long longValue = ((LongColumnVector) 
colVector).vector[adjustedIndex];
+          if (outputType == OutputType.WRITABLES) {
+            ((LongWritable) primitiveWritable).set(longValue);
+            return primitiveWritable;
+          } else {
+            return longValue;
+          }
         case TIMESTAMP:
           // From java.sql.Timestamp used by vectorization to serializable 
org.apache.hadoop.hive.common.type.Timestamp
           java.sql.Timestamp ts =
               ((TimestampColumnVector) 
colVector).asScratchTimestamp(adjustedIndex);
           Timestamp serializableTS = Timestamp.ofEpochMilli(ts.getTime(), 
ts.getNanos());
-          ((TimestampWritableV2) primitiveWritable).set(serializableTS);
-          return primitiveWritable;
+          if (outputType == OutputType.WRITABLES) {
+            ((TimestampWritableV2) primitiveWritable).set(serializableTS);
+            return primitiveWritable;
+          } else {
+            // return Hive Timestamp object
+            return serializableTS;
+          }
         case DATE:
-          ((DateWritableV2) primitiveWritable).set(
-              (int) ((LongColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          Date dateValue = Date.ofEpochDay((int) ((LongColumnVector) 
colVector).vector[adjustedIndex]);
+          if (outputType == OutputType.WRITABLES) {
+            ((DateWritableV2) primitiveWritable).set(dateValue);
+            return primitiveWritable;
+          } else {
+            // return Hive Date object
+            return dateValue;
+          }
         case FLOAT:
-          ((FloatWritable) primitiveWritable).set(
-              (float) ((DoubleColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          float floatValue = (float) ((DoubleColumnVector) 
colVector).vector[adjustedIndex];
+          if (outputType == OutputType.WRITABLES) {
+            ((FloatWritable) primitiveWritable).set(floatValue);
+            return primitiveWritable;
+          } else {
+            return floatValue;
+          }
         case DOUBLE:
-          ((DoubleWritable) primitiveWritable).set(
-              ((DoubleColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          double doubleValue = ((DoubleColumnVector) 
colVector).vector[adjustedIndex];
+          if (outputType == OutputType.WRITABLES) {
+            ((DoubleWritable) primitiveWritable).set(doubleValue);
+            return primitiveWritable;
+          } else {
+            return doubleValue;
+          }
         case BINARY:
           {
             final BytesColumnVector bytesColVector =
@@ -270,16 +320,22 @@ public class VectorExtractRow {
             final int start = bytesColVector.start[adjustedIndex];
             final int length = bytesColVector.length[adjustedIndex];
 
-            BytesWritable bytesWritable = (BytesWritable) primitiveWritable;
-            if (bytes == null || length == 0) {
-              if (length > 0) {
-                nullBytesReadError(primitiveCategory, batchIndex);
+            if (outputType == OutputType.WRITABLES) {
+              BytesWritable bytesWritable = (BytesWritable) primitiveWritable;
+              if (bytes == null || length == 0) {
+                if (length > 0) {
+                  nullBytesReadError(primitiveCategory, batchIndex);
+                }
+                bytesWritable.set(EMPTY_BYTES, 0, 0);
+              } else {
+                bytesWritable.set(bytes, start, length);
               }
-              bytesWritable.set(EMPTY_BYTES, 0, 0);
+              return bytesWritable;
             } else {
-              bytesWritable.set(bytes, start, length);
+              byte[] ret = new byte[length];
+              System.arraycopy(bytes, start, ret, 0, length);
+              return ret;
             }
-            return primitiveWritable;
           }
         case STRING:
           {
@@ -289,17 +345,32 @@ public class VectorExtractRow {
             final int start = bytesColVector.start[adjustedIndex];
             final int length = bytesColVector.length[adjustedIndex];
 
+            String result = null;
             if (bytes == null || length == 0) {
               if (length > 0) {
                 nullBytesReadError(primitiveCategory, batchIndex);
               }
-              ((Text) primitiveWritable).set(EMPTY_BYTES, 0, 0);
+              result = EMPTY_STRING;
+              if (outputType == OutputType.WRITABLES) {
+                ((Text) primitiveWritable).set(EMPTY_BYTES, 0, 0);
+              }
             } else {
-
               // Use org.apache.hadoop.io.Text as our helper to go from byte[] 
to String.
-              ((Text) primitiveWritable).set(bytes, start, length);
+              if (outputType == OutputType.WRITABLES) {
+                ((Text) primitiveWritable).set(bytes, start, length);
+              } else {
+                try {
+                  result = Text.decode(bytes, start, length);
+                } catch (CharacterCodingException e) {
+                  throw new RuntimeException("Could not decode to String 
object.", e);
+                }
+              }
+            }
+            if (outputType == OutputType.WRITABLES) {
+              return primitiveWritable;
+            } else {
+              return result;
             }
-            return primitiveWritable;
           }
         case VARCHAR:
           {
@@ -309,7 +380,9 @@ public class VectorExtractRow {
             final int start = bytesColVector.start[adjustedIndex];
             final int length = bytesColVector.length[adjustedIndex];
 
-            final HiveVarcharWritable hiveVarcharWritable = 
(HiveVarcharWritable) primitiveWritable;
+            // TODO: maybe not create writable for POJO case
+            final HiveVarcharWritable hiveVarcharWritable =
+                (HiveVarcharWritable) 
VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
             if (bytes == null || length == 0) {
               if (length > 0) {
                 nullBytesReadError(primitiveCategory, batchIndex);
@@ -326,7 +399,11 @@ public class VectorExtractRow {
                     new String(bytes, start, adjustedLength, Charsets.UTF_8), 
-1);
               }
             }
-            return primitiveWritable;
+            if (outputType == OutputType.WRITABLES) {
+              return hiveVarcharWritable;
+            } else {
+              return hiveVarcharWritable.getHiveVarchar();
+            }
           }
         case CHAR:
           {
@@ -336,7 +413,9 @@ public class VectorExtractRow {
             final int start = bytesColVector.start[adjustedIndex];
             final int length = bytesColVector.length[adjustedIndex];
 
-            final HiveCharWritable hiveCharWritable = (HiveCharWritable) 
primitiveWritable;
+            // TODO: maybe not create writable for POJO case
+            final HiveCharWritable hiveCharWritable =
+                (HiveCharWritable) 
VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
             final int maxLength = ((CharTypeInfo) 
primitiveTypeInfo).getLength();
             if (bytes == null || length == 0) {
               if (length > 0) {
@@ -354,27 +433,46 @@ public class VectorExtractRow {
                     new String(bytes, start, adjustedLength, Charsets.UTF_8), 
maxLength);
               }
             }
-            return primitiveWritable;
+            if (outputType == OutputType.WRITABLES) {
+              return hiveCharWritable;
+            } else {
+              return hiveCharWritable.getHiveChar();
+            }
           }
         case DECIMAL:
+          // decimal code is deep within HiveDecimalWritable, probably can't 
avoid creating it
+          HiveDecimalWritable decimalWritable =
+              (HiveDecimalWritable) 
VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
           if (colVector instanceof Decimal64ColumnVector) {
             Decimal64ColumnVector dec32ColVector = (Decimal64ColumnVector) 
colVector;
-            ((HiveDecimalWritable) primitiveWritable).deserialize64(
-                dec32ColVector.vector[adjustedIndex], dec32ColVector.scale);
+            
decimalWritable.deserialize64(dec32ColVector.vector[adjustedIndex], 
dec32ColVector.scale);
           } else {
             // The HiveDecimalWritable set method will quickly copy the 
deserialized decimal writable fields.
-            ((HiveDecimalWritable) primitiveWritable).set(
-                ((DecimalColumnVector) colVector).vector[adjustedIndex]);
+            decimalWritable.set(((DecimalColumnVector) 
colVector).vector[adjustedIndex]);
+          }
+          if (outputType == OutputType.WRITABLES) {
+            return decimalWritable;
+          } else {
+            return decimalWritable.getHiveDecimal();
           }
-          return primitiveWritable;
         case INTERVAL_YEAR_MONTH:
-          ((HiveIntervalYearMonthWritable) primitiveWritable).set(
-              (int) ((LongColumnVector) colVector).vector[adjustedIndex]);
-          return primitiveWritable;
+          HiveIntervalYearMonth hiveIntervalYearMonthValue =
+              new HiveIntervalYearMonth((int) ((LongColumnVector) 
colVector).vector[adjustedIndex]);
+          if (outputType == OutputType.WRITABLES) {
+            ((HiveIntervalYearMonthWritable) 
primitiveWritable).set(hiveIntervalYearMonthValue);
+            return primitiveWritable;
+          } else {
+            return hiveIntervalYearMonthValue;
+          }
         case INTERVAL_DAY_TIME:
-          ((HiveIntervalDayTimeWritable) primitiveWritable).set(
-              ((IntervalDayTimeColumnVector) 
colVector).asScratchIntervalDayTime(adjustedIndex));
-          return primitiveWritable;
+          HiveIntervalDayTime hiveIntervalDayTimeValue =
+              ((IntervalDayTimeColumnVector) 
colVector).asScratchIntervalDayTime(adjustedIndex);
+          if (outputType == OutputType.WRITABLES) {
+            ((HiveIntervalDayTimeWritable) 
primitiveWritable).set(hiveIntervalDayTimeValue);
+            return primitiveWritable;
+          } else {
+            return hiveIntervalDayTimeValue;
+          }
         default:
           throw new RuntimeException("Primitive category " + 
primitiveCategory.name() +
               " not supported");
@@ -474,16 +572,42 @@ public class VectorExtractRow {
   /**
    * Extract an row object from a VectorizedRowBatch at batchIndex.
    *
-   * @param batch
-   * @param batchIndex
-   * @param objects
+   * @param batch VRB instance to lookup the row from
+   * @param batchIndex index of the row within this batch
+   * @param objects output array that receives the extracted column values
    */
   public void extractRow(VectorizedRowBatch batch, int batchIndex, Object[] 
objects) {
     for (int i = 0; i < projectionColumnNums.length; i++) {
-      objects[i] = extractRowColumn(batch, batchIndex, i);
+      objects[i] = extractRowColumn(batch, batchIndex, i, 
OutputType.WRITABLES);
+    }
+  }
+
+
+  /**
+   * Extract a row object from a VectorizedRowBatch at batchIndex.
+   *
+   * @param batch VRB instance to lookup the row from
+   * @param batchIndex index of the row within this batch
+   * @param objects output array that receives the extracted column values
+   */
+  public void extractRow(VectorizedRowBatch batch, int batchIndex, Object[] 
objects, OutputType outputType) {
+    for (int i = 0; i < projectionColumnNums.length; i++) {
+      objects[i] = extractRowColumn(batch, batchIndex, i, outputType);
     }
   }
 
+  /**
+   * Returns an accessor function construct that can return data from a VRB 
batch instance.
+   * The outer function would take a batch index to select the row inside the 
batch, and an inner function
+   * would select the data/item inside the row based on the provided column index.
+   *
+   * @param batch VRB instance
+   * @return function construct
+   */
+  public Function<Integer, Function<Integer, Object>> 
accessor(VectorizedRowBatch batch) {
+    return batchIndex -> rowIndex -> extractRowColumn(batch, batchIndex, 
rowIndex, OutputType.POJO);
+  }
+
   private void nullBytesReadError(PrimitiveCategory primitiveCategory, int 
batchIndex) {
     throw new RuntimeException("null " + primitiveCategory.name() +
         " entry: batchIndex " + batchIndex);


Reply via email to