http://git-wip-us.apache.org/repos/asf/hive/blob/2726f302/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
new file mode 100644
index 0000000..bd23011
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.IntervalDayVector;
+import org.apache.arrow.vector.IntervalYearVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.NullableMapVector;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListVector;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
+
+class Serializer {
+  private final int MAX_BUFFERED_ROWS;
+
+  // Schema
+  private final StructTypeInfo structTypeInfo;
+  private final int fieldSize;
+
+  // Hive columns
+  private final VectorizedRowBatch vectorizedRowBatch;
+  private final VectorAssignRow vectorAssignRow;
+  private int batchSize;
+
+  private final NullableMapVector rootVector;
+
+  Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
+    MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+    ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered rows: " + MAX_BUFFERED_ROWS);
+
+    // Schema
+    structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
+    List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    fieldSize = fieldTypeInfos.size();
+
+    // Init Arrow stuffs
+    rootVector = NullableMapVector.empty(null, serDe.rootAllocator);
+
+    // Init Hive stuffs
+    vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
+    for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+      final ColumnVector columnVector = createColumnVector(fieldTypeInfos.get(fieldIndex));
+      vectorizedRowBatch.cols[fieldIndex] = columnVector;
+      columnVector.init();
+    }
+    vectorizedRowBatch.ensureSize(MAX_BUFFERED_ROWS);
+    vectorAssignRow = new VectorAssignRow();
+    try {
+      vectorAssignRow.init(serDe.rowObjectInspector);
+    } catch (HiveException e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  private ArrowWrapperWritable serializeBatch() {
+    rootVector.setValueCount(0);
+
+    for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) {
+      final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex];
+      final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
+      final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
+      final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+      final FieldType fieldType = toFieldType(fieldTypeInfo);
+      final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
+      arrowVector.setInitialCapacity(batchSize);
+      arrowVector.allocateNew();
+      write(arrowVector, hiveVector, fieldTypeInfo, batchSize);
+    }
+    vectorizedRowBatch.reset();
+    rootVector.setValueCount(batchSize);
+
+    batchSize = 0;
+    VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
+    return new ArrowWrapperWritable(vectorSchemaRoot);
+  }
+
+  private FieldType toFieldType(TypeInfo typeInfo) {
+    return new FieldType(true, toArrowType(typeInfo), null);
+  }
+
+  private ArrowType toArrowType(TypeInfo typeInfo) {
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+          case BOOLEAN:
+            return Types.MinorType.BIT.getType();
+          case BYTE:
+            return Types.MinorType.TINYINT.getType();
+          case SHORT:
+            return Types.MinorType.SMALLINT.getType();
+          case INT:
+            return Types.MinorType.INT.getType();
+          case LONG:
+            return Types.MinorType.BIGINT.getType();
+          case FLOAT:
+            return Types.MinorType.FLOAT4.getType();
+          case DOUBLE:
+            return Types.MinorType.FLOAT8.getType();
+          case STRING:
+          case VARCHAR:
+          case CHAR:
+            return Types.MinorType.VARCHAR.getType();
+          case DATE:
+            return Types.MinorType.DATEDAY.getType();
+          case TIMESTAMP:
+            return Types.MinorType.TIMESTAMPNANO.getType();
+          case BINARY:
+            return Types.MinorType.VARBINARY.getType();
+          case DECIMAL:
+            final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+            return new ArrowType.Decimal(decimalTypeInfo.precision(), decimalTypeInfo.scale());
+          case INTERVAL_YEAR_MONTH:
+            return Types.MinorType.INTERVALYEAR.getType();
+          case INTERVAL_DAY_TIME:
+            return Types.MinorType.INTERVALDAY.getType();
+          case VOID:
+          case TIMESTAMPLOCALTZ:
+          case UNKNOWN:
+          default:
+            throw new IllegalArgumentException();
+        }
+      case LIST:
+        return ArrowType.List.INSTANCE;
+      case STRUCT:
+        return ArrowType.Struct.INSTANCE;
+      case MAP:
+        return ArrowType.List.INSTANCE;
+      case UNION:
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) {
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        writePrimitive(arrowVector, hiveVector, typeInfo, size);
+        break;
+      case LIST:
+        writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size);
+        break;
+      case STRUCT:
+        writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size);
+        break;
+      case UNION:
+        writeUnion(arrowVector, hiveVector, typeInfo, size);
+        break;
+      case MAP:
+        writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size);
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
+      int size) {
+    final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
+    final ListColumnVector structListVector = toStructListVector(hiveVector);
+
+    write(arrowVector, structListVector, structListTypeInfo, size);
+
+    final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
+    for (int rowIndex = 0; rowIndex < size; rowIndex++) {
+      if (hiveVector.isNull[rowIndex]) {
+        BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
+      } else {
+        BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
+      }
+    }
+  }
+
+  private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
+      int size) {
+    final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+    final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+    final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
+    final ColumnVector[] hiveObjectVectors = hiveUnionVector.fields;
+
+    final int tag = hiveUnionVector.tags[0];
+    final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
+    final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
+
+    write(arrowVector, hiveObjectVector, objectTypeInfo, size);
+  }
+
+  private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector,
+      StructTypeInfo typeInfo, int size) {
+    final List<String> fieldNames = typeInfo.getAllStructFieldNames();
+    final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+    final ColumnVector[] hiveFieldVectors = hiveVector.fields;
+    final int fieldSize = fieldTypeInfos.size();
+
+    for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+      final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+      final ColumnVector hiveFieldVector = hiveFieldVectors[fieldIndex];
+      final String fieldName = fieldNames.get(fieldIndex);
+      final FieldVector arrowFieldVector =
+          arrowVector.addOrGet(fieldName,
+              toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
+      arrowFieldVector.setInitialCapacity(size);
+      arrowFieldVector.allocateNew();
+      write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size);
+    }
+
+    final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
+    for (int rowIndex = 0; rowIndex < size; rowIndex++) {
+      if (hiveVector.isNull[rowIndex]) {
+        BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
+      } else {
+        BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
+      }
+    }
+  }
+
+  private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo,
+      int size) {
+    final int OFFSET_WIDTH = 4;
+    final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
+    final ColumnVector hiveElementVector = hiveVector.child;
+    final FieldVector arrowElementVector =
+        (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+    arrowElementVector.setInitialCapacity(hiveVector.childCount);
+    arrowElementVector.allocateNew();
+
+    write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount);
+
+    final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
+    int nextOffset = 0;
+
+    for (int rowIndex = 0; rowIndex < size; rowIndex++) {
+      if (hiveVector.isNull[rowIndex]) {
+        offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
+      } else {
+        offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
+        nextOffset += (int) hiveVector.lengths[rowIndex];
+        arrowVector.setNotNull(rowIndex);
+      }
+    }
+    offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset);
+  }
+
+  private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
+      int size) {
+    final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+        ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+    switch (primitiveCategory) {
+      case BOOLEAN:
+        {
+          final BitVector bitVector = (BitVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              bitVector.setNull(i);
+            } else {
+              bitVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case BYTE:
+        {
+          final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              tinyIntVector.setNull(i);
+            } else {
+              tinyIntVector.set(i, (byte) ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case SHORT:
+        {
+          final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              smallIntVector.setNull(i);
+            } else {
+              smallIntVector.set(i, (short) ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case INT:
+        {
+          final IntVector intVector = (IntVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              intVector.setNull(i);
+            } else {
+              intVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case LONG:
+        {
+          final BigIntVector bigIntVector = (BigIntVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              bigIntVector.setNull(i);
+            } else {
+              bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case FLOAT:
+        {
+          final Float4Vector float4Vector = (Float4Vector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              float4Vector.setNull(i);
+            } else {
+              float4Vector.set(i, (float) ((DoubleColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case DOUBLE:
+        {
+          final Float8Vector float8Vector = (Float8Vector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              float8Vector.setNull(i);
+            } else {
+              float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case STRING:
+      case VARCHAR:
+      case CHAR:
+        {
+          final VarCharVector varCharVector = (VarCharVector) arrowVector;
+          final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              varCharVector.setNull(i);
+            } else {
+              varCharVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
+            }
+          }
+        }
+        break;
+      case DATE:
+        {
+          final DateDayVector dateDayVector = (DateDayVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              dateDayVector.setNull(i);
+            } else {
+              dateDayVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case TIMESTAMP:
+        {
+          final TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) arrowVector;
+          final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              timeStampNanoVector.setNull(i);
+            } else {
+              // Time = second + sub-second
+              final long secondInMillis = timestampColumnVector.getTime(i);
+              final long secondInNanos = (secondInMillis - secondInMillis % 1000) * NS_PER_MS; // second
+              final long subSecondInNanos = timestampColumnVector.getNanos(i); // sub-second
+
+              if ((secondInMillis > 0 && secondInNanos < 0) || (secondInMillis < 0 && secondInNanos > 0)) {
+                // If the timestamp cannot be represented as long nanoseconds, set it to null
+                timeStampNanoVector.setNull(i);
+              } else {
+                timeStampNanoVector.set(i, secondInNanos + subSecondInNanos);
+              }
+            }
+          }
+        }
+        break;
+      case BINARY:
+        {
+          final VarBinaryVector varBinaryVector = (VarBinaryVector) arrowVector;
+          final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              varBinaryVector.setNull(i);
+            } else {
+              varBinaryVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
+            }
+          }
+        }
+        break;
+      case DECIMAL:
+        {
+          final DecimalVector decimalVector = (DecimalVector) arrowVector;
+          final int scale = decimalVector.getScale();
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              decimalVector.setNull(i);
+            } else {
+              decimalVector.set(i,
+                  ((DecimalColumnVector) hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale));
+            }
+          }
+        }
+        break;
+      case INTERVAL_YEAR_MONTH:
+        {
+          final IntervalYearVector intervalYearVector = (IntervalYearVector) arrowVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              intervalYearVector.setNull(i);
+            } else {
+              intervalYearVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+            }
+          }
+        }
+        break;
+      case INTERVAL_DAY_TIME:
+        {
+          final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
+          final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
+              (IntervalDayTimeColumnVector) hiveVector;
+          for (int i = 0; i < size; i++) {
+            if (hiveVector.isNull[i]) {
+              intervalDayVector.setNull(i);
+            } else {
+              final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i);
+              final long days = totalSeconds / SECOND_PER_DAY;
+              final long millis =
+                  (totalSeconds - days * SECOND_PER_DAY) * MS_PER_SECOND +
+                      intervalDayTimeColumnVector.getNanos(i) / NS_PER_MS;
+              intervalDayVector.set(i, (int) days, (int) millis);
+            }
+          }
+        }
+        break;
+      case VOID:
+      case UNKNOWN:
+      case TIMESTAMPLOCALTZ:
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+    // if row is null, it means there are no more rows (closeOp()).
+    // another case can be that the buffer is full.
+    if (obj == null) {
+      return serializeBatch();
+    }
+    List<Object> standardObjects = new ArrayList<Object>();
+    ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
+        ((StructObjectInspector) objInspector), WRITABLE);
+
+    vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
+    batchSize++;
+    if (batchSize == MAX_BUFFERED_ROWS) {
+      return serializeBatch();
+    }
+    return null;
+  }
+}

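A note on the TIMESTAMP branch of writePrimitive() above: Hive's TimestampColumnVector hands over epoch milliseconds plus a separate nanos field, so the writer rebuilds a single nanosecond count from a whole-second part and a sub-second part, and the sign comparison catches overflow of the 64-bit multiplication. A minimal standalone sketch of that arithmetic, assuming NS_PER_MS = 1_000_000 as defined in ArrowColumnarBatchSerDe (toEpochNanos is an illustrative name, not part of the patch):

    // Sketch only: mirrors the TIMESTAMP branch of writePrimitive(), not a committed API.
    static Long toEpochNanos(long secondInMillis, int subSecondInNanos) {
      final long NS_PER_MS = 1_000_000L; // assumed value of ArrowColumnarBatchSerDe.NS_PER_MS
      // Whole-second part converted to nanos; the multiplication can overflow a signed long.
      final long secondInNanos = (secondInMillis - secondInMillis % 1000) * NS_PER_MS;
      // Opposite signs mean the product wrapped around, so the timestamp is not
      // representable as 64-bit nanoseconds; the serializer writes a null instead.
      if ((secondInMillis > 0 && secondInNanos < 0) || (secondInMillis < 0 && secondInNanos > 0)) {
        return null;
      }
      return secondInNanos + subSecondInNanos;
    }

For example, secondInMillis = 1_500 with subSecondInNanos = 500_000_000 (1.5 s after the epoch) gives secondInNanos = 1_000_000_000 and a result of 1_500_000_000.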
http://git-wip-us.apache.org/repos/asf/hive/blob/2726f302/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
index bcb7a88..74f6624 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -54,7 +53,6 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,10 +64,11 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 public class TestArrowColumnarBatchSerDe {
   private Configuration conf;
@@ -105,14 +104,39 @@ public class TestArrowColumnarBatchSerDe {
       {null, null, null},
   };
 
-  private final static long NOW = System.currentTimeMillis();
+  private final static long TIME_IN_MS = TimeUnit.DAYS.toMillis(365 + 31 + 3);
+  private final static long NEGATIVE_TIME_IN_MS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3);
+  private final static Timestamp TIMESTAMP;
+  private final static Timestamp NEGATIVE_TIMESTAMP_WITHOUT_NANOS;
+  private final static Timestamp NEGATIVE_TIMESTAMP_WITH_NANOS;
+
+  static {
+    TIMESTAMP = new Timestamp(TIME_IN_MS);
+    TIMESTAMP.setNanos(123456789);
+    NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
+    NEGATIVE_TIMESTAMP_WITH_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
+    NEGATIVE_TIMESTAMP_WITH_NANOS.setNanos(123456789);
+  }
+
   private final static Object[][] DTI_ROWS = {
       {
-          new DateWritable(DateWritable.millisToDays(NOW)),
-          new TimestampWritable(new Timestamp(NOW)),
+          new DateWritable(DateWritable.millisToDays(TIME_IN_MS)),
+          new TimestampWritable(TIMESTAMP),
           new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)),
           new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000))
       },
+      {
+          new DateWritable(DateWritable.millisToDays(NEGATIVE_TIME_IN_MS)),
+          new TimestampWritable(NEGATIVE_TIMESTAMP_WITHOUT_NANOS),
+          null,
+          null
+      },
+      {
+          null,
+          new TimestampWritable(NEGATIVE_TIMESTAMP_WITH_NANOS),
+          null,
+          null
+      },
       {null, null, null, null},
   };
 
@@ -184,7 +208,7 @@ public class TestArrowColumnarBatchSerDe {
   }
 
   private void initAndSerializeAndDeserialize(String[][] schema, Object[][] rows) throws SerDeException {
-    AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+    ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
     StructObjectInspector rowOI = initSerDe(serDe, schema);
     serializeAndDeserialize(serDe, rows, rowOI);
   }
@@ -214,9 +238,9 @@ public class TestArrowColumnarBatchSerDe {
         TypeInfoFactory.getStructTypeInfo(fieldNameList, typeInfoList));
   }
 
-  private void serializeAndDeserialize(AbstractSerDe serDe, Object[][] rows,
-      StructObjectInspector rowOI) throws SerDeException {
-    Writable serialized = null;
+  private void serializeAndDeserialize(ArrowColumnarBatchSerDe serDe, Object[][] rows,
+      StructObjectInspector rowOI) {
+    ArrowWrapperWritable serialized = null;
     for (Object[] row : rows) {
       serialized = serDe.serialize(row, rowOI);
     }
@@ -224,6 +248,7 @@ public class TestArrowColumnarBatchSerDe {
     if (serialized == null) {
       serialized = serDe.serialize(null, rowOI);
     }
+    String s = serialized.getVectorSchemaRoot().contentToTSVString();
     final Object[][] deserializedRows = (Object[][]) serDe.deserialize(serialized);
 
     for (int rowIndex = 0; rowIndex < Math.min(deserializedRows.length, rows.length); rowIndex++) {
@@ -254,21 +279,28 @@ public class TestArrowColumnarBatchSerDe {
           case STRUCT:
             final Object[] rowStruct = (Object[]) row[fieldIndex];
             final List deserializedRowStruct = (List) deserializedRow[fieldIndex];
-            assertArrayEquals(rowStruct, deserializedRowStruct.toArray());
+            if (rowStruct == null) {
+              assertNull(deserializedRowStruct);
+            } else {
+              assertArrayEquals(rowStruct, deserializedRowStruct.toArray());
+            }
             break;
           case LIST:
           case UNION:
             assertEquals(row[fieldIndex], deserializedRow[fieldIndex]);
             break;
           case MAP:
-            Map rowMap = (Map) row[fieldIndex];
-            Map deserializedRowMap = (Map) deserializedRow[fieldIndex];
-            Set rowMapKeySet = rowMap.keySet();
-            Set deserializedRowMapKeySet = deserializedRowMap.keySet();
-            assertTrue(rowMapKeySet.containsAll(deserializedRowMapKeySet));
-            assertTrue(deserializedRowMapKeySet.containsAll(rowMapKeySet));
-            for (Object key : rowMapKeySet) {
-              assertEquals(rowMap.get(key), deserializedRowMap.get(key));
+            final Map rowMap = (Map) row[fieldIndex];
+            final Map deserializedRowMap = (Map) deserializedRow[fieldIndex];
+            if (rowMap == null) {
+              assertNull(deserializedRowMap);
+            } else {
+              final Set rowMapKeySet = rowMap.keySet();
+              final Set deserializedRowMapKeySet = deserializedRowMap.keySet();
+              assertEquals(rowMapKeySet, deserializedRowMapKeySet);
+              for (Object key : rowMapKeySet) {
+                assertEquals(rowMap.get(key), deserializedRowMap.get(key));
+              }
             }
             break;
         }
@@ -341,14 +373,18 @@ public class TestArrowColumnarBatchSerDe {
                         newArrayList(text("hello")),
                         input -> text(input.toString().toUpperCase())),
                     intW(0))), // c16:array<struct<m:map<string,string>,n:int>>
-            new TimestampWritable(new Timestamp(NOW)), // c17:timestamp
+            new TimestampWritable(TIMESTAMP), // c17:timestamp
             decimalW(HiveDecimal.create(0, 0)), // c18:decimal(16,7)
             new BytesWritable("Hello".getBytes()), // c19:binary
             new DateWritable(123), // c20:date
             varcharW("x", 20), // c21:varchar(20)
             charW("y", 15), // c22:char(15)
             new BytesWritable("world!".getBytes()), // c23:binary
-        },
+        }, {
+            null, null, null, null, null, null, null, null, null, null, // c1-c10
+            null, null, null, null, null, null, null, null, null, null, // c11-c20
+            null, null, null, // c21-c23
+        }
     };
 
     initAndSerializeAndDeserialize(schema, comprehensiveRows);
@@ -378,7 +414,7 @@ public class TestArrowColumnarBatchSerDe {
 
     final int batchSize = 1000;
     final Object[][] integerRows = new Object[batchSize][];
-    final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+    final ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
     StructObjectInspector rowOI = initSerDe(serDe, schema);
 
     for (int j = 0; j < 10; j++) {
@@ -397,7 +433,7 @@ public class TestArrowColumnarBatchSerDe {
           {"bigint1", "bigint"}
       };
 
-      final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+      final ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
       StructObjectInspector rowOI = initSerDe(serDe, schema);
 
       final Random random = new Random();
@@ -572,106 +608,6 @@ public class TestArrowColumnarBatchSerDe {
     initAndSerializeAndDeserialize(schema, toList(BINARY_ROWS));
   }
 
-  private StandardUnionObjectInspector.StandardUnion union(int tag, Object object) {
-    return new StandardUnionObjectInspector.StandardUnion((byte) tag, object);
-  }
-
-  public void testUnionInteger() throws SerDeException {
-    String[][] schema = {
-        {"int_union", "uniontype<tinyint,smallint,int,bigint>"},
-    };
-
-    StandardUnionObjectInspector.StandardUnion[][] integerUnions = {
-        {union(0, byteW(0))},
-        {union(1, shortW(1))},
-        {union(2, intW(2))},
-        {union(3, longW(3))},
-    };
-
-    initAndSerializeAndDeserialize(schema, integerUnions);
-  }
-
-  public void testUnionFloat() throws SerDeException {
-    String[][] schema = {
-        {"float_union", "uniontype<float,double>"},
-    };
-
-    StandardUnionObjectInspector.StandardUnion[][] floatUnions = {
-        {union(0, floatW(0f))},
-        {union(1, doubleW(1d))},
-    };
-
-    initAndSerializeAndDeserialize(schema, floatUnions);
-  }
-
-  public void testUnionString() throws SerDeException {
-    String[][] schema = {
-        {"string_union", "uniontype<string,int>"},
-    };
-
-    StandardUnionObjectInspector.StandardUnion[][] stringUnions = {
-        {union(0, text("Hello"))},
-        {union(1, intW(1))},
-    };
-
-    initAndSerializeAndDeserialize(schema, stringUnions);
-  }
-
-  public void testUnionChar() throws SerDeException {
-    String[][] schema = {
-        {"char_union", "uniontype<char(10),int>"},
-    };
-
-    StandardUnionObjectInspector.StandardUnion[][] charUnions = {
-        {union(0, charW("Hello", 10))},
-        {union(1, intW(1))},
-    };
-
-    initAndSerializeAndDeserialize(schema, charUnions);
-  }
-
-  public void testUnionVarchar() throws SerDeException {
-    String[][] schema = {
-        {"varchar_union", "uniontype<varchar(10),int>"},
-    };
-
-    StandardUnionObjectInspector.StandardUnion[][] varcharUnions = {
-        {union(0, varcharW("Hello", 10))},
-        {union(1, intW(1))},
-    };
-
-    initAndSerializeAndDeserialize(schema, varcharUnions);
-  }
-
-  public void testUnionDTI() throws SerDeException {
-    String[][] schema = {
-        {"date_union", 
"uniontype<date,timestamp,interval_year_month,interval_day_time>"},
-    };
-    long NOW = System.currentTimeMillis();
-
-    StandardUnionObjectInspector.StandardUnion[][] dtiUnions = {
-        {union(0, new DateWritable(DateWritable.millisToDays(NOW)))},
-        {union(1, new TimestampWritable(new Timestamp(NOW)))},
-        {union(2, new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)))},
-        {union(3, new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000)))},
-    };
-
-    initAndSerializeAndDeserialize(schema, dtiUnions);
-  }
-
-  public void testUnionBooleanBinary() throws SerDeException {
-    String[][] schema = {
-        {"boolean_union", "uniontype<boolean,binary>"},
-    };
-
-    StandardUnionObjectInspector.StandardUnion[][] booleanBinaryUnions = {
-        {union(0, new BooleanWritable(true))},
-        {union(1, new BytesWritable("Hello".getBytes()))},
-    };
-
-    initAndSerializeAndDeserialize(schema, booleanBinaryUnions);
-  }
-
   private Object[][][] toStruct(Object[][] rows) {
     Object[][][] struct = new Object[rows.length][][];
     for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
@@ -719,6 +655,15 @@ public class TestArrowColumnarBatchSerDe {
   }
 
   @Test
+  public void testStructDecimal() throws SerDeException {
+    String[][] schema = {
+        {"decimal_struct", "struct<decimal1:decimal(38,10)>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toStruct(DECIMAL_ROWS));
+  }
+
+  @Test
   public void testStructBoolean() throws SerDeException {
     String[][] schema = {
         {"boolean_struct", "struct<boolean1:boolean>"},
@@ -812,4 +757,23 @@
 
     initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS));
   }
+
+  @Test
+  public void testMapDecimal() throws SerDeException {
+    String[][] schema = {
+        {"decimal_map", "map<string,decimal(38,10)>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toMap(DECIMAL_ROWS));
+  }
+
+  @Test
+  public void testListDecimal() throws SerDeException {
+    String[][] schema = {
+        {"decimal_list", "array<decimal(38,10)>"},
+    };
+
+    initAndSerializeAndDeserialize(schema, toList(DECIMAL_ROWS));
+  }
+
 }

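On the test side, serializeAndDeserialize() relies on the batching contract of the new Serializer: serialize() buffers each non-null row and returns null until HIVE_ARROW_BATCH_SIZE rows have accumulated, while a null row flushes whatever is still buffered as a final batch. A minimal writer-side sketch of that contract (writeAll() and the consumer are illustrative, not part of the patch):

    // Sketch only: drives the serde the same way the test helper does.
    static void writeAll(ArrowColumnarBatchSerDe serDe, StructObjectInspector rowOI,
        Iterable<Object[]> rows, java.util.function.Consumer<ArrowWrapperWritable> consume) {
      ArrowWrapperWritable batch = null;
      for (Object[] row : rows) {
        batch = serDe.serialize(row, rowOI);
        if (batch != null) {
          consume.accept(batch);                      // buffer reached HIVE_ARROW_BATCH_SIZE rows
        }
      }
      if (batch == null) {
        consume.accept(serDe.serialize(null, rowOI)); // a null row flushes the partial batch
      }
    }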