ARROW-1473: ValueVector new hierarchy prototype (implementation phase 1)

Close #1164
Close #1198

Change-Id: If18e42d2edfdfef83e83621334a5b65a390e9db9


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/7f45d86d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/7f45d86d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/7f45d86d

Branch: refs/heads/java-vector-refactor
Commit: 7f45d86d76213a0396d755f5272ac4bfbbb618a9
Parents: 60cb1c3
Author: siddharth <siddha...@dremio.com>
Authored: Thu Oct 5 02:11:44 2017 -0700
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sat Oct 14 11:35:59 2017 -0400

----------------------------------------------------------------------
 .../main/codegen/templates/ComplexReaders.java  |  24 +-
 .../main/codegen/templates/ComplexWriters.java  |  45 +-
 .../codegen/templates/NullableValueVectors.java | 542 +++++++++++--
 .../src/main/codegen/templates/UnionVector.java |  18 +-
 .../vector/BaseNullableFixedWidthVector.java    | 701 +++++++++++++++++
 .../vector/BaseNullableVariableWidthVector.java | 764 ++++++++++++++++++
 .../apache/arrow/vector/BaseValueVector.java    |  20 +
 .../apache/arrow/vector/BitVectorHelper.java    |  60 ++
 .../apache/arrow/vector/NullableIntVector.java  | 299 +++++++
 .../arrow/vector/NullableVarCharVector.java     | 451 +++++++++++
 .../org/apache/arrow/vector/ValueVector.java    |   6 +
 .../org/apache/arrow/vector/VectorUnloader.java |  15 +-
 .../org/apache/arrow/vector/ZeroVector.java     |   9 +
 .../vector/complex/FixedSizeListVector.java     |  27 +-
 .../apache/arrow/vector/complex/ListVector.java |  31 +-
 .../apache/arrow/vector/complex/MapVector.java  |  33 +-
 .../arrow/vector/file/json/JsonFileReader.java  |   2 +-
 .../vector/TestBufferOwnershipTransfer.java     |   6 +-
 .../arrow/vector/TestDictionaryVector.java      |  59 +-
 .../arrow/vector/TestFixedSizeListVector.java   |  13 +-
 .../arrow/vector/TestSplitAndTransfer.java      |  15 +-
 .../apache/arrow/vector/TestValueVector.java    | 788 +++++++++++++------
 .../apache/arrow/vector/TestVectorReAlloc.java  |  11 +-
 .../arrow/vector/TestVectorUnloadLoad.java      | 104 ++-
 .../apache/arrow/vector/file/BaseFileTest.java  |  70 +-
 .../apache/arrow/vector/file/TestArrowFile.java |   2 +-
 26 files changed, 3657 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/ComplexReaders.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/ComplexReaders.java 
b/java/vector/src/main/codegen/templates/ComplexReaders.java
index 38cd1bf..7910649 100644
--- a/java/vector/src/main/codegen/templates/ComplexReaders.java
+++ b/java/vector/src/main/codegen/templates/ComplexReaders.java
@@ -70,7 +70,11 @@ public class ${name}ReaderImpl extends AbstractFieldReader {
   
   public boolean isSet(){
     <#if nullMode == "Nullable">
-    return !vector.getAccessor().isNull(idx());
+      <#if minor.class != "Int" && minor.class != "VarChar">
+        return !vector.getAccessor().isNull(idx());
+      <#else>
+        return !vector.isNull(idx());
+      </#if>
     <#else>
     return true;
     </#if>
@@ -93,11 +97,19 @@ public class ${name}ReaderImpl extends AbstractFieldReader {
   </#if>
 
   public void read(Nullable${minor.class?cap_first}Holder h){
-    vector.getAccessor().get(idx(), h);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      vector.getAccessor().get(idx(), h);
+    <#else>
+      vector.get(idx(), h);
+    </#if>
   }
   
   public ${friendlyType} read${safeType}(){
-    return vector.getAccessor().getObject(idx());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+      return vector.getObject(idx());
+    <#else>
+      return vector.getAccessor().getObject(idx());
+    </#if>
   }
 
   <#if minor.class == "TimeStampSec" ||
@@ -115,7 +127,11 @@ public class ${name}ReaderImpl extends AbstractFieldReader 
{
   }
   
   public Object readObject(){
-    return vector.getAccessor().getObject(idx());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+      return (Object)vector.getObject(idx());
+    <#else>
+      return vector.getAccessor().getObject(idx());
+    </#if>
   }
 }
 </#if>

http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/ComplexWriters.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/ComplexWriters.java 
b/java/vector/src/main/codegen/templates/ComplexWriters.java
index fe099be..77f6594 100644
--- a/java/vector/src/main/codegen/templates/ComplexWriters.java
+++ b/java/vector/src/main/codegen/templates/ComplexWriters.java
@@ -39,11 +39,16 @@ package org.apache.arrow.vector.complex.impl;
 @SuppressWarnings("unused")
 public class ${eName}WriterImpl extends AbstractFieldWriter {
 
-  private final Nullable${name}Vector.Mutator mutator;
+  <#if minor.class != "Int" && minor.class != "VarChar">
+    private final Nullable${name}Vector.Mutator mutator;
+  </#if>
+
   final Nullable${name}Vector vector;
 
   public ${eName}WriterImpl(Nullable${name}Vector vector) {
-    this.mutator = vector.getMutator();
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      this.mutator = vector.getMutator();
+    </#if>
     this.vector = vector;
   }
 
@@ -103,18 +108,33 @@ public class ${eName}WriterImpl extends 
AbstractFieldWriter {
   <#else>
 
   public void write(${minor.class}Holder h) {
-    mutator.setSafe(idx(), h);
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      mutator.setSafe(idx(), h);
+      vector.getMutator().setValueCount(idx()+1);
+    <#else>
+        vector.setSafe(idx(), h);
+        vector.setValueCount(idx()+1);
+    </#if>
   }
 
   public void write(Nullable${minor.class}Holder h) {
-    mutator.setSafe(idx(), h);
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      mutator.setSafe(idx(), h);
+      vector.getMutator().setValueCount(idx()+1);
+    <#else>
+      vector.setSafe(idx(), h);
+      vector.setValueCount(idx()+1);
+    </#if>
   }
 
   public void write${minor.class}(<#list fields as field>${field.type} 
${field.name}<#if field_has_next>, </#if></#list>) {
-    mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as 
field><#if field.include!true >, ${field.name}</#if></#list>);
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as 
field><#if field.include!true >, ${field.name}</#if></#list>);
+      vector.getMutator().setValueCount(idx()+1);
+    <#else>
+      vector.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as 
field><#if field.include!true >, ${field.name}</#if></#list>);
+      vector.setValueCount(idx()+1);
+    </#if>
   }
   <#if minor.class == "Decimal">
 
@@ -126,8 +146,13 @@ public class ${eName}WriterImpl extends 
AbstractFieldWriter {
   <#if mode == "Nullable">
 
   public void writeNull() {
-    mutator.setNull(idx());
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+        mutator.setNull(idx());
+        vector.getMutator().setValueCount(idx()+1);
+    <#else>
+        vector.setNull(idx());
+        vector.setValueCount(idx()+1);
+    </#if>
   }
   </#if>
   </#if>

http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java 
b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index 122cd23..5d1f5a3 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -19,8 +19,14 @@
 <#list vv.types as type>
 <#list type.minor as minor>
 
+<#if minor.class == "Int" || minor.class == "VarChar">
+<#assign className = "LegacyNullable${minor.class}Vector" />
+<#assign valuesName = "Nullable${minor.class}Vector" />
+<#else>
 <#assign className = "Nullable${minor.class}Vector" />
 <#assign valuesName = "${minor.class}Vector" />
+</#if>
+
 <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
 
 <@pp.changeOutputFile name="/org/apache/arrow/vector/${className}.java" />
@@ -44,15 +50,24 @@ import org.apache.arrow.flatbuf.Precision;
  * NB: this class is automatically generated from ${.template_name} and 
ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${className} extends BaseValueVector implements <#if 
type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, 
NullableVector, FieldVector {
+<#if minor.class == "Int" || minor.class == "VarChar">
+@Deprecated
+</#if>
+public final class ${className} extends BaseValueVector implements <#if 
type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, FieldVector {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(${className}.class);
 
 protected final static byte[] emptyByteArray = new byte[]{};
+
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private final FieldReader reader = new 
${minor.class}ReaderImpl(${className}.this);
+  </#if>
 
   private final String bitsField = "$bits$";
   private final String valuesField = "$values$";
+
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private final Field field;
+  </#if>
 
   final BitVector bits = new BitVector(bitsField, allocator);
   final ${valuesName} values;
@@ -60,7 +75,9 @@ protected final static byte[] emptyByteArray = new byte[]{};
   private final Mutator mutator;
   private final Accessor accessor;
 
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private final List<BufferBacked> innerVectors;
+  </#if>
 
   <#if minor.typeParams??>
     <#assign typeParams = minor.typeParams?reverse>
@@ -105,6 +122,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
     </#if>
     this.mutator = new Mutator();
     this.accessor = new Accessor();
+    <#if minor.class != "Int" && minor.class != "VarChar">
     this.field = new Field(name, fieldType, null);
     innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
         bits,
@@ -113,16 +131,24 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
         </#if>
         values
     ));
+    </#if>
   }
 
-  @Override
+  <#if minor.class != "Int" && minor.class != "VarChar">
+  /* not needed for new vectors */
   public BitVector getValidityVector() {
     return bits;
   }
+  </#if>
 
   @Override
   public List<BufferBacked> getFieldInnerVectors() {
-    return innerVectors;
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getFieldInnerVectors();
+    <#else>
+        return innerVectors;
+    </#if>
   }
 
   @Override
@@ -139,6 +165,10 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
 
   @Override
   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.loadFieldBuffers(fieldNode, ownBuffers);
+    <#else>
     <#if type.major = "VarLen">
     // variable width values: truncate offset vector buffer to size (#1)
     
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers,
 1,
@@ -151,34 +181,64 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     </#if>
     org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, 
getFieldInnerVectors(), ownBuffers);
     bits.valueCount = fieldNode.getLength();
+    </#if>
   }
 
   public List<ArrowBuf> getFieldBuffers() {
-    return 
org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getFieldBuffers();
+    <#else>
+        return 
org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors());
+    </#if>
   }
 
   @Override
   public Field getField() {
-    return field;
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getField();
+    <#else>
+      return field;
+    </#if>
   }
 
   @Override
   public MinorType getMinorType() {
-    return MinorType.${minor.class?upper_case};
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getMinorType();
+    <#else>
+        return MinorType.${minor.class?upper_case};
+    </#if>
   }
 
   @Override
   public FieldReader getReader(){
-    return reader;
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getReader();
+    <#else>
+        return reader;
+    </#if>
   }
 
   @Override
   public int getValueCapacity(){
-    return Math.min(bits.getValueCapacity(), values.getValueCapacity());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValueCapacity();
+    <#else>
+        return Math.min(bits.getValueCapacity(), values.getValueCapacity());
+    </#if>
   }
 
   @Override
   public ArrowBuf[] getBuffers(boolean clear) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getBuffers(clear);
+    <#else>
     final ArrowBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), 
values.getBuffers(false), ArrowBuf.class);
     if (clear) {
       for (final ArrowBuf buffer:buffers) {
@@ -187,25 +247,41 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       clear();
     }
     return buffers;
+    </#if>
   }
 
   @Override
   public void close() {
-    bits.close();
-    values.close();
-    super.close();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.close();
+    <#else>
+        bits.close();
+        values.close();
+        super.close();
+    </#if>
   }
 
   @Override
   public void clear() {
-    bits.clear();
-    values.clear();
-    super.clear();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.clear();
+    <#else>
+        bits.clear();
+        values.clear();
+        super.clear();
+    </#if>
   }
 
   @Override
   public int getBufferSize(){
-    return values.getBufferSize() + bits.getBufferSize();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getBufferSize();
+    <#else>
+        return values.getBufferSize() + bits.getBufferSize();
+    </#if>
   }
 
   @Override
@@ -214,34 +290,52 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       return 0;
     }
 
-    return values.getBufferSizeFor(valueCount)
-        + bits.getBufferSizeFor(valueCount);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getBufferSizeFor(valueCount);
+    <#else>
+        return values.getBufferSizeFor(valueCount)
+          + bits.getBufferSizeFor(valueCount);
+    </#if>
   }
 
   public ArrowBuf getBuffer() {
     return values.getDataBuffer();
   }
 
-  @Override
   public ${valuesName} getValuesVector() {
     return values;
   }
 
   @Override
   public void setInitialCapacity(int numRecords) {
-    bits.setInitialCapacity(numRecords);
-    values.setInitialCapacity(numRecords);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.setInitialCapacity(numRecords);
+    <#else>
+        bits.setInitialCapacity(numRecords);
+        values.setInitialCapacity(numRecords);
+    </#if>
   }
 
   @Override
   public void allocateNew() {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.allocateNew();
+    <#else>
     if(!allocateNewSafe()){
       throw new OutOfMemoryException("Failure while allocating buffer.");
     }
+    </#if>
   }
 
   @Override
   public boolean allocateNewSafe() {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.allocateNewSafe();
+    <#else>
     /* Boolean to keep track if all the memory allocations were successful
      * Used in the case of composite vectors when we need to allocate multiple
      * buffers for multiple vectors. If one of the allocations failed we need 
to
@@ -259,23 +353,38 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     mutator.reset();
     accessor.reset();
     return success;
+    </#if>
   }
 
   @Override
   public void reAlloc() {
-    bits.reAlloc();
-    values.reAlloc();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.reAlloc();
+    <#else>
+        bits.reAlloc();
+        values.reAlloc();
+    </#if>
   }
 
   public void reset() {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.reset();
+    <#else>
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
+    </#if>
   }
 
   <#if type.major == "VarLen">
   @Override
   public void allocateNew(int totalBytes, int valueCount) {
+    <#if minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.allocateNew(totalBytes, valueCount);
+    <#else>
     try {
       values.allocateNew(totalBytes, valueCount);
       bits.allocateNew(valueCount);
@@ -286,6 +395,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
+    </#if>
   }
 
   @Override
@@ -301,6 +411,10 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
   <#else>
   @Override
   public void allocateNew(int valueCount) {
+    <#if minor.class == "Int">
+        /* DELEGATE TO NEW VECTOR */
+        values.allocateNew(valueCount);
+    <#else>
     try {
       values.allocateNew(valueCount);
       bits.allocateNew(valueCount);
@@ -311,6 +425,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
+    </#if>
   }
 
   /**
@@ -318,32 +433,86 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
    */
   @Override
   public void zeroVector() {
-    bits.zeroVector();
-    values.zeroVector();
+    <#if minor.class == "Int">
+        /* DELEGATE TO NEW VECTOR */
+        values.zeroVector();
+    <#else>
+        bits.zeroVector();
+        values.zeroVector();
+    </#if>
   }
   </#if>
 
+
+
   @Override
   public TransferPair getTransferPair(String ref, BufferAllocator allocator, 
CallBack callBack) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getTransferPair(ref, allocator, callBack);
+    <#else>
         return getTransferPair(ref, allocator);
+    </#if>
   }
 
+
+
   @Override
   public TransferPair getTransferPair(BufferAllocator allocator){
-    return new TransferImpl(name, allocator);
-
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getTransferPair(allocator);
+    <#else>
+        return new TransferImpl(name, allocator);
+    </#if>
   }
 
+
+
   @Override
   public TransferPair getTransferPair(String ref, BufferAllocator allocator){
-    return new TransferImpl(ref, allocator);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getTransferPair(ref, allocator);
+    <#else>
+        return new TransferImpl(ref, allocator);
+    </#if>
   }
 
+
+
   @Override
   public TransferPair makeTransferPair(ValueVector to) {
-    return new TransferImpl((${className}) to);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.makeTransferPair(to);
+    <#else>
+        return new TransferImpl((${className}) to);
+    </#if>
+  }
+
+
+
+  <#if minor.class == "Int" || minor.class == "VarChar">
+  public void transferTo(${valuesName} target) {
+    /* DELEGATE TO NEW VECTOR */
+    <#if type.major == "VarLen">
+        values.transferTo((BaseNullableVariableWidthVector) target);
+    <#else>
+        values.transferTo((BaseNullableFixedWidthVector) target);
+    </#if>
+  }
+
+  public void splitAndTransferTo(int startIndex, int length, ${valuesName} 
target) {
+    /* DELEGATE TO NEW VECTOR */
+    <#if type.major == "VarLen">
+        values.splitAndTransferTo(startIndex, length, 
(BaseNullableVariableWidthVector) target);
+    <#else>
+        values.splitAndTransferTo(startIndex, length, 
(BaseNullableFixedWidthVector) target);
+    </#if>
   }
 
+  <#else>
   public void transferTo(${className} target){
     bits.transferTo(target.bits);
     values.transferTo(target.values);
@@ -360,7 +529,11 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     target.mutator.lastSet = length - 1;
     </#if>
   }
+  </#if>
 
+
+
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private class TransferImpl implements TransferPair {
     ${className} to;
 
@@ -392,6 +565,9 @@ protected final static byte[] emptyByteArray = new byte[]{};
       to.copyFromSafe(fromIndex, toIndex, ${className}.this);
     }
   }
+  </#if>
+
+
 
   @Override
   public Accessor getAccessor(){
@@ -403,7 +579,20 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     return mutator;
   }
 
-  public void copyFrom(int fromIndex, int thisIndex, ${className} from){
+
+
+  <#if minor.class == "Int" || minor.class == "VarChar">
+  public void copyFrom(int fromIndex, int thisIndex, ${valuesName} from) {
+    /* DELEGATE TO NEW VECTOR */
+    values.copyFrom(fromIndex, thisIndex, from);
+  }
+
+  public void copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from) {
+    /* DELEGATE TO NEW VECTOR */
+    values.copyFromSafe(fromIndex, thisIndex, from);
+  }
+  <#else>
+  public void copyFrom(int fromIndex, int thisIndex, ${className} from) {
     final Accessor fromAccessor = from.getAccessor();
     if (!fromAccessor.isNull(fromIndex)) {
       mutator.set(thisIndex, fromAccessor.get(fromIndex));
@@ -428,17 +617,28 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     values.copyFromSafe(fromIndex, thisIndex, from.values);
     <#if type.major == "VarLen">mutator.lastSet = thisIndex;</#if>
   }
+  </#if>
 
   @Override
   public long getValidityBufferAddress() {
     /* address of the databuffer associated with the bitVector */
-    return (bits.getDataBuffer().memoryAddress());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValidityBufferAddress();
+    <#else>
+        return (bits.getDataBuffer().memoryAddress());
+    </#if>
   }
 
   @Override
   public long getDataBufferAddress() {
     /* address of the dataBuffer associated with the valueVector */
-    return (values.getDataBuffer().memoryAddress());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+          /* DELEGATE TO NEW VECTOR */
+          return values.getDataBufferAddress();
+    <#else>
+          return (values.getDataBuffer().memoryAddress());
+    </#if>
   }
 
   @Override
@@ -446,17 +646,26 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     /* address of the dataBuffer associated with the offsetVector
      * this operation is not supported for fixed-width vector types.
      */
-    <#if type.major != "VarLen">
-        throw new UnsupportedOperationException();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+          /* DELEGATE TO NEW VECTOR */
+          return values.getOffsetBufferAddress();
     <#else>
-        return (values.getOffsetAddr());
+        <#if type.major != "VarLen">
+          throw new UnsupportedOperationException();
+        <#else>
+          return (values.getOffsetAddr());
+        </#if>
     </#if>
   }
 
   @Override
   public ArrowBuf getValidityBuffer() {
-    /* dataBuffer associated with the bitVector */
-    return (bits.getDataBuffer());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+          /* DELEGATE TO NEW VECTOR */
+          return values.getValidityBuffer();
+    <#else>
+          return (bits.getDataBuffer());
+    </#if>
   }
 
   @Override
@@ -468,10 +677,15 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
   @Override
   public ArrowBuf getOffsetBuffer() {
     /* dataBuffer associated with the offsetVector of the valueVector */
-    <#if type.major != "VarLen">
-        throw new UnsupportedOperationException();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getOffsetBuffer();
     <#else>
-        return (values.getOffsetBuffer());
+        <#if type.major != "VarLen">
+          throw new UnsupportedOperationException();
+        <#else>
+          return (values.getOffsetBuffer());
+        </#if>
     </#if>
   }
 
@@ -485,38 +699,80 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
      * @param  index   position of the value
      * @return value of the element, if not null
      */
-    public <#if type.major == 
"VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
-      if (isNull(index)) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+      public <#if type.major == 
"VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
+        /* DELEGATE TO NEW VECTOR */
+        return values.get(index);
+      }
+    <#else>
+
+      public <#if type.major == 
"VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
+        if (isNull(index)) {
           throw new IllegalStateException("Can't get a null value");
+        }
+        return vAccessor.get(index);
       }
-      return vAccessor.get(index);
-    }
+    </#if>
 
     @Override
     public boolean isNull(int index) {
-      return isSet(index) == 0;
+      <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.isNull(index);
+      <#else>
+        return isSet(index) == 0;
+      </#if>
     }
 
     public int isSet(int index){
-      return bAccessor.get(index);
+      <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.isSet(index);
+      <#else>
+        return bAccessor.get(index);
+      </#if>
     }
 
     <#if type.major == "VarLen">
     public long getStartEnd(int index){
-      return vAccessor.getStartEnd(index);
+      <#if minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getStartEnd(index);
+      <#else>
+        return vAccessor.getStartEnd(index);
+      </#if>
     }
 
     @Override
     public int getValueLength(int index) {
-      return values.getAccessor().getValueLength(index);
+      <#if minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValueLength(index);
+      <#else>
+        return values.getAccessor().getValueLength(index);
+      </#if>
     }
     </#if>
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void get(int index, Nullable${minor.class}Holder holder){
+        /* DELEGATE TO NEW VECTOR */
+        values.get(index, holder);
+    }
+    <#else>
     public void get(int index, Nullable${minor.class}Holder holder){
       vAccessor.get(index, holder);
       holder.isSet = bAccessor.get(index);
     }
+    </#if>
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    @Override
+    public ${friendlyType} getObject(int index) {
+      /* DELEGATE TO NEW VECTOR */
+      return values.getObject(index);
+    }
+    <#else>
     @Override
     public ${friendlyType} getObject(int index) {
       if (isNull(index)) {
@@ -525,6 +781,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
         return vAccessor.getObject(index);
       }
     }
+    </#if>
 
     <#if minor.class == "IntervalYear" || minor.class == "IntervalDay">
     public StringBuilder getAsStringBuilder(int index) {
@@ -538,7 +795,12 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
 
     @Override
     public int getValueCount(){
-      return bits.getAccessor().getValueCount();
+      <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValueCount();
+      <#else>
+        return bits.getAccessor().getValueCount();
+      </#if>
     }
 
     public void reset(){}
@@ -551,21 +813,35 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     private Mutator(){
     }
 
-    public ${valuesName} getVectorWithValues(){
+    public ${valuesName} getVectorWithValues() {
       return values;
     }
 
+
     @Override
     public void setIndexDefined(int index){
+      <#if minor.class == "Int" || minor.class == "VarChar">
+      values.setIndexDefined(index);
+      <#else>
       bits.getMutator().setToOne(index);
+      </#if>
     }
 
+
+
     /**
      * Set the variable length element at the specified index to the supplied 
byte array.
      *
      * @param index   position of the bit to set
      * @param value   array of bytes (or int if smaller than 4 bytes) to write
      */
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void set(int index, <#if type.major == "VarLen">byte[]<#elseif 
(type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
+       /* DELEGATE TO NEW VECTOR */
+       values.set(index, value);
+    }
+    <#else>
     public void set(int index, <#if type.major == "VarLen">byte[]<#elseif 
(type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
       setCount++;
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
@@ -579,9 +855,24 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       valuesMutator.set(index, value);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
+
 
     <#if type.major == "VarLen">
+    <#if minor.class == "VarChar">
+    public void fillEmpties(int index) {
+      /* DELEGATE TO NEW VECTOR */
+      values.fillEmpties(index);
+    }
 
+    @Override
+    public void setValueLengthSafe(int index, int length) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setValueLengthSafe(index, length);
+    }
+
+    <#else>
     public void fillEmpties(int index){
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       for (int i = lastSet + 1; i < index; i++) {
@@ -599,7 +890,16 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       lastSet = index;
     }
     </#if>
+    </#if>
 
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void setSafe(int index, byte[] value, int start, int length) {
+       /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, value, start, length);
+    }
+    <#else>
     public void setSafe(int index, byte[] value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
@@ -612,7 +912,16 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
+    </#if>
+
+
 
+    <#if minor.class == "VarChar">
+    public void setSafe(int index, ByteBuffer value, int start, int length) {
+       /* DELEGATE TO NEW VECTOR */
+       values.setSafe(index, value, start, length);
+    }
+    <#else>
     public void setSafe(int index, ByteBuffer value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
@@ -625,11 +934,25 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
+    </#if>
 
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void setNull(int index) {
+       /* DELEGATE TO NEW VECTOR */
+       values.setNull(index);
+    }
+    <#else>
     public void setNull(int index){
       bits.getMutator().setSafe(index, 0);
     }
+    </#if>
+
+
 
+    <#if minor.class != "Int" && minor.class != "VarChar">
+    /* these methods are probably not needed */
     public void setSkipNull(int index, ${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }
@@ -637,8 +960,17 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
     public void setSkipNull(int index, Nullable${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }
+    </#if>
+
 
-    public void set(int index, Nullable${minor.class}Holder holder){
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void set(int index, Nullable${minor.class}Holder holder) {
+      /* DELEGATE TO NEW VECTOR */
+      values.set(index, holder);
+    }
+    <#else>
+    public void set(int index, Nullable${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -649,8 +981,17 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       valuesMutator.set(index, holder);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
+
 
-    public void set(int index, ${minor.class}Holder holder){
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void set(int index, ${minor.class}Holder holder) {
+        /* DELEGATE TO NEW VECTOR */
+        values.set(index, holder);
+    }
+    <#else>
+    public void set(int index, ${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -661,11 +1002,45 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       valuesMutator.set(index, holder);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
 
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public boolean isSafe(int outIndex) {
+       /* DELEGATE TO NEW VECTOR */
+       return values.isSafe(outIndex);
+    }
+    <#else>
     public boolean isSafe(int outIndex) {
       return outIndex < ${className}.this.getValueCapacity();
     }
+    </#if>
+
+
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    <#if minor.class == "Int">
+    public void set(int index, int isSet, int valueField) {
+      /* DELEGATE TO NEW VECTOR */
+      values.set(index, isSet, valueField);
+    }
+    public void setSafe(int index, int isSet, int valueField) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, isSet, valueField);
+    }
+    </#if>
+    <#if minor.class == "VarChar">
+    public void set(int index, int isSet, int startField, int endField, 
ArrowBuf bufferField ) {
+      /* DELEGATE TO NEW VECTOR */
+      values.set(index, isSet, startField, endField, bufferField);
+    }
+    public void setSafe(int index, int isSet, int startField, int endField, 
ArrowBuf bufferField ) {
+        /* DELEGATE TO NEW VECTOR */
+        values.setSafe(index, isSet, startField, endField, bufferField);
+    }
+    </#if>
+    <#else>
     <#assign fields = minor.fields!type.fields />
     public void set(int index, int isSet<#list fields as field>, ${field.type} 
${field.name}Field</#list> ){
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
@@ -688,8 +1063,21 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
 
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void setSafe(int index, Nullable${minor.class}Holder value) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, value);
+    }
+
+    public void setSafe(int index, ${minor.class}Holder value) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, value);
+    }
+    <#else>
     public void setSafe(int index, Nullable${minor.class}Holder value) {
       <#if type.major == "VarLen">
       fillEmpties(index);
@@ -709,15 +1097,25 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
+
 
     <#if !(type.major == "VarLen" || minor.class == "IntervalDay")>
     public void setSafe(int index, ${minor.javaType!type.javaType} value) {
+      <#if minor.class == "Int">
+        /* DELEGATE TO NEW VECTOR */
+        values.setSafe(index, value);
+      <#else>
       bits.getMutator().setSafeToOne(index);
       values.getMutator().setSafe(index, value);
       setCount++;
+      </#if>
     }
-
     </#if>
+
+
+
     <#if minor.class == "Decimal">
     public void set(int index, ${friendlyType} value) {
       bits.getMutator().setToOne(index);
@@ -729,8 +1127,17 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       values.getMutator().setSafe(index, value);
       setCount++;
     }
-
     </#if>
+
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    @Override
+    public void setValueCount(int valueCount) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setValueCount(valueCount);
+    }
+    <#else>
     @Override
     public void setValueCount(int valueCount) {
       assert valueCount >= 0;
@@ -740,7 +1147,12 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       values.getMutator().setValueCount(valueCount);
       bits.getMutator().setValueCount(valueCount);
     }
+    </#if>
+
 
+
+    <#if minor.class != "Int" && minor.class != "VarChar">
+    /* THIS METHOD IS PROBABLY NOT NEEDED FOR NEW VECTORS */
     @Override
     public void generateTestData(int valueCount){
       bits.getMutator().generateTestDataAlt(valueCount);
@@ -748,13 +1160,27 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
       <#if type.major = "VarLen">lastSet = valueCount;</#if>
       setValueCount(valueCount);
     }
+    </#if>
+
 
+
+    <#if minor.class != "Int" && minor.class != "VarChar">
+    /* MUTATOR RESET IS NOT NEEDED FOR NEW VECTORS */
     @Override
     public void reset(){
       setCount = 0;
       <#if type.major = "VarLen">lastSet = -1;</#if>
     }
+    </#if>
+
+
 
+    <#if minor.class == "VarChar">
+    public void setLastSet(int value) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setLastSet(value);
+    }
+    <#else>
     public void setLastSet(int value) {
       <#if type.major = "VarLen">
         lastSet = value;
@@ -762,7 +1188,16 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
         throw new UnsupportedOperationException();
       </#if>
     }
+    </#if>
+
 
+
+    <#if minor.class == "VarChar">
+    public int getLastSet() {
+      /* DELEGATE TO NEW VECTOR */
+      return values.getLastSet();
+    }
+    <#else>
     public int getLastSet() {
       <#if type.major != "VarLen">
         throw new UnsupportedOperationException();
@@ -770,6 +1205,7 @@ protected final static byte[] emptyByteArray = new 
byte[]{};
         return lastSet;
       </#if>
     }
+    </#if>
   }
 }
 </#list>

http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java 
b/java/vector/src/main/codegen/templates/UnionVector.java
index fe24a86..3c7ed01 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -436,7 +436,11 @@ public class UnionVector implements FieldVector {
           <#assign uncappedName = name?uncap_first/>
           <#if !minor.typeParams?? >
       case ${name?upper_case}:
-        return get${name}Vector().getAccessor().getObject(index);
+        <#if minor.class != "Int" && minor.class != "VarChar">
+          return get${name}Vector().getAccessor().getObject(index);
+        <#else>
+          return get${name}Vector().getObject(index);
+        </#if>
           </#if>
         </#list>
       </#list>
@@ -530,7 +534,11 @@ public class UnionVector implements FieldVector {
         <#if !minor.typeParams?? >
     public void setSafe(int index, Nullable${name}Holder holder) {
       setType(index, MinorType.${name?upper_case});
-      get${name}Vector().getMutator().setSafe(index, holder);
+      <#if minor.class != "Int" && minor.class != "VarChar">
+        get${name}Vector().getMutator().setSafe(index, holder);
+      <#else>
+        get${name}Vector().setSafe(index, holder);
+      </#if>
     }
 
         </#if>
@@ -547,4 +555,10 @@ public class UnionVector implements FieldVector {
     @Override
     public void generateTestData(int values) { }
   }
+
+  /* Stub: always returns 0. NOTE(review): presumably a placeholder until this vector is refactored -- confirm. */
+  public int getValueCount() { return 0; }
+
+  /* Stub: the supplied valueCount is ignored (empty body). */
+  public void setValueCount(int valueCount) { }
+
+  /* Stub: always returns null, whatever the index. */
+  public Object getObject(int index) { return null; }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
new file mode 100644
index 0000000..c5f7810
--- /dev/null
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+import org.apache.arrow.vector.util.TransferPair;
+
+public abstract class BaseNullableFixedWidthVector extends BaseValueVector
+        implements FixedWidthVector, FieldVector {
+   private final byte typeWidth;
+
+   private int valueAllocationSizeInBytes;
+   private int validityAllocationSizeInBytes;
+
+   protected final Field field;
+   private int allocationMonitor;
+   protected ArrowBuf validityBuffer;
+   protected ArrowBuf valueBuffer;
+   protected int valueCount;
+
+   /**
+    * Creates the vector with empty validity and value buffers and records
+    * the default allocation sizes derived from INITIAL_VALUE_ALLOCATION.
+    * No memory is allocated here beyond the allocator's empty buffer.
+    *
+    * @param name       name passed to the superclass and used as the field name
+    * @param allocator  allocator passed to the superclass; also supplies the
+    *                   initial empty buffers
+    * @param fieldType  type information used to build the {@link Field}
+    * @param typeWidth  number of bytes occupied by a single element
+    */
+   public BaseNullableFixedWidthVector(final String name, final 
BufferAllocator allocator,
+                                       FieldType fieldType, final byte 
typeWidth) {
+      super(name, allocator);
+      this.typeWidth = typeWidth;
+      valueAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * typeWidth;
+      validityAllocationSizeInBytes = 
getSizeFromCount(INITIAL_VALUE_ALLOCATION);
+      field = new Field(name, fieldType, null);
+      valueCount = 0;
+      allocationMonitor = 0;
+      validityBuffer = allocator.getEmpty();
+      valueBuffer = allocator.getEmpty();
+   }
+
+   /* TODO:
+    * Determine how writerIndex and readerIndex need to be used. Right now we
+    * are setting the writerIndex and readerIndex in the call to 
getFieldBuffers
+    * using the valueCount -- this assumes that the caller of getFieldBuffers
+    * on the vector has already invoked setValueCount.
+    *
+    * Do we need to set them during vector transfer and splitAndTransfer?
+    */
+
+   /* TODO:
+    *
+    * see if getNullCount() can be made faster -- O(1)
+    */
+
+   /* TODO:
+    * Once the entire hierarchy has been refactored, move common functions
+    * like getNullCount(), splitAndTransferValidityBuffer to top level
+    * base class BaseValueVector.
+    *
+    * Along with this, some class members (validityBuffer) can also be
+    * abstracted out to top level base class.
+    *
+    * Right now BaseValueVector is the top level base class for other
+    * vector types in ValueVector hierarchy and those vectors have not
+    * yet been refactored so moving things to the top class as of now
+    * is not a good idea.
+    */
+
+   /* TODO:
+    * See if we need logger -- unnecessary object probably
+    */
+
+   protected abstract org.slf4j.Logger getLogger();
+
+   /**
+    * Not supported by this class: it exposes its setters directly instead
+    * of going through a Mutator.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public Mutator getMutator() {
+      throw new  UnsupportedOperationException("Mutator is not needed to write 
into vector");
+   }
+
+   /**
+    * Not supported by this class: it exposes its getters directly instead
+    * of going through an Accessor.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public Accessor getAccessor() {
+      throw new UnsupportedOperationException("Accessor is not needed to read 
from vector");
+   }
+
+   /**
+    * @return the memory address reported by the validity buffer
+    */
+   @Override
+   public long getValidityBufferAddress() {
+      return (validityBuffer.memoryAddress());
+   }
+
+   /**
+    * @return the memory address reported by the value (data) buffer
+    */
+   @Override
+   public long getDataBufferAddress() {
+      return (valueBuffer.memoryAddress());
+   }
+
+   /**
+    * This class keeps no offset buffer.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public long getOffsetBufferAddress() {
+      throw new UnsupportedOperationException("not supported for fixed-width 
vectors");
+   }
+
+   /**
+    * @return the current validity buffer; the reference is returned as-is,
+    *         without retaining it
+    */
+   @Override
+   public ArrowBuf getValidityBuffer() {
+      return validityBuffer;
+   }
+
+   /**
+    * @return the current value (data) buffer; the reference is returned
+    *         as-is, without retaining it
+    */
+   @Override
+   public ArrowBuf getDataBuffer() {
+      return valueBuffer;
+   }
+
+   /**
+    * This class keeps no offset buffer.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public ArrowBuf getOffsetBuffer() {
+      throw new UnsupportedOperationException("not supported for fixed-width 
vectors");
+   }
+
+   /**
+    * Records the buffer sizes to use for the next default allocation.
+    * No memory is allocated by this call.
+    *
+    * @param valueCount  desired number of elements
+    * @throws OversizedAllocationException if valueCount * typeWidth exceeds
+    *         MAX_ALLOCATION_SIZE
+    */
+   @Override
+   public void setInitialCapacity(int valueCount) {
+      /* widen before multiplying so the product cannot overflow int */
+      final long size = (long)valueCount * typeWidth;
+      if (size > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory is 
more than max allowed");
+      }
+      valueAllocationSizeInBytes = (int)size;
+      validityAllocationSizeInBytes = getSizeFromCount(valueCount);
+   }
+
+   /**
+    * Number of elements the vector can hold without reallocating: the
+    * smaller of what the value buffer and the validity buffer can hold.
+    *
+    * @return current value capacity
+    */
+   @Override
+   public int getValueCapacity() {
+      final int valueSlots = getValueBufferValueCapacity();
+      final int validitySlots = getValidityBufferValueCapacity();
+      return (valueSlots < validitySlots) ? valueSlots : validitySlots;
+   }
+
+   /* for test purposes */
+   private int getValueBufferValueCapacity() {
+      return (int)((valueBuffer.capacity() * 1.0)/typeWidth);
+   }
+
+   /* for test purposes */
+   private int getValidityBufferValueCapacity() {
+      return (int)(validityBuffer.capacity() * 8L);
+   }
+
+   /* number of bytes for the validity buffer for the given valueCount */
+   protected int getSizeFromCount(int valueCount) {
+      return (int) Math.ceil(valueCount / 8.0);
+   }
+
+   /**
+    * Zero-fills the whole capacity of both the validity and the value buffer.
+    */
+   @Override
+   public void zeroVector() {
+      initValidityBuffer();
+      initValueBuffer();
+   }
+
+   /* zero-fill the validity buffer over its full capacity */
+   private void initValidityBuffer() {
+      validityBuffer.setZero(0, validityBuffer.capacity());
+   }
+
+   /* zero-fill the value buffer over its full capacity */
+   private void initValueBuffer() {
+      valueBuffer.setZero(0, valueBuffer.capacity());
+   }
+
+   /**
+    * Same as {@link #zeroVector()}; valueCount is left untouched.
+    */
+   public void reset() {
+      zeroVector();
+   }
+
+   /**
+    * Same as {@link #clear()}.
+    */
+   @Override
+   public void close() { clear(); }
+
+   /**
+    * Hands both buffers to releaseBuffer and stores whatever it returns as
+    * the new buffer references. valueCount is left untouched.
+    */
+   @Override
+   public void clear() {
+      validityBuffer = releaseBuffer(validityBuffer);
+      valueBuffer = releaseBuffer(valueBuffer);
+   }
+
+   /*
+    * record one more over-provisioned allocation (used to step down the
+    * memory allocation): a negative counter restarts from zero before the
+    * increment.
+    */
+   protected void incrementAllocationMonitor() {
+      allocationMonitor = Math.max(allocationMonitor, 0) + 1;
+   }
+
+   /*
+    * record one more under-provisioned allocation (used to step up the
+    * memory allocation): a positive counter restarts from zero before the
+    * decrement.
+    */
+   protected void decrementAllocationMonitor() {
+      allocationMonitor = Math.min(allocationMonitor, 0) - 1;
+   }
+
+   /**
+    * Allocates buffers of the currently recorded default sizes.
+    *
+    * @throws OutOfMemoryException if {@link #allocateNewSafe()} reports failure
+    */
+   @Override
+   public void allocateNew() {
+      final boolean allocated = allocateNewSafe();
+      if (allocated) {
+         return;
+      }
+      throw new OutOfMemoryException("Failure while allocating memory.");
+   }
+
+   /**
+    * Releases the current buffers and allocates new ones using the recorded
+    * default sizes.
+    *
+    * @return true on success; false if allocation threw, in which case the
+    *         buffers have been cleared again
+    * @throws OversizedAllocationException if the recorded value buffer size
+    *         exceeds MAX_ALLOCATION_SIZE
+    */
+   public boolean allocateNewSafe() {
+      final long valueBytes = valueAllocationSizeInBytes;
+      final long validityBytes = validityAllocationSizeInBytes;
+
+      if (valueBytes > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory exceeds limit");
+      }
+
+      /* we are doing a new allocation -- release the current buffers */
+      clear();
+
+      try {
+         allocateBytes(valueBytes, validityBytes);
+      } catch (Exception e) {
+         /* pass the exception itself so the stack trace is logged, and never
+          * use its (possibly null) message as a format string */
+         getLogger().error("ERROR: Failure in allocateNewSafe", e);
+         clear();
+         return false;
+      }
+
+      return true;
+   }
+
+   /**
+    * Releases the current buffers and allocates new ones sized for the given
+    * number of elements, adjusted by the allocation monitor: halved (but at
+    * least 8 bytes) after repeated over-provisioning, doubled after repeated
+    * under-provisioning.
+    *
+    * @param valueCount  number of elements to provision for
+    * @throws OversizedAllocationException if the value buffer size exceeds
+    *         MAX_ALLOCATION_SIZE
+    */
+   public void allocateNew(int valueCount) {
+      /* widen before multiplying: an int product could overflow and slip
+       * past the MAX_ALLOCATION_SIZE check below */
+      long valueBufferSize = (long) valueCount * typeWidth;
+      long validityBufferSize = getSizeFromCount(valueCount);
+
+      if (allocationMonitor > 10) {
+         /* step down the default memory allocation since we have observed
+          * multiple times that provisioned value capacity was much larger than
+          * actually needed. see setValueCount for more details.
+          */
+         valueBufferSize = Math.max(8, valueBufferSize / 2);
+         validityBufferSize = Math.max(8, validityBufferSize / 2);
+         allocationMonitor = 0;
+      } else if (allocationMonitor < -2) {
+         /* step up: earlier allocations repeatedly turned out too small */
+         valueBufferSize = valueBufferSize * 2L;
+         validityBufferSize = validityBufferSize * 2L;
+         allocationMonitor = 0;
+      }
+
+      if (valueBufferSize > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+      }
+
+      /* we are doing a new allocation -- release the current buffers */
+      clear();
+
+      try {
+         allocateBytes(valueBufferSize, validityBufferSize);
+      } catch (Exception e) {
+         /* log with the throwable so the stack trace is kept */
+         getLogger().error("ERROR: Failure in allocateNew", e);
+         clear();
+         throw e;
+      }
+   }
+
+   /**
+    * Actual memory allocation is done by this function. All the calculations
+    * and knowledge about what size to allocate is upto the callers of this
+    * method.
+    * Callers appropriately handle errors if memory allocation fails here.
+    * Callers should also take care of determining that desired size is
+    * within the bounds of max allocation allowed and any other error
+    * conditions.
+    */
+   private void allocateBytes(final long valueBufferSize, final long 
validityBufferSize) {
+      /* allocate data buffer */
+      int curSize = (int)valueBufferSize;
+      valueBuffer = allocator.buffer(curSize);
+      valueBuffer.readerIndex(0);
+      valueAllocationSizeInBytes = curSize;
+
+      /* allocate validity buffer */
+      allocateValidityBuffer((int)validityBufferSize);
+      initValidityBuffer();
+   }
+
+   /*
+    * Allocates and zero-fills a validity buffer of the given size in bytes,
+    * and records that size as the validity allocation size.
+    *
+    * During splitAndTransfer, if we are splitting from a position that is
+    * not at the start of a byte, we cannot just slice the source buffer, so
+    * the validity buffer of the target vector has to be allocated
+    * explicitly. This is unlike the data buffer, which can always be sliced
+    * for the target vector.
+    */
+   private void allocateValidityBuffer(final int validityBufferSize) {
+      validityBuffer = allocator.buffer(validityBufferSize);
+      validityBuffer.readerIndex(0);
+      validityAllocationSizeInBytes = validityBufferSize;
+      initValidityBuffer();
+   }
+
+   /**
+    * Bytes needed to hold the given number of elements: value bytes plus
+    * validity bytes.
+    *
+    * @param count  number of elements
+    * @return required size in bytes, 0 for a count of 0
+    */
+   @Override
+   public int getBufferSizeFor(final int count) {
+      return (count == 0) ? 0 : (count * typeWidth) + getSizeFromCount(count);
+   }
+
+   /**
+    * Bytes needed to hold the current valueCount elements: value bytes plus
+    * validity bytes.
+    *
+    * @return required size in bytes, 0 when valueCount is 0
+    */
+   @Override
+   public int getBufferSize() {
+      return (valueCount == 0) ? 0 : (valueCount * typeWidth) + getSizeFromCount(valueCount);
+   }
+
+   /**
+    * @return the field created for this vector in the constructor
+    */
+   @Override
+   public Field getField() {
+      return this.field;
+   }
+
+   /**
+    * Returns the validity buffer followed by the value buffer.
+    *
+    * @param clear  when true, each buffer is retained once and the vector is
+    *               then cleared, so the returned references stay usable
+    * @return two-element array: validity buffer, value buffer
+    */
+   @Override
+   public ArrowBuf[] getBuffers(boolean clear) {
+      final ArrowBuf[] result = {validityBuffer, valueBuffer};
+      if (!clear) {
+         return result;
+      }
+      validityBuffer.retain(1);
+      valueBuffer.retain(1);
+      clear();
+      return result;
+   }
+
+   /**
+    * Grows both buffers by replacing each with the larger buffer returned
+    * by reallocBufferHelper: the value buffer first, then the validity
+    * buffer.
+    */
+   @Override
+   public void reAlloc() {
+      valueBuffer = reallocBufferHelper(valueBuffer, true);
+      validityBuffer = reallocBufferHelper(validityBuffer, false);
+   }
+
+   private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean 
dataBuffer) {
+      final int currentBufferCapacity = buffer.capacity();
+      long baseSize  = (dataBuffer ? valueAllocationSizeInBytes
+                                   : validityAllocationSizeInBytes);
+
+      if (baseSize < (long)currentBufferCapacity) {
+         baseSize = (long)currentBufferCapacity;
+      }
+
+      long newAllocationSize = baseSize * 2L;
+      newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
+      if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Unable to expand the buffer");
+      }
+
+      getLogger().debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]",
+              name, (dataBuffer ? valueAllocationSizeInBytes : 
validityAllocationSizeInBytes),
+              newAllocationSize);
+
+      final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
+      newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
+      final int halfNewCapacity = newBuf.capacity() / 2;
+      newBuf.setZero(halfNewCapacity, halfNewCapacity);
+      buffer.release(1);
+      buffer = newBuf;
+      if (dataBuffer) {
+         valueAllocationSizeInBytes = (int)newAllocationSize;
+      }
+      else {
+         validityAllocationSizeInBytes = (int)newAllocationSize;
+      }
+
+      return buffer;
+   }
+
+   /**
+    * Not supported by this class.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public List<BufferBacked> getFieldInnerVectors() { throw new 
UnsupportedOperationException(); }
+
+   /**
+    * Accepts only an empty child list.
+    *
+    * @param children  must be empty
+    * @throws IllegalArgumentException if any child field is supplied
+    */
+   @Override
+   public void initializeChildrenFromFields(List<Field> children) {
+      if (!children.isEmpty()) {
+         throw new IllegalArgumentException("primitive type vector can not 
have children");
+      }
+   }
+
+   /**
+    * @return always an empty list
+    */
+   @Override
+   public List<FieldVector> getChildrenFromFields() {
+      return Collections.emptyList();
+   }
+
+   /**
+    * Replaces this vector's buffers with the supplied ones and takes the
+    * value count from the field node.
+    *
+    * @param fieldNode   supplies the value count via getLength()
+    * @param ownBuffers  exactly two buffers: validity first, then data
+    * @throws IllegalArgumentException if ownBuffers does not hold two buffers
+    */
+   @Override
+   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+      if (ownBuffers.size() != 2) {
+         throw new IllegalArgumentException("Illegal buffer count, expected " 
+ 2 + ", got: " + ownBuffers.size());
+      }
+
+      ArrowBuf bitBuffer = ownBuffers.get(0);
+      ArrowBuf dataBuffer = ownBuffers.get(1);
+
+      /* release the buffers currently held before adopting the new ones */
+      validityBuffer.release();
+      validityBuffer = bitBuffer.retain(allocator);
+      valueBuffer.release();
+      valueBuffer = dataBuffer.retain(allocator);
+
+      valueCount = fieldNode.getLength();
+
+      /* later allocations start from the capacity of the loaded buffers */
+      valueAllocationSizeInBytes = valueBuffer.capacity();
+      validityAllocationSizeInBytes = validityBuffer.capacity();
+   }
+
+   /**
+    * Returns the validity buffer followed by the value buffer, after
+    * resetting each reader index to 0 and setting each writer index to the
+    * number of bytes used by valueCount elements.
+    *
+    * @return two-element list: validity buffer, value buffer
+    */
+   public List<ArrowBuf> getFieldBuffers() {
+      final int validityBytes = getSizeFromCount(valueCount);
+      final int valueBytes = valueCount * typeWidth;
+
+      validityBuffer.readerIndex(0);
+      validityBuffer.writerIndex(validityBytes);
+      valueBuffer.readerIndex(0);
+      valueBuffer.writerIndex(valueBytes);
+
+      final List<ArrowBuf> fieldBuffers = new ArrayList<>(2);
+      fieldBuffers.add(validityBuffer);
+      fieldBuffers.add(valueBuffer);
+      return fieldBuffers;
+   }
+
+   /**
+    * Delegates to {@link #getTransferPair(String, BufferAllocator)};
+    * the callBack argument is ignored.
+    */
+   @Override
+   public TransferPair getTransferPair(String ref, BufferAllocator allocator, 
CallBack callBack) {
+      return getTransferPair(ref, allocator);
+   }
+
+   /**
+    * Delegates to {@link #getTransferPair(String, BufferAllocator)} using
+    * this vector's own name.
+    */
+   @Override
+   public TransferPair getTransferPair(BufferAllocator allocator){
+      return getTransferPair(name, allocator);
+   }
+
+   /**
+    * Implemented by concrete vectors; the other overloads delegate here.
+    */
+   public abstract TransferPair getTransferPair(String ref, BufferAllocator 
allocator);
+
+   /**
+    * Moves both buffers and the value count to the target vector. The
+    * target is cleared first and this vector is cleared afterwards.
+    *
+    * @param target  vector receiving the buffers; checked with compareTypes
+    */
+   public void transferTo(BaseNullableFixedWidthVector target){
+      compareTypes(target, "transferTo");
+      target.clear();
+      target.validityBuffer = 
validityBuffer.transferOwnership(target.allocator).buffer;
+      target.valueBuffer = 
valueBuffer.transferOwnership(target.allocator).buffer;
+      target.valueCount = valueCount;
+      clear();
+   }
+
+   /**
+    * Gives the target vector the elements [startIndex, startIndex + length)
+    * of this vector. The target is cleared first and its value count is set
+    * to length. This vector's own buffer references are not reassigned.
+    *
+    * @param startIndex  index of the first element to transfer
+    * @param length      number of elements to transfer
+    * @param target      vector receiving the range; checked with compareTypes
+    */
+   public void splitAndTransferTo(int startIndex, int length,
+                                  BaseNullableFixedWidthVector target) {
+      compareTypes(target, "splitAndTransferTo");
+      target.clear();
+      splitAndTransferValidityBuffer(startIndex, length, target);
+      splitAndTransferValueBuffer(startIndex, length, target);
+      target.setValueCount(length);
+   }
+
+   /*
+    * Slices the value buffer at the byte range covering the requested
+    * elements and hands that slice to the target through transferOwnership
+    * on the target's allocator. No element data is copied by this method
+    * itself.
+    */
+   private void splitAndTransferValueBuffer(int startIndex, int length,
+                                            BaseNullableFixedWidthVector 
target) {
+      final int startPoint = startIndex * typeWidth;
+      final int sliceLength = length * typeWidth;
+      target.valueBuffer = valueBuffer.slice(startPoint, 
sliceLength).transferOwnership(target.allocator).buffer;
+   }
+
+   /*
+    * Gives the target a validity buffer covering the bits
+    * [startIndex, startIndex + length) of this vector.
+    *
+    * If startIndex falls on a byte boundary the source buffer is sliced and
+    * the slice retained. Otherwise a new buffer is allocated in the target
+    * and every target byte is assembled from two neighbouring source bytes.
+    * Nothing is done when length is 0.
+    */
+   private void splitAndTransferValidityBuffer(int startIndex, int length,
+                                               BaseNullableFixedWidthVector 
target) {
+      assert startIndex + length <= valueCount;
+      int firstByteSource = BitVectorHelper.byteIndex(startIndex);
+      int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
+      int byteSizeTarget = getSizeFromCount(length);
+      /* bit position of startIndex inside its byte */
+      int offset = startIndex % 8;
+
+      if (length > 0) {
+         if (offset == 0) {
+            // slice
+            if (target.validityBuffer != null) {
+               target.validityBuffer.release();
+            }
+            target.validityBuffer = validityBuffer.slice(firstByteSource, 
byteSizeTarget);
+            target.validityBuffer.retain(1);
+         }
+         else {
+            /* Copy data
+             * When the first bit starts from the middle of a byte (offset != 
0),
+             * copy data from src BitVector.
+             * Each byte in the target is composed by a part in i-th byte,
+             * another part in (i+1)-th byte.
+             */
+            target.allocateValidityBuffer(byteSizeTarget);
+
+            for (int i = 0; i < byteSizeTarget - 1; i++) {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer, 
firstByteSource + i, offset);
+               byte b2 = getBitsFromNextByte(this.validityBuffer, 
firstByteSource + i + 1, offset);
+
+               target.validityBuffer.setByte(i, (b1 + b2));
+            }
+
+            /* Copying the last piece is done in the following manner:
+             * if the source vector has 1 or more bytes remaining, we copy
+             * the last piece as a byte formed by shifting data
+             * from the current byte and the next byte.
+             *
+             * if the source vector has no more bytes remaining
+             * (we are at the last byte), we copy the last piece as a byte
+             * by shifting data from the current byte.
+             */
+            if((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget - 1, offset);
+               byte b2 = getBitsFromNextByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget, offset);
+
+               target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
+            }
+            else {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget - 1, offset);
+               target.validityBuffer.setByte(byteSizeTarget - 1, b1);
+            }
+         }
+      }
+   }
+
+   /*
+    * Reads the byte at index as an unsigned value and shifts it right by
+    * offset, so its upper (8 - offset) bits end up in the low positions of
+    * the result.
+    */
+   private static byte getBitsFromCurrentByte(ArrowBuf data, int index, int 
offset) {
+      return (byte)((data.getByte(index) & 0xFF) >>> offset);
+   }
+
+   /*
+    * Reads the byte at index and shifts it left by (8 - offset), so its
+    * lower offset bits end up in the high positions of the result; the
+    * (byte) cast drops everything shifted past bit 7.
+    */
+   private static byte getBitsFromNextByte(ArrowBuf data, int index, int 
offset) {
+      return (byte)((data.getByte(index) << (8 - offset)));
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *          common getters and setters                            *
+    *                                                                *
+    ******************************************************************/
+
+
+   /**
+    * Get the number of elements that are null in the vector, computed by
+    * counting the set bits in the validity bytes that cover valueCount.
+    *
+    * @return the number of null elements.
+    */
+   public int getNullCount() {
+      final int sizeInBytes = getSizeFromCount(valueCount);
+
+      int setBits = 0;
+      int byteIndex = 0;
+      while (byteIndex < sizeInBytes) {
+         /* mask to 0..255 so sign extension of the byte adds no extra 1-bits */
+         setBits += Integer.bitCount(validityBuffer.getByte(byteIndex) & 0xFF);
+         byteIndex++;
+      }
+
+      /* when valueCount is not a multiple of 8, the unused bits of the last
+       * byte would otherwise be counted as nulls
+       */
+      final int trailingBits = valueCount % 8;
+      final int paddingBits = (trailingBits == 0) ? 0 : 8 - trailingBits;
+
+      return (sizeInBytes * 8) - setBits - paddingBits;
+   }
+
+
+   /**
+    * Get the value count of vector. It starts at zero and changes when
+    * {@link #setValueCount(int)} is called, when buffers are loaded through
+    * loadFieldBuffers, or when this vector is the target of transferTo.
+    *
+    * @return valueCount for the vector
+    */
+   public int getValueCount(){
+      return valueCount;
+   }
+
+
+   /**
+    * Set value count for the vector, reallocating until the vector has
+    * capacity for that many elements.
+    *
+    * @param valueCount  value count to set
+    */
+   public void setValueCount(int valueCount) {
+      this.valueCount = valueCount;
+
+      /* capacity as provisioned before any growth triggered below */
+      final int capacityBeforeGrowth = getValueCapacity();
+      while (getValueCapacity() < valueCount) {
+         reAlloc();
+      }
+
+      if (valueCount <= 0) {
+         return;
+      }
+
+      /*
+       * Track the pattern of memory allocation in allocationMonitor.
+       *
+       * Over-provisioned: the capacity was at least twice the requested
+       * count, e.g. a default allocateNew() followed by a much smaller
+       * setValueCount(count). The monitor is bumped up; once it crosses an
+       * internal hardcoded threshold, a later allocateNew(int) steps the
+       * allocation size down.
+       *
+       * Under-provisioned: the capacity was at most half the requested
+       * count, so reallocations were needed. The monitor is pushed negative;
+       * once it crosses its threshold, a later allocation steps the size up
+       * to possibly avoid future reallocations. setSafe() methods that
+       * trigger a realloc are expected to record this state as well.
+       */
+      final boolean overProvisioned = capacityBeforeGrowth >= (valueCount * 2);
+      final boolean underProvisioned = capacityBeforeGrowth <= (valueCount / 2);
+      if (overProvisioned) {
+         incrementAllocationMonitor();
+      } else if (underProvisioned) {
+         decrementAllocationMonitor();
+      }
+   }
+
+
+   /**
+    * Tell whether the given index lies below the current value capacity
+    * of the vector. Only the upper bound is checked; the index is not
+    * tested against zero.
+    *
+    * @param index  position to check
+    * @return true if index is smaller than the current value capacity
+    */
+   public boolean isSafe(int index) {
+      final int capacity = getValueCapacity();
+      return capacity > index;
+   }
+
+
+   /**
+    * Check whether the element at the given index is null, i.e. whether
+    * {@link #isSet(int)} reports 0 for it.
+    *
+    * @param index  position of element
+    * @return true if element at given index is null, false otherwise
+    */
+   public boolean isNull(int index) {
+      final int validityBit = isSet(index);
+      return validityBit == 0;
+   }
+
+
+   /**
+    * Read the validity bit of the element at the given index. This is the
+    * integer-valued counterpart of {@link #isNull(int)}: an element is null
+    * exactly when this method returns 0.
+    *
+    * @param index  position of element
+    * @return 1 if element at given index is not null, 0 otherwise
+    */
+   public int isSet(int index) {
+      // index >> 3 selects the validity byte, index & 7 the bit inside it
+      final byte validityByte = validityBuffer.getByte(index >> 3);
+      return (validityByte >> (index & 7)) & 1;
+   }
+
+   /**
+    * Mark the element at the given index as defined by setting its validity
+    * bit to one. The vector is reallocated first if the index is at or
+    * beyond the current value capacity.
+    *
+    * @param index  position of element
+    */
+   public void setIndexDefined(int index) {
+      // make room before the validity buffer is written
+      this.handleSafe(index);
+      BitVectorHelper.setValidityBitToOne(this.validityBuffer, index);
+   }
+
+   /**
+    * Byte-array setter; not supported here and always throws.
+    * NOTE(review): presumably only meaningful for vector types that store
+    * raw byte ranges -- confirm against the subclasses.
+    *
+    * @param index   position of element
+    * @param value   array holding the bytes
+    * @param start   start offset within the array
+    * @param length  number of bytes
+    * @throws UnsupportedOperationException always
+    */
+   public void set(int index, byte[] value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+   /**
+    * Byte-array variant of setSafe; not supported here and always throws.
+    *
+    * @param index   position of element
+    * @param value   array holding the bytes
+    * @param start   start offset within the array
+    * @param length  number of bytes
+    * @throws UnsupportedOperationException always
+    */
+   public void setSafe(int index, byte[] value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+   /**
+    * ByteBuffer setter; not supported here and always throws.
+    *
+    * @param index   position of element
+    * @param value   buffer holding the bytes
+    * @param start   start offset within the buffer
+    * @param length  number of bytes
+    * @throws UnsupportedOperationException always
+    */
+   public void set(int index, ByteBuffer value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+   /**
+    * ByteBuffer variant of setSafe; not supported here and always throws.
+    *
+    * @param index   position of element
+    * @param value   buffer holding the bytes
+    * @param start   start offset within the buffer
+    * @param length  number of bytes
+    * @throws UnsupportedOperationException always
+    */
+   public void setSafe(int index, ByteBuffer value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *                helper methods for setters                      *
+    *                                                                *
+    ******************************************************************/
+
+
+
+   /**
+    * Grow the vector until the given index is below the value capacity.
+    * Each reallocation performed here is first recorded by decrementing
+    * the allocation monitor.
+    *
+    * @param index  position that is about to be written
+    */
+   protected void handleSafe(int index) {
+      while (getValueCapacity() <= index) {
+         // note the forced growth before actually reallocating
+         decrementAllocationMonitor();
+         reAlloc();
+      }
+   }
+}
\ No newline at end of file

Reply via email to