Repository: hive
Updated Branches:
  refs/heads/branch-3 7156df66f -> 2334a0ddf


HIVE-19306: Arrow batch serializer (Teddy Choi, reviewed by Matt McCline and Eric Wohlstadter (non-binding))


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e090e58
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e090e58
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e090e58

Branch: refs/heads/branch-3
Commit: 0e090e58772516070e472713422aa8566df81b50
Parents: 7156df6
Author: Matt McCline <mmccl...@hortonworks.com>
Authored: Thu May 10 16:42:50 2018 -0500
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 29 13:56:07 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    5 +
 .../ql/io/arrow/ArrowColumnarBatchSerDe.java    | 1179 ++++++++++++++++++
 .../hive/ql/io/arrow/ArrowWrapperWritable.java  |   47 +
 .../hive/ql/io/arrow/RootAllocatorFactory.java  |   44 +
 .../io/arrow/TestArrowColumnarBatchSerDe.java   |  815 ++++++++++++
 serde/pom.xml                                   |    5 +
 6 files changed, 2095 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 60d5f04..128e892 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2625,6 +2625,11 @@ public class HiveConf extends Configuration {
       "Set to true to ensure that each SQL Merge statement ensures that for 
each row in the target\n" +
         "table there is at most 1 matching row in the source table per SQL 
Specification."),
 
+    // For Arrow SerDe
+    HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE,
+        "Arrow root allocator memory size limitation in bytes."),
+    HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."),
+
     // For Druid storage handler
     HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
             new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"),

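For reference, the two new settings above are read through the standard HiveConf accessors. A minimal
sketch of how a client might query them (illustrative only; the variable names are not part of the
patch):

    Configuration conf = new Configuration();
    // Defaults from the hunk above: Long.MAX_VALUE bytes for the root allocator, 1000 rows per batch.
    long allocatorLimit = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
    int arrowBatchSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE);
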
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
new file mode 100644
index 0000000..330fa58
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -0,0 +1,1179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.complex.impl.UnionReader;
+import org.apache.arrow.vector.complex.impl.UnionWriter;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+import org.apache.arrow.vector.complex.writer.DateDayWriter;
+import org.apache.arrow.vector.complex.writer.DecimalWriter;
+import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.complex.writer.IntervalDayWriter;
+import org.apache.arrow.vector.complex.writer.IntervalYearWriter;
+import org.apache.arrow.vector.complex.writer.SmallIntWriter;
+import org.apache.arrow.vector.complex.writer.TimeStampMilliWriter;
+import org.apache.arrow.vector.complex.writer.TinyIntWriter;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+import org.apache.arrow.vector.holders.NullableBigIntHolder;
+import org.apache.arrow.vector.holders.NullableBitHolder;
+import org.apache.arrow.vector.holders.NullableDateDayHolder;
+import org.apache.arrow.vector.holders.NullableFloat4Holder;
+import org.apache.arrow.vector.holders.NullableFloat8Holder;
+import org.apache.arrow.vector.holders.NullableIntHolder;
+import org.apache.arrow.vector.holders.NullableIntervalDayHolder;
+import org.apache.arrow.vector.holders.NullableIntervalYearHolder;
+import org.apache.arrow.vector.holders.NullableSmallIntHolder;
+import org.apache.arrow.vector.holders.NullableTimeStampMilliHolder;
+import org.apache.arrow.vector.holders.NullableTinyIntHolder;
+import org.apache.arrow.vector.holders.NullableVarBinaryHolder;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TimestampLocalTZTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.IntConsumer;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
+
+/**
+ * ArrowColumnarBatchSerDe converts Apache Hive rows to Apache Arrow columns. Its serialized
+ * class is {@link ArrowWrapperWritable}, which doesn't support {@link
+ * Writable#readFields(DataInput)} and {@link Writable#write(DataOutput)}.
+ *
+ * The following are known issues with the current implementation.
+ *
+ * A list column cannot contain a decimal column. {@link UnionListWriter} doesn't have an
+ * implementation for {@link BaseWriter.ListWriter#decimal()}.
+ *
+ * A union column can hold only one of the string, char, and varchar fields at a time. Apache Arrow
+ * doesn't have string and char, so {@link ArrowColumnarBatchSerDe} uses varchar to simulate
+ * string and char. They are treated as the same data type in
+ * {@link org.apache.arrow.vector.complex.UnionVector}.
+ *
+ * Timestamp with local timezone is not supported; {@link VectorAssignRow} doesn't support it.
+ */
+public class ArrowColumnarBatchSerDe extends AbstractSerDe {
+  public static final Logger LOG = LoggerFactory.getLogger(ArrowColumnarBatchSerDe.class.getName());
+  private static final String DEFAULT_ARROW_FIELD_NAME = "[DEFAULT]";
+
+  private static final int MS_PER_SECOND = 1_000;
+  private static final int MS_PER_MINUTE = MS_PER_SECOND * 60;
+  private static final int MS_PER_HOUR = MS_PER_MINUTE * 60;
+  private static final int MS_PER_DAY = MS_PER_HOUR * 24;
+  private static final int NS_PER_MS = 1_000_000;
+
+  private BufferAllocator rootAllocator;
+
+  private StructTypeInfo rowTypeInfo;
+  private StructObjectInspector rowObjectInspector;
+  private Configuration conf;
+  private Serializer serializer;
+  private Deserializer deserializer;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    this.conf = conf;
+
+    rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
+
+    final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+        .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+
+    // Create an object inspector
+    final List<String> columnNames;
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(columnNameDelimiter));
+    }
+    final List<TypeInfo> columnTypes;
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector =
+        (StructObjectInspector) getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+    final List<Field> fields = new ArrayList<>();
+    final int size = columnNames.size();
+    for (int i = 0; i < size; i++) {
+      fields.add(toField(columnNames.get(i), columnTypes.get(i)));
+    }
+
+    serializer = new Serializer(new Schema(fields));
+    deserializer = new Deserializer();
+  }
+
+  private class Serializer {
+    private final int MAX_BUFFERED_ROWS;
+
+    // Schema
+    private final StructTypeInfo structTypeInfo;
+    private final List<TypeInfo> fieldTypeInfos;
+    private final int fieldSize;
+
+    // Hive columns
+    private final VectorizedRowBatch vectorizedRowBatch;
+    private final VectorAssignRow vectorAssignRow;
+    private int batchSize;
+
+    // Arrow columns
+    private final VectorSchemaRoot vectorSchemaRoot;
+    private final List<FieldVector> arrowVectors;
+    private final List<FieldWriter> fieldWriters;
+
+    private Serializer(Schema schema) throws SerDeException {
+      MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HIVE_ARROW_BATCH_SIZE);
+      LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + 
MAX_BUFFERED_ROWS);
+
+      // Schema
+      structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(rowObjectInspector);
+      fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+      fieldSize = fieldTypeInfos.size();
+
+      // Initialize Arrow structures
+      vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator);
+      arrowVectors = vectorSchemaRoot.getFieldVectors();
+      fieldWriters = Lists.newArrayList();
+      for (FieldVector fieldVector : arrowVectors) {
+        final FieldWriter fieldWriter =
+            Types.getMinorTypeForArrowType(
+                fieldVector.getField().getType()).getNewFieldWriter(fieldVector);
+        fieldWriters.add(fieldWriter);
+      }
+
+      // Initialize Hive structures
+      vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
+      for (int i = 0; i < fieldSize; i++) {
+        final ColumnVector columnVector = createColumnVector(fieldTypeInfos.get(i));
+        vectorizedRowBatch.cols[i] = columnVector;
+        columnVector.init();
+      }
+      vectorizedRowBatch.ensureSize(MAX_BUFFERED_ROWS);
+      vectorAssignRow = new VectorAssignRow();
+      try {
+        vectorAssignRow.init(rowObjectInspector);
+      } catch (HiveException e) {
+        throw new SerDeException(e);
+      }
+    }
+
+    private ArrowWrapperWritable serializeBatch() {
+      for (int i = 0; i < vectorizedRowBatch.projectionSize; i++) {
+        final int projectedColumn = vectorizedRowBatch.projectedColumns[i];
+        final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
+        final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(i);
+        final FieldWriter fieldWriter = fieldWriters.get(i);
+        final FieldVector arrowVector = arrowVectors.get(i);
+        arrowVector.setValueCount(0);
+        fieldWriter.setPosition(0);
+        write(fieldWriter, arrowVector, hiveVector, fieldTypeInfo, 0, batchSize, true);
+      }
+      vectorizedRowBatch.reset();
+      vectorSchemaRoot.setRowCount(batchSize);
+
+      batchSize = 0;
+      return new ArrowWrapperWritable(vectorSchemaRoot);
+    }
+
+    private BaseWriter getWriter(FieldWriter writer, TypeInfo typeInfo, String name) {
+      switch (typeInfo.getCategory()) {
+        case PRIMITIVE:
+          switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+            case BOOLEAN:
+              return writer.bit(name);
+            case BYTE:
+              return writer.tinyInt(name);
+            case SHORT:
+              return writer.smallInt(name);
+            case INT:
+              return writer.integer(name);
+            case LONG:
+              return writer.bigInt(name);
+            case FLOAT:
+              return writer.float4(name);
+            case DOUBLE:
+              return writer.float8(name);
+            case STRING:
+            case VARCHAR:
+            case CHAR:
+              return writer.varChar(name);
+            case DATE:
+              return writer.dateDay(name);
+            case TIMESTAMP:
+              return writer.timeStampMilli(name);
+            case BINARY:
+              return writer.varBinary(name);
+            case DECIMAL:
+              final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+              final int scale = decimalTypeInfo.scale();
+              final int precision = decimalTypeInfo.precision();
+              return writer.decimal(name, scale, precision);
+            case INTERVAL_YEAR_MONTH:
+              return writer.intervalYear(name);
+            case INTERVAL_DAY_TIME:
+              return writer.intervalDay(name);
+            case TIMESTAMPLOCALTZ: // VectorAssignRow doesn't support it
+            case VOID:
+            case UNKNOWN:
+            default:
+              throw new IllegalArgumentException();
+          }
+        case LIST:
+        case UNION:
+          return writer.list(name);
+        case STRUCT:
+          return writer.map(name);
+        case MAP: // The caller will convert map to array<struct>
+          return writer.list(name).map();
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+
+    private BaseWriter getWriter(FieldWriter writer, TypeInfo typeInfo) {
+      switch (typeInfo.getCategory()) {
+        case PRIMITIVE:
+          switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+            case BOOLEAN:
+              return writer.bit();
+            case BYTE:
+              return writer.tinyInt();
+            case SHORT:
+              return writer.smallInt();
+            case INT:
+              return writer.integer();
+            case LONG:
+              return writer.bigInt();
+            case FLOAT:
+              return writer.float4();
+            case DOUBLE:
+              return writer.float8();
+            case STRING:
+            case VARCHAR:
+            case CHAR:
+              return writer.varChar();
+            case DATE:
+              return writer.dateDay();
+            case TIMESTAMP:
+              return writer.timeStampMilli();
+            case BINARY:
+              return writer.varBinary();
+            case INTERVAL_YEAR_MONTH:
+              return writer.intervalYear();
+            case INTERVAL_DAY_TIME:
+              return writer.intervalDay();
+            case TIMESTAMPLOCALTZ: // VectorAssignRow doesn't support it
+            case DECIMAL: // ListVector doesn't support it
+            case VOID:
+            case UNKNOWN:
+            default:
+              throw new IllegalArgumentException();
+          }
+        case LIST:
+        case UNION:
+          return writer.list();
+        case STRUCT:
+          return writer.map();
+        case MAP: // The caller will convert map to array<struct>
+          return writer.list().map();
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+
+    private void write(BaseWriter baseWriter, FieldVector arrowVector, ColumnVector hiveVector,
+        TypeInfo typeInfo, int offset, int length, boolean incrementIndex) {
+
+      final IntConsumer writer;
+      switch (typeInfo.getCategory()) {
+        case PRIMITIVE:
+          final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+              ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+          switch (primitiveCategory) {
+            case BOOLEAN:
+              writer = index -> ((BitWriter) baseWriter).writeBit(
+                  (int) ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case BYTE:
+              writer = index ->
+                  ((TinyIntWriter) baseWriter).writeTinyInt(
+                      (byte) ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case SHORT:
+              writer = index -> ((SmallIntWriter) baseWriter).writeSmallInt(
+                  (short) ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case INT:
+              writer = index -> ((IntWriter) baseWriter).writeInt(
+                  (int) ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case LONG:
+              writer = index -> ((BigIntWriter) baseWriter).writeBigInt(
+                  ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case FLOAT:
+              writer = index -> ((Float4Writer) baseWriter).writeFloat4(
+                  (float) ((DoubleColumnVector) hiveVector).vector[index]);
+              break;
+            case DOUBLE:
+              writer = index -> ((Float8Writer) baseWriter).writeFloat8(
+                  ((DoubleColumnVector) hiveVector).vector[index]);
+              break;
+            case STRING:
+            case VARCHAR:
+            case CHAR:
+              writer = index -> {
+                BytesColumnVector stringVector = (BytesColumnVector) hiveVector;
+                byte[] bytes = stringVector.vector[index];
+                int start = stringVector.start[index];
+                int bytesLength = stringVector.length[index];
+                try (ArrowBuf arrowBuf = rootAllocator.buffer(bytesLength)) {
+                  arrowBuf.setBytes(0, bytes, start, bytesLength);
+                  ((VarCharWriter) baseWriter).writeVarChar(0, bytesLength, arrowBuf);
+                }
+              };
+              break;
+            case DATE:
+              writer = index -> ((DateDayWriter) baseWriter).writeDateDay(
+                  (int) ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case TIMESTAMP:
+              writer = index -> ((TimeStampMilliWriter) baseWriter).writeTimeStampMilli(
+                  ((TimestampColumnVector) hiveVector).getTime(index));
+              break;
+            case BINARY:
+              writer = index -> {
+                BytesColumnVector binaryVector = (BytesColumnVector) hiveVector;
+                final byte[] bytes = binaryVector.vector[index];
+                final int start = binaryVector.start[index];
+                final int byteLength = binaryVector.length[index];
+                try (ArrowBuf arrowBuf = rootAllocator.buffer(byteLength)) {
+                  arrowBuf.setBytes(0, bytes, start, byteLength);
+                  ((VarBinaryWriter) baseWriter).writeVarBinary(0, byteLength, arrowBuf);
+                }
+              };
+              break;
+            case DECIMAL:
+              writer = index -> {
+                DecimalColumnVector hiveDecimalVector = (DecimalColumnVector) hiveVector;
+                ((DecimalWriter) baseWriter).writeDecimal(
+                    hiveDecimalVector.vector[index].getHiveDecimal().bigDecimalValue()
+                        .setScale(hiveDecimalVector.scale));
+              };
+              break;
+            case INTERVAL_YEAR_MONTH:
+              writer = index -> ((IntervalYearWriter) baseWriter).writeIntervalYear(
+                  (int) ((LongColumnVector) hiveVector).vector[index]);
+              break;
+            case INTERVAL_DAY_TIME:
+              writer = index -> {
+                IntervalDayTimeColumnVector intervalDayTimeVector =
+                    (IntervalDayTimeColumnVector) hiveVector;
+                final long millis = (intervalDayTimeVector.getTotalSeconds(index) * 1_000) +
+                    (intervalDayTimeVector.getNanos(index) / 1_000_000);
+                final int days = (int) (millis / MS_PER_DAY);
+                ((IntervalDayWriter) baseWriter).writeIntervalDay(
+                    days, (int) (millis % MS_PER_DAY));
+              };
+              break;
+            case VOID:
+            case UNKNOWN:
+            case TIMESTAMPLOCALTZ:
+            default:
+              throw new IllegalArgumentException();
+          }
+          break;
+        case LIST:
+          final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+          final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+          final ListColumnVector hiveListVector = (ListColumnVector) hiveVector;
+          final ColumnVector hiveElementVector = hiveListVector.child;
+          final FieldVector arrowElementVector = arrowVector.getChildrenFromFields().get(0);
+          final BaseWriter.ListWriter listWriter = (BaseWriter.ListWriter) baseWriter;
+          final BaseWriter elementWriter = getWriter((FieldWriter) baseWriter, elementTypeInfo);
+
+          writer = index -> {
+            final int listOffset = (int) hiveListVector.offsets[index];
+            final int listLength = (int) hiveListVector.lengths[index];
+            listWriter.startList();
+            write(elementWriter, arrowElementVector, hiveElementVector, elementTypeInfo,
+                listOffset, listLength, false);
+            listWriter.endList();
+          };
+
+          incrementIndex = false;
+          break;
+        case STRUCT:
+          final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+          final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+          final StructColumnVector hiveStructVector = (StructColumnVector) hiveVector;
+          final List<FieldVector> arrowFieldVectors = arrowVector.getChildrenFromFields();
+          final ColumnVector[] hiveFieldVectors = hiveStructVector.fields;
+          final BaseWriter.MapWriter structWriter = (BaseWriter.MapWriter) baseWriter;
+          final int fieldSize = fieldTypeInfos.size();
+
+          writer = index -> {
+            structWriter.start();
+            for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+              final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+              final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+              final ColumnVector hiveFieldVector = hiveFieldVectors[fieldIndex];
+              final BaseWriter fieldWriter = getWriter((FieldWriter) structWriter, fieldTypeInfo,
+                  fieldName);
+              final FieldVector arrowFieldVector = arrowFieldVectors.get(fieldIndex);
+              write(fieldWriter, arrowFieldVector, hiveFieldVector, fieldTypeInfo, index, 1, false);
+            }
+            structWriter.end();
+          };
+
+          incrementIndex = false;
+          break;
+        case UNION:
+          final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+          final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+          final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
+          final ColumnVector[] hiveObjectVectors = hiveUnionVector.fields;
+          final UnionWriter unionWriter = (UnionWriter) baseWriter;
+
+          writer = index -> {
+            final int tag = hiveUnionVector.tags[index];
+            final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
+            final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
+            write(unionWriter, arrowVector, hiveObjectVector, objectTypeInfo, index, 1, false);
+          };
+          break;
+        case MAP:
+          final ListTypeInfo structListTypeInfo =
+              toStructListTypeInfo((MapTypeInfo) typeInfo);
+          final ListColumnVector structListVector =
+              toStructListVector((MapColumnVector) hiveVector);
+
+          writer = index -> write(baseWriter, arrowVector, structListVector, structListTypeInfo,
+              index, length, false);
+
+          incrementIndex = false;
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+
+      if (hiveVector.noNulls) {
+        if (hiveVector.isRepeating) {
+          for (int i = 0; i < length; i++) {
+            writer.accept(0);
+            if (incrementIndex) {
+              baseWriter.setPosition(baseWriter.getPosition() + 1);
+            }
+          }
+        } else {
+          if (vectorizedRowBatch.selectedInUse) {
+            for (int j = 0; j < length; j++) {
+              final int i = vectorizedRowBatch.selected[j];
+              writer.accept(offset + i);
+              if (incrementIndex) {
+                baseWriter.setPosition(baseWriter.getPosition() + 1);
+              }
+            }
+          } else {
+            for (int i = 0; i < length; i++) {
+              writer.accept(offset + i);
+              if (incrementIndex) {
+                baseWriter.setPosition(baseWriter.getPosition() + 1);
+              }
+            }
+          }
+        }
+      } else {
+        if (hiveVector.isRepeating) {
+          for (int i = 0; i < length; i++) {
+            if (hiveVector.isNull[0]) {
+              writeNull(baseWriter);
+            } else {
+              writer.accept(0);
+            }
+            if (incrementIndex) {
+              baseWriter.setPosition(baseWriter.getPosition() + 1);
+            }
+          }
+        } else {
+          if (vectorizedRowBatch.selectedInUse) {
+            for (int j = 0; j < length; j++) {
+              final int i = vectorizedRowBatch.selected[j];
+              if (hiveVector.isNull[offset + i]) {
+                writeNull(baseWriter);
+              } else {
+                writer.accept(offset + i);
+              }
+              if (incrementIndex) {
+                baseWriter.setPosition(baseWriter.getPosition() + 1);
+              }
+            }
+          } else {
+            for (int i = 0; i < length; i++) {
+              if (hiveVector.isNull[offset + i]) {
+                writeNull(baseWriter);
+              } else {
+                writer.accept(offset + i);
+              }
+              if (incrementIndex) {
+                baseWriter.setPosition(baseWriter.getPosition() + 1);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+      // if row is null, it means there are no more rows (closeOp()).
+      // another case can be that the buffer is full.
+      if (obj == null) {
+        return serializeBatch();
+      }
+      List<Object> standardObjects = new ArrayList<Object>();
+      ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
+          ((StructObjectInspector) objInspector), WRITABLE);
+
+      vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
+      batchSize++;
+      if (batchSize == MAX_BUFFERED_ROWS) {
+        return serializeBatch();
+      }
+      return null;
+    }
+  }
+
+  private static void writeNull(BaseWriter baseWriter) {
+    if (baseWriter instanceof UnionListWriter) {
+      // UnionListWriter should implement AbstractFieldWriter#writeNull
+      BaseWriter.ListWriter listWriter = ((UnionListWriter) baseWriter).list();
+      listWriter.setPosition(listWriter.getPosition() + 1);
+    } else {
+      // FieldWriter should have a super method of AbstractFieldWriter#writeNull
+      try {
+        Method method = baseWriter.getClass().getMethod("writeNull");
+        method.setAccessible(true);
+        method.invoke(baseWriter);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static abstract class PrimitiveReader {
+    final void read(FieldReader reader, ColumnVector columnVector, int offset, int length) {
+      for (int i = 0; i < length; i++) {
+        final int rowIndex = offset + i;
+        if (reader.isSet()) {
+          doRead(reader, columnVector, rowIndex);
+        } else {
+          VectorizedBatchUtil.setNullColIsNullValue(columnVector, rowIndex);
+        }
+        reader.setPosition(reader.getPosition() + 1);
+      }
+    }
+
+    abstract void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex);
+  }
+
+  private class Deserializer {
+    private final VectorExtractRow vectorExtractRow;
+    private final VectorizedRowBatch vectorizedRowBatch;
+    private Object[][] rows;
+
+    public Deserializer() throws SerDeException {
+      vectorExtractRow = new VectorExtractRow();
+      final List<TypeInfo> fieldTypeInfoList = rowTypeInfo.getAllStructFieldTypeInfos();
+      final int fieldCount = fieldTypeInfoList.size();
+      final TypeInfo[] typeInfos = fieldTypeInfoList.toArray(new TypeInfo[fieldCount]);
+      try {
+        vectorExtractRow.init(typeInfos);
+      } catch (HiveException e) {
+        throw new SerDeException(e);
+      }
+
+      vectorizedRowBatch = new VectorizedRowBatch(fieldCount);
+      for (int i = 0; i < fieldCount; i++) {
+        final ColumnVector columnVector = createColumnVector(typeInfos[i]);
+        columnVector.init();
+        vectorizedRowBatch.cols[i] = columnVector;
+      }
+    }
+
+    public Object deserialize(Writable writable) {
+      final ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) writable;
+      final VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
+      final List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
+      final int fieldCount = fieldVectors.size();
+      final int rowCount = vectorSchemaRoot.getRowCount();
+      vectorizedRowBatch.ensureSize(rowCount);
+
+      if (rows == null || rows.length < rowCount ) {
+        rows = new Object[rowCount][];
+        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+          rows[rowIndex] = new Object[fieldCount];
+        }
+      }
+
+      for (int i = 0; i < fieldCount; i++) {
+        final FieldVector fieldVector = fieldVectors.get(i);
+        final FieldReader fieldReader = fieldVector.getReader();
+        fieldReader.setPosition(0);
+        final int projectedCol = vectorizedRowBatch.projectedColumns[i];
+        final ColumnVector columnVector = vectorizedRowBatch.cols[projectedCol];
+        final TypeInfo typeInfo = rowTypeInfo.getAllStructFieldTypeInfos().get(i);
+        read(fieldReader, columnVector, typeInfo, 0, rowCount);
+      }
+      for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+        vectorExtractRow.extractRow(vectorizedRowBatch, rowIndex, rows[rowIndex]);
+      }
+      vectorizedRowBatch.reset();
+      return rows;
+    }
+
+    private void read(FieldReader reader, ColumnVector columnVector, TypeInfo typeInfo,
+        int rowOffset, int rowLength) {
+      switch (typeInfo.getCategory()) {
+        case PRIMITIVE:
+          final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+              ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+          final PrimitiveReader primitiveReader;
+          switch (primitiveCategory) {
+            case BOOLEAN:
+              primitiveReader = new PrimitiveReader() {
+                NullableBitHolder holder = new NullableBitHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case BYTE:
+              primitiveReader = new PrimitiveReader() {
+                NullableTinyIntHolder holder = new NullableTinyIntHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case SHORT:
+              primitiveReader = new PrimitiveReader() {
+                NullableSmallIntHolder holder = new NullableSmallIntHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case INT:
+              primitiveReader = new PrimitiveReader() {
+                NullableIntHolder holder = new NullableIntHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case LONG:
+              primitiveReader = new PrimitiveReader() {
+                NullableBigIntHolder holder = new NullableBigIntHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case FLOAT:
+              primitiveReader = new PrimitiveReader() {
+                NullableFloat4Holder holder = new NullableFloat4Holder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((DoubleColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case DOUBLE:
+              primitiveReader = new PrimitiveReader() {
+                NullableFloat8Holder holder = new NullableFloat8Holder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((DoubleColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case STRING:
+            case VARCHAR:
+            case CHAR:
+              primitiveReader = new PrimitiveReader() {
+                NullableVarCharHolder holder = new NullableVarCharHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  int varCharSize = holder.end - holder.start;
+                  byte[] varCharBytes = new byte[varCharSize];
+                  holder.buffer.getBytes(holder.start, varCharBytes);
+                  ((BytesColumnVector) columnVector).setVal(rowIndex, varCharBytes, 0, varCharSize);
+                }
+              };
+              break;
+            case DATE:
+              primitiveReader = new PrimitiveReader() {
+                NullableDateDayHolder holder = new NullableDateDayHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case TIMESTAMP:
+              primitiveReader = new PrimitiveReader() {
+                NullableTimeStampMilliHolder timeStampMilliHolder =
+                    new NullableTimeStampMilliHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(timeStampMilliHolder);
+                  ((TimestampColumnVector) columnVector).set(rowIndex,
+                      new Timestamp(timeStampMilliHolder.value));
+                }
+              };
+              break;
+            case BINARY:
+              primitiveReader = new PrimitiveReader() {
+                NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  final int binarySize = holder.end - holder.start;
+                  final byte[] binaryBytes = new byte[binarySize];
+                  holder.buffer.getBytes(holder.start, binaryBytes);
+                  ((BytesColumnVector) columnVector).setVal(rowIndex, binaryBytes, 0, binarySize);
+                }
+              };
+              break;
+            case DECIMAL:
+              primitiveReader = new PrimitiveReader() {
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  ((DecimalColumnVector) columnVector).set(rowIndex,
+                      HiveDecimal.create(reader.readBigDecimal()));
+                }
+              };
+              break;
+            case INTERVAL_YEAR_MONTH:
+              primitiveReader = new PrimitiveReader() {
+                NullableIntervalYearHolder holder = new NullableIntervalYearHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  reader.read(holder);
+                  ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+                }
+              };
+              break;
+            case INTERVAL_DAY_TIME:
+              primitiveReader = new PrimitiveReader() {
+                NullableIntervalDayHolder holder = new NullableIntervalDayHolder();
+
+                @Override
+                void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+                  IntervalDayTimeColumnVector intervalDayTimeVector =
+                      (IntervalDayTimeColumnVector) columnVector;
+                  reader.read(holder);
+                  HiveIntervalDayTime intervalDayTime = new HiveIntervalDayTime(
+                      holder.days, // days
+                      holder.milliseconds / MS_PER_HOUR, // hour
+                      (holder.milliseconds % MS_PER_HOUR) / MS_PER_MINUTE, // minute
+                      (holder.milliseconds % MS_PER_MINUTE) / MS_PER_SECOND, // second
+                      (holder.milliseconds % MS_PER_SECOND) * NS_PER_MS); // nanosecond
+                  intervalDayTimeVector.set(rowIndex, intervalDayTime);
+                }
+              };
+              break;
+            default:
+              throw new IllegalArgumentException();
+          }
+          primitiveReader.read(reader, columnVector, rowOffset, rowLength);
+          break;
+        case LIST:
+          final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+          final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+          final ListColumnVector listVector = (ListColumnVector) columnVector;
+          final ColumnVector elementVector = listVector.child;
+          final FieldReader elementReader = reader.reader();
+
+          int listOffset = 0;
+          for (int rowIndex = 0; rowIndex < rowLength; rowIndex++) {
+            final int adjustedRowIndex = rowOffset + rowIndex;
+            reader.setPosition(adjustedRowIndex);
+            final int listLength = reader.size();
+            listVector.offsets[adjustedRowIndex] = listOffset;
+            listVector.lengths[adjustedRowIndex] = listLength;
+            read(elementReader, elementVector, elementTypeInfo, listOffset, listLength);
+            listOffset += listLength;
+          }
+          break;
+        case STRUCT:
+          final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+          final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+          final List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+          final int fieldSize = fieldNames.size();
+          final StructColumnVector structVector = (StructColumnVector) columnVector;
+          final ColumnVector[] fieldVectors = structVector.fields;
+
+          for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+            final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+            final FieldReader fieldReader = reader.reader(fieldNames.get(fieldIndex));
+            final ColumnVector fieldVector = fieldVectors[fieldIndex];
+            read(fieldReader, fieldVector, fieldTypeInfo, rowOffset, rowLength);
+          }
+          break;
+        case UNION:
+          final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+          final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+          final UnionColumnVector unionVector = (UnionColumnVector) columnVector;
+          final ColumnVector[] objectVectors = unionVector.fields;
+          final Map<Types.MinorType, Integer> minorTypeToTagMap = Maps.newHashMap();
+          for (int tag = 0; tag < objectTypeInfos.size(); tag++) {
+            minorTypeToTagMap.put(toMinorType(objectTypeInfos.get(tag)), tag);
+          }
+
+          final UnionReader unionReader = (UnionReader) reader;
+          for (int rowIndex = 0; rowIndex < rowLength; rowIndex++) {
+            final int adjustedRowIndex = rowIndex + rowOffset;
+            unionReader.setPosition(adjustedRowIndex);
+            final Types.MinorType minorType = unionReader.getMinorType();
+            final int tag = minorTypeToTagMap.get(minorType);
+            unionVector.tags[adjustedRowIndex] = tag;
+            read(unionReader, objectVectors[tag], objectTypeInfos.get(tag), adjustedRowIndex, 1);
+          }
+          break;
+        case MAP:
+          final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+          final ListTypeInfo mapStructListTypeInfo = toStructListTypeInfo(mapTypeInfo);
+          final MapColumnVector hiveMapVector = (MapColumnVector) columnVector;
+          final ListColumnVector mapStructListVector = toStructListVector(hiveMapVector);
+          final StructColumnVector mapStructVector = (StructColumnVector) mapStructListVector.child;
+          read(reader, mapStructListVector, mapStructListTypeInfo, rowOffset, rowLength);
+
+          hiveMapVector.isRepeating = mapStructListVector.isRepeating;
+          hiveMapVector.childCount = mapStructListVector.childCount;
+          hiveMapVector.noNulls = mapStructListVector.noNulls;
+          System.arraycopy(mapStructListVector.offsets, 0, hiveMapVector.offsets, 0, rowLength);
+          System.arraycopy(mapStructListVector.lengths, 0, hiveMapVector.lengths, 0, rowLength);
+          hiveMapVector.keys = mapStructVector.fields[0];
+          hiveMapVector.values = mapStructVector.fields[1];
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+
+  private static Types.MinorType toMinorType(TypeInfo typeInfo) {
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+          case BOOLEAN:
+            return Types.MinorType.BIT;
+          case BYTE:
+            return Types.MinorType.TINYINT;
+          case SHORT:
+            return Types.MinorType.SMALLINT;
+          case INT:
+            return Types.MinorType.INT;
+          case LONG:
+            return Types.MinorType.BIGINT;
+          case FLOAT:
+            return Types.MinorType.FLOAT4;
+          case DOUBLE:
+            return Types.MinorType.FLOAT8;
+          case STRING:
+          case VARCHAR:
+          case CHAR:
+            return Types.MinorType.VARCHAR;
+          case DATE:
+            return Types.MinorType.DATEDAY;
+          case TIMESTAMP:
+            return Types.MinorType.TIMESTAMPMILLI;
+          case BINARY:
+            return Types.MinorType.VARBINARY;
+          case DECIMAL:
+            return Types.MinorType.DECIMAL;
+          case INTERVAL_YEAR_MONTH:
+            return Types.MinorType.INTERVALYEAR;
+          case INTERVAL_DAY_TIME:
+            return Types.MinorType.INTERVALDAY;
+          case VOID:
+          case TIMESTAMPLOCALTZ:
+          case UNKNOWN:
+          default:
+            throw new IllegalArgumentException();
+        }
+      case LIST:
+        return Types.MinorType.LIST;
+      case STRUCT:
+        return Types.MinorType.MAP;
+      case UNION:
+        return Types.MinorType.UNION;
+      case MAP:
+        // Apache Arrow doesn't have a map vector, so it's converted to a list vector of a struct
+        // vector.
+        return Types.MinorType.LIST;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  private static ListTypeInfo toStructListTypeInfo(MapTypeInfo mapTypeInfo) {
+    final StructTypeInfo structTypeInfo = new StructTypeInfo();
+    structTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
+    structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(
+        mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo()));
+    final ListTypeInfo structListTypeInfo = new ListTypeInfo();
+    structListTypeInfo.setListElementTypeInfo(structTypeInfo);
+    return structListTypeInfo;
+  }
+
+  private static Field toField(String name, TypeInfo typeInfo) {
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        switch (primitiveTypeInfo.getPrimitiveCategory()) {
+          case BOOLEAN:
+            return Field.nullable(name, Types.MinorType.BIT.getType());
+          case BYTE:
+            return Field.nullable(name, Types.MinorType.TINYINT.getType());
+          case SHORT:
+            return Field.nullable(name, Types.MinorType.SMALLINT.getType());
+          case INT:
+            return Field.nullable(name, Types.MinorType.INT.getType());
+          case LONG:
+            return Field.nullable(name, Types.MinorType.BIGINT.getType());
+          case FLOAT:
+            return Field.nullable(name, Types.MinorType.FLOAT4.getType());
+          case DOUBLE:
+            return Field.nullable(name, Types.MinorType.FLOAT8.getType());
+          case STRING:
+            return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+          case DATE:
+            return Field.nullable(name, Types.MinorType.DATEDAY.getType());
+          case TIMESTAMP:
+            return Field.nullable(name, Types.MinorType.TIMESTAMPMILLI.getType());
+          case TIMESTAMPLOCALTZ:
+            final TimestampLocalTZTypeInfo timestampLocalTZTypeInfo =
+                (TimestampLocalTZTypeInfo) typeInfo;
+            final String timeZone = timestampLocalTZTypeInfo.getTimeZone().toString();
+            return Field.nullable(name, new ArrowType.Timestamp(TimeUnit.MILLISECOND, timeZone));
+          case BINARY:
+            return Field.nullable(name, Types.MinorType.VARBINARY.getType());
+          case DECIMAL:
+            final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+            final int precision = decimalTypeInfo.precision();
+            final int scale = decimalTypeInfo.scale();
+            return Field.nullable(name, new ArrowType.Decimal(precision, scale));
+          case VARCHAR:
+            return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+          case CHAR:
+            return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+          case INTERVAL_YEAR_MONTH:
+            return Field.nullable(name, Types.MinorType.INTERVALYEAR.getType());
+          case INTERVAL_DAY_TIME:
+            return Field.nullable(name, Types.MinorType.INTERVALDAY.getType());
+          default:
+            throw new IllegalArgumentException();
+        }
+      case LIST:
+        final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+        return new Field(name, FieldType.nullable(Types.MinorType.LIST.getType()),
+            Lists.newArrayList(toField(DEFAULT_ARROW_FIELD_NAME, elementTypeInfo)));
+      case STRUCT:
+        final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        final List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        final List<Field> structFields = Lists.newArrayList();
+        final int structSize = fieldNames.size();
+        for (int i = 0; i < structSize; i++) {
+          structFields.add(toField(fieldNames.get(i), fieldTypeInfos.get(i)));
+        }
+        return new Field(name, FieldType.nullable(Types.MinorType.MAP.getType()), structFields);
+      case UNION:
+        final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+        final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+        final List<Field> unionFields = Lists.newArrayList();
+        final int unionSize = objectTypeInfos.size();
+        for (int i = 0; i < unionSize; i++) {
+          unionFields.add(toField(DEFAULT_ARROW_FIELD_NAME, objectTypeInfos.get(i)));
+        }
+        return new Field(name, FieldType.nullable(Types.MinorType.UNION.getType()), unionFields);
+      case MAP:
+        final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        final TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
+        final TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
+
+        final StructTypeInfo mapStructTypeInfo = new StructTypeInfo();
+        mapStructTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
+        mapStructTypeInfo.setAllStructFieldTypeInfos(
+            Lists.newArrayList(keyTypeInfo, valueTypeInfo));
+
+        final ListTypeInfo mapListStructTypeInfo = new ListTypeInfo();
+        mapListStructTypeInfo.setListElementTypeInfo(mapStructTypeInfo);
+
+        return toField(name, mapListStructTypeInfo);
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  private static ListColumnVector toStructListVector(MapColumnVector mapVector) {
+    final StructColumnVector structVector;
+    final ListColumnVector structListVector;
+    structVector = new StructColumnVector();
+    structVector.fields = new ColumnVector[] {mapVector.keys, mapVector.values};
+    structListVector = new ListColumnVector();
+    structListVector.child = structVector;
+    System.arraycopy(mapVector.offsets, 0, structListVector.offsets, 0, mapVector.childCount);
+    System.arraycopy(mapVector.lengths, 0, structListVector.lengths, 0, mapVector.childCount);
+    structListVector.childCount = mapVector.childCount;
+    structListVector.isRepeating = mapVector.isRepeating;
+    structListVector.noNulls = mapVector.noNulls;
+    return structListVector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return ArrowWrapperWritable.class;
+  }
+
+  @Override
+  public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+    return serializer.serialize(obj, objInspector);
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+  @Override
+  public Object deserialize(Writable writable) {
+    return deserializer.deserialize(writable);
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() {
+    return rowObjectInspector;
+  }
+}
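
To make the serialize/deserialize contract of the class above concrete, here is a minimal round-trip
sketch. It is not part of the patch: the table properties, column layout, and helper method are
hypothetical, and the rows are assumed to already be in the writable form expected by the SerDe's
object inspector. serialize() buffers rows and returns null until hive.arrow.batch.size rows have
accumulated; passing a null row flushes whatever is still buffered as a final Arrow batch.

    static Object[][] roundTrip(List<Object> rows) throws SerDeException {
      Properties tbl = new Properties();
      tbl.setProperty(serdeConstants.LIST_COLUMNS, "id,name");
      tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int:string");

      ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
      serDe.initialize(new Configuration(), tbl);

      ArrowWrapperWritable batch = null;
      for (Object row : rows) {
        batch = serDe.serialize(row, serDe.getObjectInspector());   // null until the batch fills
      }
      if (batch == null) {
        batch = serDe.serialize(null, serDe.getObjectInspector());  // flush the remaining rows
      }
      return (Object[][]) serDe.deserialize(batch);                 // one Object[] per Hive row
    }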

http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
new file mode 100644
index 0000000..df7b53f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ArrowWrapperWritable implements Writable {
+  private VectorSchemaRoot vectorSchemaRoot;
+
+  public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+  }
+
+  public VectorSchemaRoot getVectorSchemaRoot() {
+    return vectorSchemaRoot;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}

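ArrowWrapperWritable is an in-memory handoff only: write() and readFields() throw UnsupportedOperationException, so the object is never expected to pass through Hadoop's Writable serialization. A consumer simply unwraps the Arrow batch; a short sketch using Arrow's standard VectorSchemaRoot accessors (serDe and rowInspector are placeholders; imports as in the file above plus org.apache.arrow.vector.FieldVector and java.util.List):

    ArrowWrapperWritable writable = serDe.serialize(null, rowInspector); // flush the current batch
    if (writable != null) {
      VectorSchemaRoot root = writable.getVectorSchemaRoot();
      int rowCount = root.getRowCount();                   // rows in this Arrow batch
      List<FieldVector> columns = root.getFieldVectors();  // one Arrow vector per column
    }
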
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
new file mode 100644
index 0000000..78cc188
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT;
+
+/**
+ * Thread-safe singleton factory for RootAllocator
+ */
+public enum RootAllocatorFactory {
+  INSTANCE;
+
+  private RootAllocator rootAllocator;
+
+  RootAllocatorFactory() {
+  }
+
+  public synchronized RootAllocator getRootAllocator(Configuration conf) {
+    if (rootAllocator == null) {
+      final long limit = HiveConf.getLongVar(conf, HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
+      rootAllocator = new RootAllocator(limit);
+    }
+    return rootAllocator;
+  }
+}

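Since the factory is a process-wide singleton, the hive.arrow.root.allocator.limit value is read only on the first getRootAllocator() call; subsequent calls reuse the existing allocator regardless of configuration. A hedged usage sketch (newChildAllocator is Arrow's standard BufferAllocator API, not part of this patch; imports as in the file above plus org.apache.arrow.memory.BufferAllocator):

    Configuration conf = new Configuration();
    conf.setLong("hive.arrow.root.allocator.limit", 2L * 1024 * 1024 * 1024); // optional cap; defaults to Long.MAX_VALUE
    RootAllocator root = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
    // Per-task allocators can then be carved out of the shared root:
    BufferAllocator taskAllocator = root.newChildAllocator("arrow-serde-task", 0, Long.MAX_VALUE);
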
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
new file mode 100644
index 0000000..bcb7a88
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestArrowColumnarBatchSerDe {
+  private Configuration conf;
+
+  private final static Object[][] INTEGER_ROWS = {
+      {byteW(0), shortW(0), intW(0), longW(0)},
+      {byteW(1), shortW(1), intW(1), longW(1)},
+      {byteW(-1), shortW(-1), intW(-1), longW(-1)},
+      {byteW(Byte.MIN_VALUE), shortW(Short.MIN_VALUE), intW(Integer.MIN_VALUE),
+          longW(Long.MIN_VALUE)},
+      {byteW(Byte.MAX_VALUE), shortW(Short.MAX_VALUE), intW(Integer.MAX_VALUE),
+          longW(Long.MAX_VALUE)},
+      {null, null, null, null},
+  };
+
+  private final static Object[][] FLOAT_ROWS = {
+      {floatW(0f), doubleW(0d)},
+      {floatW(1f), doubleW(1d)},
+      {floatW(-1f), doubleW(-1d)},
+      {floatW(Float.MIN_VALUE), doubleW(Double.MIN_VALUE)},
+      {floatW(-Float.MIN_VALUE), doubleW(-Double.MIN_VALUE)},
+      {floatW(Float.MAX_VALUE), doubleW(Double.MAX_VALUE)},
+      {floatW(-Float.MAX_VALUE), doubleW(-Double.MAX_VALUE)},
+      {floatW(Float.POSITIVE_INFINITY), doubleW(Double.POSITIVE_INFINITY)},
+      {floatW(Float.NEGATIVE_INFINITY), doubleW(Double.NEGATIVE_INFINITY)},
+      {null, null},
+  };
+
+  private final static Object[][] STRING_ROWS = {
+      {text(""), charW("", 10), varcharW("", 10)},
+      {text("Hello"), charW("Hello", 10), varcharW("Hello", 10)},
+      {text("world!"), charW("world!", 10), varcharW("world!", 10)},
+      {null, null, null},
+  };
+
+  private final static long NOW = System.currentTimeMillis();
+  private final static Object[][] DTI_ROWS = {
+      {
+          new DateWritable(DateWritable.millisToDays(NOW)),
+          new TimestampWritable(new Timestamp(NOW)),
+          new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)),
+          new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000))
+      },
+      {null, null, null, null},
+  };
+
+  private final static Object[][] DECIMAL_ROWS = {
+      {decimalW(HiveDecimal.ZERO)},
+      {decimalW(HiveDecimal.ONE)},
+      {decimalW(HiveDecimal.ONE.negate())},
+      {decimalW(HiveDecimal.create("0.000001"))},
+      {decimalW(HiveDecimal.create("100000"))},
+      {null},
+  };
+
+  private final static Object[][] BOOLEAN_ROWS = {
+      {new BooleanWritable(true)},
+      {new BooleanWritable(false)},
+      {null},
+  };
+
+  private final static Object[][] BINARY_ROWS = {
+      {new BytesWritable("".getBytes())},
+      {new BytesWritable("Hello".getBytes())},
+      {new BytesWritable("world!".getBytes())},
+      {null},
+  };
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+  }
+
+  private static ByteWritable byteW(int value) {
+    return new ByteWritable((byte) value);
+  }
+
+  private static ShortWritable shortW(int value) {
+    return new ShortWritable((short) value);
+  }
+
+  private static IntWritable intW(int value) {
+    return new IntWritable(value);
+  }
+
+  private static LongWritable longW(long value) {
+    return new LongWritable(value);
+  }
+
+  private static FloatWritable floatW(float value) {
+    return new FloatWritable(value);
+  }
+
+  private static DoubleWritable doubleW(double value) {
+    return new DoubleWritable(value);
+  }
+
+  private static Text text(String value) {
+    return new Text(value);
+  }
+
+  private static HiveCharWritable charW(String value, int length) {
+    return new HiveCharWritable(new HiveChar(value, length));
+  }
+
+  private static HiveVarcharWritable varcharW(String value, int length) {
+    return new HiveVarcharWritable(new HiveVarchar(value, length));
+  }
+
+  private static HiveDecimalWritable decimalW(HiveDecimal value) {
+    return new HiveDecimalWritable(value);
+  }
+
+  private void initAndSerializeAndDeserialize(String[][] schema, Object[][] rows) throws SerDeException {
+    AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+    StructObjectInspector rowOI = initSerDe(serDe, schema);
+    serializeAndDeserialize(serDe, rows, rowOI);
+  }
+
+  private StructObjectInspector initSerDe(AbstractSerDe serDe, String[][] schema)
+      throws SerDeException {
+    List<String> fieldNameList = newArrayList();
+    List<String> fieldTypeList = newArrayList();
+    List<TypeInfo> typeInfoList = newArrayList();
+
+    for (String[] nameAndType : schema) {
+      String name = nameAndType[0];
+      String type = nameAndType[1];
+      fieldNameList.add(name);
+      fieldTypeList.add(type);
+      typeInfoList.add(TypeInfoUtils.getTypeInfoFromTypeString(type));
+    }
+
+    String fieldNames = Joiner.on(',').join(fieldNameList);
+    String fieldTypes = Joiner.on(',').join(fieldTypeList);
+
+    Properties schemaProperties = new Properties();
+    schemaProperties.setProperty(serdeConstants.LIST_COLUMNS, fieldNames);
+    schemaProperties.setProperty(serdeConstants.LIST_COLUMN_TYPES, fieldTypes);
+    SerDeUtils.initializeSerDe(serDe, conf, schemaProperties, null);
+    return (StructObjectInspector) TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
+        TypeInfoFactory.getStructTypeInfo(fieldNameList, typeInfoList));
+  }
+
+  private void serializeAndDeserialize(AbstractSerDe serDe, Object[][] rows,
+      StructObjectInspector rowOI) throws SerDeException {
+    Writable serialized = null;
+    for (Object[] row : rows) {
+      serialized = serDe.serialize(row, rowOI);
+    }
+    // Pass null to complete a batch
+    if (serialized == null) {
+      serialized = serDe.serialize(null, rowOI);
+    }
+    final Object[][] deserializedRows = (Object[][]) serDe.deserialize(serialized);
+
+    for (int rowIndex = 0; rowIndex < Math.min(deserializedRows.length, rows.length); rowIndex++) {
+      final Object[] row = rows[rowIndex];
+      final Object[] deserializedRow = deserializedRows[rowIndex];
+      assertEquals(row.length, deserializedRow.length);
+
+      final List<? extends StructField> fields = rowOI.getAllStructFieldRefs();
+      for (int fieldIndex = 0; fieldIndex < fields.size(); fieldIndex++) {
+        final StructField field = fields.get(fieldIndex);
+        final ObjectInspector fieldObjInspector = field.getFieldObjectInspector();
+        switch (fieldObjInspector.getCategory()) {
+          case PRIMITIVE:
+            final PrimitiveObjectInspector primitiveObjInspector =
+                (PrimitiveObjectInspector) fieldObjInspector;
+            switch (primitiveObjInspector.getPrimitiveCategory()) {
+              case STRING:
+              case VARCHAR:
+              case CHAR:
+                assertEquals(Objects.toString(row[fieldIndex]),
+                    Objects.toString(deserializedRow[fieldIndex]));
+                break;
+              default:
+                assertEquals(row[fieldIndex], deserializedRow[fieldIndex]);
+                break;
+            }
+            break;
+          case STRUCT:
+            final Object[] rowStruct = (Object[]) row[fieldIndex];
+            final List deserializedRowStruct = (List) deserializedRow[fieldIndex];
+            assertArrayEquals(rowStruct, deserializedRowStruct.toArray());
+            break;
+          case LIST:
+          case UNION:
+            assertEquals(row[fieldIndex], deserializedRow[fieldIndex]);
+            break;
+          case MAP:
+            Map rowMap = (Map) row[fieldIndex];
+            Map deserializedRowMap = (Map) deserializedRow[fieldIndex];
+            Set rowMapKeySet = rowMap.keySet();
+            Set deserializedRowMapKeySet = deserializedRowMap.keySet();
+            assertTrue(rowMapKeySet.containsAll(deserializedRowMapKeySet));
+            assertTrue(deserializedRowMapKeySet.containsAll(rowMapKeySet));
+            for (Object key : rowMapKeySet) {
+              assertEquals(rowMap.get(key), deserializedRowMap.get(key));
+            }
+            break;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testComprehensive() throws SerDeException {
+    String[][] schema = {
+        {"datatypes.c1", "int"},
+        {"datatypes.c2", "boolean"},
+        {"datatypes.c3", "double"},
+        {"datatypes.c4", "string"},
+        {"datatypes.c5", "array<int>"},
+        {"datatypes.c6", "map<int,string>"},
+        {"datatypes.c7", "map<string,string>"},
+        {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+        {"datatypes.c9", "tinyint"},
+        {"datatypes.c10", "smallint"},
+        {"datatypes.c11", "float"},
+        {"datatypes.c12", "bigint"},
+        {"datatypes.c13", "array<array<string>>"},
+        {"datatypes.c14", "map<int,map<int,int>>"},
+        {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+        {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+        {"datatypes.c17", "timestamp"},
+        {"datatypes.c18", "decimal(16,7)"},
+        {"datatypes.c19", "binary"},
+        {"datatypes.c20", "date"},
+        {"datatypes.c21", "varchar(20)"},
+        {"datatypes.c22", "char(15)"},
+        {"datatypes.c23", "binary"},
+    };
+
+    Object[][] comprehensiveRows = {
+        {
+            intW(0), // c1:int
+            new BooleanWritable(false), // c2:boolean
+            doubleW(0), // c3:double
+            text("Hello"), // c4:string
+            newArrayList(intW(0), intW(1), intW(2)), // c5:array<int>
+            Maps.toMap(
+                newArrayList(intW(0), intW(1), intW(2)),
+                input -> text("Number " + input)), // c6:map<int,string>
+            Maps.toMap(
+                newArrayList(text("apple"), text("banana"), text("carrot")),
+                input -> text(input.toString().toUpperCase())), // c7:map<string,string>
+            new Object[] {text("0"), intW(1), doubleW(2)}, // c8:struct<r:string,s:int,t:double>
+            byteW(0), // c9:tinyint
+            shortW(0), // c10:smallint
+            floatW(0), // c11:float
+            longW(0), // c12:bigint
+            newArrayList(
+                newArrayList(text("a"), text("b"), text("c")),
+                newArrayList(text("A"), text("B"), text("C"))), // c13:array<array<string>>
+            Maps.toMap(
+                newArrayList(intW(0), intW(1), intW(2)),
+                x -> Maps.toMap(
+                    newArrayList(x, intW(x.get() * 2)),
+                    y -> y)), // c14:map<int,map<int,int>>
+            new Object[] {
+                intW(0),
+                newArrayList(
+                    intW(1),
+                    text("Hello"))}, // c15:struct<r:int,s:struct<a:int,b:string>>
+            Collections.singletonList(
+                newArrayList(
+                    Maps.toMap(
+                        newArrayList(text("hello")),
+                        input -> text(input.toString().toUpperCase())),
+                    intW(0))), // c16:array<struct<m:map<string,string>,n:int>>
+            new TimestampWritable(new Timestamp(NOW)), // c17:timestamp
+            decimalW(HiveDecimal.create(0, 0)), // c18:decimal(16,7)
+            new BytesWritable("Hello".getBytes()), // c19:binary
+            new DateWritable(123), // c20:date
+            varcharW("x", 20), // c21:varchar(20)
+            charW("y", 15), // c22:char(15)
+            new BytesWritable("world!".getBytes()), // c23:binary
+        },
+    };
+
+    initAndSerializeAndDeserialize(schema, comprehensiveRows);
+  }
+
+  private <E> List<E> newArrayList(E ... elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  @Test
+  public void testPrimitiveInteger() throws SerDeException {
+    String[][] schema = {
+        {"tinyint1", "tinyint"},
+        {"smallint1", "smallint"},
+        {"int1", "int"},
+        {"bigint1", "bigint"}
+    };
+
+    initAndSerializeAndDeserialize(schema, INTEGER_ROWS);
+  }
+
+  @Test
+  public void testPrimitiveBigInt10000() throws SerDeException {
+    String[][] schema = {
+        {"bigint1", "bigint"}
+    };
+
+    final int batchSize = 1000;
+    final Object[][] integerRows = new Object[batchSize][];
+    final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+    StructObjectInspector rowOI = initSerDe(serDe, schema);
+
+    for (int j = 0; j < 10; j++) {
+      for (int i = 0; i < batchSize; i++) {
+        integerRows[i] = new Object[] {longW(i + j * batchSize)};
+      }
+
+      serializeAndDeserialize(serDe, integerRows, rowOI);
+    }
+  }
+
+  @Test
+  public void testPrimitiveBigIntRandom() {
+    try {
+      String[][] schema = {
+          {"bigint1", "bigint"}
+      };
+
+      final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+      StructObjectInspector rowOI = initSerDe(serDe, schema);
+
+      final Random random = new Random();
+      for (int j = 0; j < 1000; j++) {
+        final int batchSize = random.nextInt(1000);
+        final Object[][] integerRows = new Object[batchSize][];
+        for (int i = 0; i < batchSize; i++) {
+          integerRows[i] = new Object[] {longW(random.nextLong())};
+        }
+
+        serializeAndDeserialize(serDe, integerRows, rowOI);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testPrimitiveFloat() throws SerDeException {
+    String[][] schema = {
+        {"float1", "float"},
+        {"double1", "double"},
+    };
+
+    initAndSerializeAndDeserialize(schema, FLOAT_ROWS);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testPrimitiveFloatNaN() throws SerDeException {
+    String[][] schema = {
+        {"float1", "float"},
+    };
+
+    Object[][] rows = {{new FloatWritable(Float.NaN)}};
+
+    initAndSerializeAndDeserialize(schema, rows);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testPrimitiveDoubleNaN() throws SerDeException {
+    String[][] schema = {
+        {"double1", "double"},
+    };
+
+    Object[][] rows = {{new DoubleWritable(Double.NaN)}};
+
+    initAndSerializeAndDeserialize(schema, rows);
+  }
+
+  @Test
+  public void testPrimitiveString() throws SerDeException {
+    String[][] schema = {
+        {"string1", "string"},
+        {"char1", "char(10)"},
+        {"varchar1", "varchar(10)"},
+    };
+
+    initAndSerializeAndDeserialize(schema, STRING_ROWS);
+  }
+
+  @Test
+  public void testPrimitiveDTI() throws SerDeException {
+    String[][] schema = {
+        {"date1", "date"},
+        {"timestamp1", "timestamp"},
+        {"interval_year_month1", "interval_year_month"},
+        {"interval_day_time1", "interval_day_time"},
+    };
+
+    initAndSerializeAndDeserialize(schema, DTI_ROWS);
+  }
+
+  @Test
+  public void testPrimitiveDecimal() throws SerDeException {
+    String[][] schema = {
+        {"decimal1", "decimal(38,10)"},
+    };
+
+    initAndSerializeAndDeserialize(schema, DECIMAL_ROWS);
+  }
+
+  @Test
+  public void testPrimitiveBoolean() throws SerDeException {
+    String[][] schema = {
+        {"boolean1", "boolean"},
+    };
+
+    initAndSerializeAndDeserialize(schema, BOOLEAN_ROWS);
+  }
+
+  @Test
+  public void testPrimitiveBinary() throws SerDeException {
+    String[][] schema = {
+        {"binary1", "binary"},
+    };
+
+    initAndSerializeAndDeserialize(schema, BINARY_ROWS);
+  }
+
+  private List[][] toList(Object[][] rows) {
+    List[][] array = new List[rows.length][];
+    for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+      Object[] row = rows[rowIndex];
+      array[rowIndex] = new List[row.length];
+      for (int fieldIndex = 0; fieldIndex < row.length; fieldIndex++) {
+        array[rowIndex][fieldIndex] = newArrayList(row[fieldIndex]);
+      }
+    }
+    return array;
+  }
+
+  @Test
+  public void testListInteger() throws SerDeException {
+    String[][] schema = {
+        {"tinyint_list", "array<tinyint>"},
+        {"smallint_list", "array<smallint>"},
+        {"int_list", "array<int>"},
+        {"bigint_list", "array<bigint>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(INTEGER_ROWS));
+  }
+
+  @Test
+  public void testListFloat() throws SerDeException {
+    String[][] schema = {
+        {"float_list", "array<float>"},
+        {"double_list", "array<double>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(FLOAT_ROWS));
+  }
+
+  @Test
+  public void testListString() throws SerDeException {
+    String[][] schema = {
+        {"string_list", "array<string>"},
+        {"char_list", "array<char(10)>"},
+        {"varchar_list", "array<varchar(10)>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(STRING_ROWS));
+  }
+
+  @Test
+  public void testListDTI() throws SerDeException {
+    String[][] schema = {
+        {"date_list", "array<date>"},
+        {"timestamp_list", "array<timestamp>"},
+        {"interval_year_month_list", "array<interval_year_month>"},
+        {"interval_day_time_list", "array<interval_day_time>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(DTI_ROWS));
+  }
+
+  @Test
+  public void testListBoolean() throws SerDeException {
+    String[][] schema = {
+        {"boolean_list", "array<boolean>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(BOOLEAN_ROWS));
+  }
+
+  @Test
+  public void testListBinary() throws SerDeException {
+    String[][] schema = {
+        {"binary_list", "array<binary>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(BINARY_ROWS));
+  }
+
+  private StandardUnionObjectInspector.StandardUnion union(int tag, Object object) {
+    return new StandardUnionObjectInspector.StandardUnion((byte) tag, object);
+  }
+
+  public void testUnionInteger() throws SerDeException {
+    String[][] schema = {
+        {"int_union", "uniontype<tinyint,smallint,int,bigint>"},
+    };
+
+    StandardUnionObjectInspector.StandardUnion[][] integerUnions = {
+        {union(0, byteW(0))},
+        {union(1, shortW(1))},
+        {union(2, intW(2))},
+        {union(3, longW(3))},
+    };
+
+    initAndSerializeAndDeserialize(schema, integerUnions);
+  }
+
+  public void testUnionFloat() throws SerDeException {
+    String[][] schema = {
+        {"float_union", "uniontype<float,double>"},
+    };
+
+    StandardUnionObjectInspector.StandardUnion[][] floatUnions = {
+        {union(0, floatW(0f))},
+        {union(1, doubleW(1d))},
+    };
+
+    initAndSerializeAndDeserialize(schema, floatUnions);
+  }
+
+  public void testUnionString() throws SerDeException {
+    String[][] schema = {
+        {"string_union", "uniontype<string,int>"},
+    };
+
+    StandardUnionObjectInspector.StandardUnion[][] stringUnions = {
+        {union(0, text("Hello"))},
+        {union(1, intW(1))},
+    };
+
+    initAndSerializeAndDeserialize(schema, stringUnions);
+  }
+
+  public void testUnionChar() throws SerDeException {
+    String[][] schema = {
+        {"char_union", "uniontype<char(10),int>"},
+    };
+
+    StandardUnionObjectInspector.StandardUnion[][] charUnions = {
+        {union(0, charW("Hello", 10))},
+        {union(1, intW(1))},
+    };
+
+    initAndSerializeAndDeserialize(schema, charUnions);
+  }
+
+  public void testUnionVarchar() throws SerDeException {
+    String[][] schema = {
+        {"varchar_union", "uniontype<varchar(10),int>"},
+    };
+
+    StandardUnionObjectInspector.StandardUnion[][] varcharUnions = {
+        {union(0, varcharW("Hello", 10))},
+        {union(1, intW(1))},
+    };
+
+    initAndSerializeAndDeserialize(schema, varcharUnions);
+  }
+
+  public void testUnionDTI() throws SerDeException {
+    String[][] schema = {
+        {"date_union", 
"uniontype<date,timestamp,interval_year_month,interval_day_time>"},
+    };
+    long NOW = System.currentTimeMillis();
+
+    StandardUnionObjectInspector.StandardUnion[][] dtiUnions = {
+        {union(0, new DateWritable(DateWritable.millisToDays(NOW)))},
+        {union(1, new TimestampWritable(new Timestamp(NOW)))},
+        {union(2, new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)))},
+        {union(3, new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000)))},
+    };
+
+    initAndSerializeAndDeserialize(schema, dtiUnions);
+  }
+
+  public void testUnionBooleanBinary() throws SerDeException {
+    String[][] schema = {
+        {"boolean_union", "uniontype<boolean,binary>"},
+    };
+
+    StandardUnionObjectInspector.StandardUnion[][] booleanBinaryUnions = {
+        {union(0, new BooleanWritable(true))},
+        {union(1, new BytesWritable("Hello".getBytes()))},
+    };
+
+    initAndSerializeAndDeserialize(schema, booleanBinaryUnions);
+  }
+
+  private Object[][][] toStruct(Object[][] rows) {
+    Object[][][] struct = new Object[rows.length][][];
+    for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+      Object[] row = rows[rowIndex];
+      struct[rowIndex] = new Object[][] {row};
+    }
+    return struct;
+  }
+
+  @Test
+  public void testStructInteger() throws SerDeException {
+    String[][] schema = {
+        {"int_struct", 
"struct<tinyint1:tinyint,smallint1:smallint,int1:int,bigint1:bigint>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(INTEGER_ROWS));
+  }
+
+  @Test
+  public void testStructFloat() throws SerDeException {
+    String[][] schema = {
+        {"float_struct", "struct<float1:float,double1:double>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(FLOAT_ROWS));
+  }
+
+  @Test
+  public void testStructString() throws SerDeException {
+    String[][] schema = {
+        {"string_struct", 
"struct<string1:string,char1:char(10),varchar1:varchar(10)>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(STRING_ROWS));
+  }
+
+  @Test
+  public void testStructDTI() throws SerDeException {
+    String[][] schema = {
+        {"date_struct", "struct<date1:date,timestamp1:timestamp," +
+            "interval_year_month1:interval_year_month,interval_day_time1:interval_day_time>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(DTI_ROWS));
+  }
+
+  @Test
+  public void testStructBoolean() throws SerDeException {
+    String[][] schema = {
+        {"boolean_struct", "struct<boolean1:boolean>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(BOOLEAN_ROWS));
+  }
+
+  @Test
+  public void testStructBinary() throws SerDeException {
+    String[][] schema = {
+        {"binary_struct", "struct<binary1:binary>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(BINARY_ROWS));
+  }
+
+  private Object[][] toMap(Object[][] rows) {
+    Map[][] array = new Map[rows.length][];
+    for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+      Object[] row = rows[rowIndex];
+      array[rowIndex] = new Map[row.length];
+      for (int fieldIndex = 0; fieldIndex < row.length; fieldIndex++) {
+        Map map = Maps.newHashMap();
+        map.put(new Text(String.valueOf(row[fieldIndex])), row[fieldIndex]);
+        array[rowIndex][fieldIndex] = map;
+      }
+    }
+    return array;
+  }
+
+  @Test
+  public void testMapInteger() throws SerDeException {
+    String[][] schema = {
+        {"tinyint_map", "map<string,tinyint>"},
+        {"smallint_map", "map<string,smallint>"},
+        {"int_map", "map<string,int>"},
+        {"bigint_map", "map<string,bigint>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(INTEGER_ROWS));
+  }
+
+  @Test
+  public void testMapFloat() throws SerDeException {
+    String[][] schema = {
+        {"float_map", "map<string,float>"},
+        {"double_map", "map<string,double>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(FLOAT_ROWS));
+  }
+
+  @Test
+  public void testMapString() throws SerDeException {
+    String[][] schema = {
+        {"string_map", "map<string,string>"},
+        {"char_map", "map<string,char(10)>"},
+        {"varchar_map", "map<string,varchar(10)>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(STRING_ROWS));
+  }
+
+  @Test
+  public void testMapDTI() throws SerDeException {
+    String[][] schema = {
+        {"date_map", "map<string,date>"},
+        {"timestamp_map", "map<string,timestamp>"},
+        {"interval_year_month_map", "map<string,interval_year_month>"},
+        {"interval_day_time_map", "map<string,interval_day_time>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(DTI_ROWS));
+  }
+
+  @Test
+  public void testMapBoolean() throws SerDeException {
+    String[][] schema = {
+        {"boolean_map", "map<string,boolean>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(BOOLEAN_ROWS));
+  }
+
+  @Test
+  public void testMapBinary() throws SerDeException {
+    String[][] schema = {
+        {"binary_map", "map<string,binary>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/serde/pom.xml
----------------------------------------------------------------------
diff --git a/serde/pom.xml b/serde/pom.xml
index e005585..eca34af 100644
--- a/serde/pom.xml
+++ b/serde/pom.xml
@@ -71,6 +71,11 @@
       <version>${arrow.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${arrow.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.carrotsearch</groupId>
       <artifactId>hppc</artifactId>
       <version>${hppc.version}</version>
