ARROW-1473: ValueVector new hierarchy prototype (implementation phase 1) Close #1164 Close #1198
Change-Id: If18e42d2edfdfef83e83621334a5b65a390e9db9 Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/7f45d86d Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/7f45d86d Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/7f45d86d Branch: refs/heads/java-vector-refactor Commit: 7f45d86d76213a0396d755f5272ac4bfbbb618a9 Parents: 60cb1c3 Author: siddharth <siddha...@dremio.com> Authored: Thu Oct 5 02:11:44 2017 -0700 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Sat Oct 14 11:35:59 2017 -0400 ---------------------------------------------------------------------- .../main/codegen/templates/ComplexReaders.java | 24 +- .../main/codegen/templates/ComplexWriters.java | 45 +- .../codegen/templates/NullableValueVectors.java | 542 +++++++++++-- .../src/main/codegen/templates/UnionVector.java | 18 +- .../vector/BaseNullableFixedWidthVector.java | 701 +++++++++++++++++ .../vector/BaseNullableVariableWidthVector.java | 764 ++++++++++++++++++ .../apache/arrow/vector/BaseValueVector.java | 20 + .../apache/arrow/vector/BitVectorHelper.java | 60 ++ .../apache/arrow/vector/NullableIntVector.java | 299 +++++++ .../arrow/vector/NullableVarCharVector.java | 451 +++++++++++ .../org/apache/arrow/vector/ValueVector.java | 6 + .../org/apache/arrow/vector/VectorUnloader.java | 15 +- .../org/apache/arrow/vector/ZeroVector.java | 9 + .../vector/complex/FixedSizeListVector.java | 27 +- .../apache/arrow/vector/complex/ListVector.java | 31 +- .../apache/arrow/vector/complex/MapVector.java | 33 +- .../arrow/vector/file/json/JsonFileReader.java | 2 +- .../vector/TestBufferOwnershipTransfer.java | 6 +- .../arrow/vector/TestDictionaryVector.java | 59 +- .../arrow/vector/TestFixedSizeListVector.java | 13 +- .../arrow/vector/TestSplitAndTransfer.java | 15 +- .../apache/arrow/vector/TestValueVector.java | 788 +++++++++++++------ .../apache/arrow/vector/TestVectorReAlloc.java | 11 +- 
.../arrow/vector/TestVectorUnloadLoad.java | 104 ++- .../apache/arrow/vector/file/BaseFileTest.java | 70 +- .../apache/arrow/vector/file/TestArrowFile.java | 2 +- 26 files changed, 3657 insertions(+), 458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/ComplexReaders.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/ComplexReaders.java b/java/vector/src/main/codegen/templates/ComplexReaders.java index 38cd1bf..7910649 100644 --- a/java/vector/src/main/codegen/templates/ComplexReaders.java +++ b/java/vector/src/main/codegen/templates/ComplexReaders.java @@ -70,7 +70,11 @@ public class ${name}ReaderImpl extends AbstractFieldReader { public boolean isSet(){ <#if nullMode == "Nullable"> - return !vector.getAccessor().isNull(idx()); + <#if minor.class != "Int" && minor.class != "VarChar"> + return !vector.getAccessor().isNull(idx()); + <#else> + return !vector.isNull(idx()); + </#if> <#else> return true; </#if> @@ -93,11 +97,19 @@ public class ${name}ReaderImpl extends AbstractFieldReader { </#if> public void read(Nullable${minor.class?cap_first}Holder h){ - vector.getAccessor().get(idx(), h); + <#if minor.class != "Int" && minor.class != "VarChar"> + vector.getAccessor().get(idx(), h); + <#else> + vector.get(idx(), h); + </#if> } public ${friendlyType} read${safeType}(){ - return vector.getAccessor().getObject(idx()); + <#if minor.class == "Int" || minor.class == "VarChar"> + return vector.getObject(idx()); + <#else> + return vector.getAccessor().getObject(idx()); + </#if> } <#if minor.class == "TimeStampSec" || @@ -115,7 +127,11 @@ public class ${name}ReaderImpl extends AbstractFieldReader { } public Object readObject(){ - return vector.getAccessor().getObject(idx()); + <#if minor.class == "Int" || minor.class == "VarChar"> + return 
(Object)vector.getObject(idx()); + <#else> + return vector.getAccessor().getObject(idx()); + </#if> } } </#if> http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/ComplexWriters.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/ComplexWriters.java b/java/vector/src/main/codegen/templates/ComplexWriters.java index fe099be..77f6594 100644 --- a/java/vector/src/main/codegen/templates/ComplexWriters.java +++ b/java/vector/src/main/codegen/templates/ComplexWriters.java @@ -39,11 +39,16 @@ package org.apache.arrow.vector.complex.impl; @SuppressWarnings("unused") public class ${eName}WriterImpl extends AbstractFieldWriter { - private final Nullable${name}Vector.Mutator mutator; + <#if minor.class != "Int" && minor.class != "VarChar"> + private final Nullable${name}Vector.Mutator mutator; + </#if> + final Nullable${name}Vector vector; public ${eName}WriterImpl(Nullable${name}Vector vector) { - this.mutator = vector.getMutator(); + <#if minor.class != "Int" && minor.class != "VarChar"> + this.mutator = vector.getMutator(); + </#if> this.vector = vector; } @@ -103,18 +108,33 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { <#else> public void write(${minor.class}Holder h) { - mutator.setSafe(idx(), h); - vector.getMutator().setValueCount(idx()+1); + <#if minor.class != "Int" && minor.class != "VarChar"> + mutator.setSafe(idx(), h); + vector.getMutator().setValueCount(idx()+1); + <#else> + vector.setSafe(idx(), h); + vector.setValueCount(idx()+1); + </#if> } public void write(Nullable${minor.class}Holder h) { - mutator.setSafe(idx(), h); - vector.getMutator().setValueCount(idx()+1); + <#if minor.class != "Int" && minor.class != "VarChar"> + mutator.setSafe(idx(), h); + vector.getMutator().setValueCount(idx()+1); + <#else> + vector.setSafe(idx(), h); + vector.setValueCount(idx()+1); + </#if> } public void write${minor.class}(<#list fields 
as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) { - mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true >, ${field.name}</#if></#list>); - vector.getMutator().setValueCount(idx()+1); + <#if minor.class != "Int" && minor.class != "VarChar"> + mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true >, ${field.name}</#if></#list>); + vector.getMutator().setValueCount(idx()+1); + <#else> + vector.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true >, ${field.name}</#if></#list>); + vector.setValueCount(idx()+1); + </#if> } <#if minor.class == "Decimal"> @@ -126,8 +146,13 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { <#if mode == "Nullable"> public void writeNull() { - mutator.setNull(idx()); - vector.getMutator().setValueCount(idx()+1); + <#if minor.class != "Int" && minor.class != "VarChar"> + mutator.setNull(idx()); + vector.getMutator().setValueCount(idx()+1); + <#else> + vector.setNull(idx()); + vector.setValueCount(idx()+1); + </#if> } </#if> </#if> http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java index 122cd23..5d1f5a3 100644 --- a/java/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java @@ -19,8 +19,14 @@ <#list vv.types as type> <#list type.minor as minor> +<#if minor.class == "Int" || minor.class == "VarChar"> +<#assign className = "LegacyNullable${minor.class}Vector" /> +<#assign valuesName = "Nullable${minor.class}Vector" /> +<#else> <#assign className = "Nullable${minor.class}Vector" /> <#assign valuesName = 
"${minor.class}Vector" /> +</#if> + <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) /> <@pp.changeOutputFile name="/org/apache/arrow/vector/${className}.java" /> @@ -44,15 +50,24 @@ import org.apache.arrow.flatbuf.Precision; * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker. */ @SuppressWarnings("unused") -public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector { +<#if minor.class == "Int" || minor.class == "VarChar"> +@Deprecated +</#if> +public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, FieldVector { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class); protected final static byte[] emptyByteArray = new byte[]{}; + + <#if minor.class != "Int" && minor.class != "VarChar"> private final FieldReader reader = new ${minor.class}ReaderImpl(${className}.this); + </#if> private final String bitsField = "$bits$"; private final String valuesField = "$values$"; + + <#if minor.class != "Int" && minor.class != "VarChar"> private final Field field; + </#if> final BitVector bits = new BitVector(bitsField, allocator); final ${valuesName} values; @@ -60,7 +75,9 @@ protected final static byte[] emptyByteArray = new byte[]{}; private final Mutator mutator; private final Accessor accessor; + <#if minor.class != "Int" && minor.class != "VarChar"> private final List<BufferBacked> innerVectors; + </#if> <#if minor.typeParams??> <#assign typeParams = minor.typeParams?reverse> @@ -105,6 +122,7 @@ protected final static byte[] emptyByteArray = new byte[]{}; </#if> this.mutator = new Mutator(); this.accessor = new Accessor(); + <#if minor.class != "Int" && minor.class != "VarChar"> this.field = new Field(name, fieldType, null); innerVectors = 
Collections.unmodifiableList(Arrays.<BufferBacked>asList( bits, @@ -113,16 +131,24 @@ protected final static byte[] emptyByteArray = new byte[]{}; </#if> values )); + </#if> } - @Override + <#if minor.class != "Int" && minor.class != "VarChar"> + /* not needed for new vectors */ public BitVector getValidityVector() { return bits; } + </#if> @Override public List<BufferBacked> getFieldInnerVectors() { - return innerVectors; + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getFieldInnerVectors(); + <#else> + return innerVectors; + </#if> } @Override @@ -139,6 +165,10 @@ protected final static byte[] emptyByteArray = new byte[]{}; @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.loadFieldBuffers(fieldNode, ownBuffers); + <#else> <#if type.major = "VarLen"> // variable width values: truncate offset vector buffer to size (#1) org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, @@ -151,34 +181,64 @@ protected final static byte[] emptyByteArray = new byte[]{}; </#if> org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); bits.valueCount = fieldNode.getLength(); + </#if> } public List<ArrowBuf> getFieldBuffers() { - return org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors()); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getFieldBuffers(); + <#else> + return org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors()); + </#if> } @Override public Field getField() { - return field; + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getField(); + <#else> + return field; + </#if> } @Override public MinorType getMinorType() { - return 
MinorType.${minor.class?upper_case}; + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getMinorType(); + <#else> + return MinorType.${minor.class?upper_case}; + </#if> } @Override public FieldReader getReader(){ - return reader; + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getReader(); + <#else> + return reader; + </#if> } @Override public int getValueCapacity(){ - return Math.min(bits.getValueCapacity(), values.getValueCapacity()); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getValueCapacity(); + <#else> + return Math.min(bits.getValueCapacity(), values.getValueCapacity()); + </#if> } @Override public ArrowBuf[] getBuffers(boolean clear) { + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getBuffers(clear); + <#else> final ArrowBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), values.getBuffers(false), ArrowBuf.class); if (clear) { for (final ArrowBuf buffer:buffers) { @@ -187,25 +247,41 @@ protected final static byte[] emptyByteArray = new byte[]{}; clear(); } return buffers; + </#if> } @Override public void close() { - bits.close(); - values.close(); - super.close(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.close(); + <#else> + bits.close(); + values.close(); + super.close(); + </#if> } @Override public void clear() { - bits.clear(); - values.clear(); - super.clear(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.clear(); + <#else> + bits.clear(); + values.clear(); + super.clear(); + </#if> } @Override public int getBufferSize(){ - return values.getBufferSize() + bits.getBufferSize(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getBufferSize(); + <#else> + return 
values.getBufferSize() + bits.getBufferSize(); + </#if> } @Override @@ -214,34 +290,52 @@ protected final static byte[] emptyByteArray = new byte[]{}; return 0; } - return values.getBufferSizeFor(valueCount) - + bits.getBufferSizeFor(valueCount); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getBufferSizeFor(valueCount); + <#else> + return values.getBufferSizeFor(valueCount) + + bits.getBufferSizeFor(valueCount); + </#if> } public ArrowBuf getBuffer() { return values.getDataBuffer(); } - @Override public ${valuesName} getValuesVector() { return values; } @Override public void setInitialCapacity(int numRecords) { - bits.setInitialCapacity(numRecords); - values.setInitialCapacity(numRecords); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.setInitialCapacity(numRecords); + <#else> + bits.setInitialCapacity(numRecords); + values.setInitialCapacity(numRecords); + </#if> } @Override public void allocateNew() { + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.allocateNew(); + <#else> if(!allocateNewSafe()){ throw new OutOfMemoryException("Failure while allocating buffer."); } + </#if> } @Override public boolean allocateNewSafe() { + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.allocateNewSafe(); + <#else> /* Boolean to keep track if all the memory allocations were successful * Used in the case of composite vectors when we need to allocate multiple * buffers for multiple vectors. 
If one of the allocations failed we need to @@ -259,23 +353,38 @@ protected final static byte[] emptyByteArray = new byte[]{}; mutator.reset(); accessor.reset(); return success; + </#if> } @Override public void reAlloc() { - bits.reAlloc(); - values.reAlloc(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.reAlloc(); + <#else> + bits.reAlloc(); + values.reAlloc(); + </#if> } public void reset() { + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.reset(); + <#else> bits.zeroVector(); mutator.reset(); accessor.reset(); + </#if> } <#if type.major == "VarLen"> @Override public void allocateNew(int totalBytes, int valueCount) { + <#if minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + values.allocateNew(totalBytes, valueCount); + <#else> try { values.allocateNew(totalBytes, valueCount); bits.allocateNew(valueCount); @@ -286,6 +395,7 @@ protected final static byte[] emptyByteArray = new byte[]{}; bits.zeroVector(); mutator.reset(); accessor.reset(); + </#if> } @Override @@ -301,6 +411,10 @@ protected final static byte[] emptyByteArray = new byte[]{}; <#else> @Override public void allocateNew(int valueCount) { + <#if minor.class == "Int"> + /* DELEGATE TO NEW VECTOR */ + values.allocateNew(valueCount); + <#else> try { values.allocateNew(valueCount); bits.allocateNew(valueCount); @@ -311,6 +425,7 @@ protected final static byte[] emptyByteArray = new byte[]{}; bits.zeroVector(); mutator.reset(); accessor.reset(); + </#if> } /** @@ -318,32 +433,86 @@ protected final static byte[] emptyByteArray = new byte[]{}; */ @Override public void zeroVector() { - bits.zeroVector(); - values.zeroVector(); + <#if minor.class == "Int"> + /* DELEGATE TO NEW VECTOR */ + values.zeroVector(); + <#else> + bits.zeroVector(); + values.zeroVector(); + </#if> } </#if> + + @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + <#if minor.class 
== "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getTransferPair(ref, allocator, callBack); + <#else> return getTransferPair(ref, allocator); + </#if> } + + @Override public TransferPair getTransferPair(BufferAllocator allocator){ - return new TransferImpl(name, allocator); - + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getTransferPair(allocator); + <#else> + return new TransferImpl(name, allocator); + </#if> } + + @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator){ - return new TransferImpl(ref, allocator); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getTransferPair(ref, allocator); + <#else> + return new TransferImpl(ref, allocator); + </#if> } + + @Override public TransferPair makeTransferPair(ValueVector to) { - return new TransferImpl((${className}) to); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.makeTransferPair(to); + <#else> + return new TransferImpl((${className}) to); + </#if> + } + + + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void transferTo(${valuesName} target) { + /* DELEGATE TO NEW VECTOR */ + <#if type.major == "VarLen"> + values.transferTo((BaseNullableVariableWidthVector) target); + <#else> + values.transferTo((BaseNullableFixedWidthVector) target); + </#if> + } + + public void splitAndTransferTo(int startIndex, int length, ${valuesName} target) { + /* DELEGATE TO NEW VECTOR */ + <#if type.major == "VarLen"> + values.splitAndTransferTo(startIndex, length, (BaseNullableVariableWidthVector) target); + <#else> + values.splitAndTransferTo(startIndex, length, (BaseNullableFixedWidthVector) target); + </#if> } + <#else> public void transferTo(${className} target){ bits.transferTo(target.bits); values.transferTo(target.values); @@ -360,7 +529,11 @@ protected final static byte[] 
emptyByteArray = new byte[]{}; target.mutator.lastSet = length - 1; </#if> } + </#if> + + + <#if minor.class != "Int" && minor.class != "VarChar"> private class TransferImpl implements TransferPair { ${className} to; @@ -392,6 +565,9 @@ protected final static byte[] emptyByteArray = new byte[]{}; to.copyFromSafe(fromIndex, toIndex, ${className}.this); } } + </#if> + + @Override public Accessor getAccessor(){ @@ -403,7 +579,20 @@ protected final static byte[] emptyByteArray = new byte[]{}; return mutator; } - public void copyFrom(int fromIndex, int thisIndex, ${className} from){ + + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void copyFrom(int fromIndex, int thisIndex, ${valuesName} from) { + /* DELEGATE TO NEW VECTOR */ + values.copyFrom(fromIndex, thisIndex, from); + } + + public void copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from) { + /* DELEGATE TO NEW VECTOR */ + values.copyFromSafe(fromIndex, thisIndex, from); + } + <#else> + public void copyFrom(int fromIndex, int thisIndex, ${className} from) { final Accessor fromAccessor = from.getAccessor(); if (!fromAccessor.isNull(fromIndex)) { mutator.set(thisIndex, fromAccessor.get(fromIndex)); @@ -428,17 +617,28 @@ protected final static byte[] emptyByteArray = new byte[]{}; values.copyFromSafe(fromIndex, thisIndex, from.values); <#if type.major == "VarLen">mutator.lastSet = thisIndex;</#if> } + </#if> @Override public long getValidityBufferAddress() { /* address of the databuffer associated with the bitVector */ - return (bits.getDataBuffer().memoryAddress()); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getValidityBufferAddress(); + <#else> + return (bits.getDataBuffer().memoryAddress()); + </#if> } @Override public long getDataBufferAddress() { /* address of the dataBuffer associated with the valueVector */ - return (values.getDataBuffer().memoryAddress()); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* 
DELEGATE TO NEW VECTOR */ + return values.getDataBufferAddress(); + <#else> + return (values.getDataBuffer().memoryAddress()); + </#if> } @Override @@ -446,17 +646,26 @@ protected final static byte[] emptyByteArray = new byte[]{}; /* address of the dataBuffer associated with the offsetVector * this operation is not supported for fixed-width vector types. */ - <#if type.major != "VarLen"> - throw new UnsupportedOperationException(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getOffsetBufferAddress(); <#else> - return (values.getOffsetAddr()); + <#if type.major != "VarLen"> + throw new UnsupportedOperationException(); + <#else> + return (values.getOffsetAddr()); + </#if> </#if> } @Override public ArrowBuf getValidityBuffer() { - /* dataBuffer associated with the bitVector */ - return (bits.getDataBuffer()); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getValidityBuffer(); + <#else> + return (bits.getDataBuffer()); + </#if> } @Override @@ -468,10 +677,15 @@ protected final static byte[] emptyByteArray = new byte[]{}; @Override public ArrowBuf getOffsetBuffer() { /* dataBuffer associated with the offsetVector of the valueVector */ - <#if type.major != "VarLen"> - throw new UnsupportedOperationException(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getOffsetBuffer(); <#else> - return (values.getOffsetBuffer()); + <#if type.major != "VarLen"> + throw new UnsupportedOperationException(); + <#else> + return (values.getOffsetBuffer()); + </#if> </#if> } @@ -485,38 +699,80 @@ protected final static byte[] emptyByteArray = new byte[]{}; * @param index position of the value * @return value of the element, if not null */ - public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) { - if (isNull(index)) { + <#if minor.class == "Int" || minor.class == "VarChar">
+ public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) { + /* DELEGATE TO NEW VECTOR */ + return values.get(index); + } + <#else> + + public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) { + if (isNull(index)) { throw new IllegalStateException("Can't get a null value"); + } + return vAccessor.get(index); } - return vAccessor.get(index); - } + </#if> @Override public boolean isNull(int index) { - return isSet(index) == 0; + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.isNull(index); + <#else> + return isSet(index) == 0; + </#if> } public int isSet(int index){ - return bAccessor.get(index); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.isSet(index); + <#else> + return bAccessor.get(index); + </#if> } <#if type.major == "VarLen"> public long getStartEnd(int index){ - return vAccessor.getStartEnd(index); + <#if minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getStartEnd(index); + <#else> + return vAccessor.getStartEnd(index); + </#if> } @Override public int getValueLength(int index) { - return values.getAccessor().getValueLength(index); + <#if minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getValueLength(index); + <#else> + return values.getAccessor().getValueLength(index); + </#if> } </#if> + <#if minor.class == "Int" || minor.class == "VarChar"> + public void get(int index, Nullable${minor.class}Holder holder){ + /* DELEGATE TO NEW VECTOR */ + values.get(index, holder); + } + <#else> public void get(int index, Nullable${minor.class}Holder holder){ vAccessor.get(index, holder); holder.isSet = bAccessor.get(index); } + </#if> + <#if minor.class == "Int" || minor.class == "VarChar"> + @Override + public ${friendlyType} getObject(int index) { + /* DELEGATE TO NEW VECTOR */ + return values.getObject(index); + } + 
<#else> @Override public ${friendlyType} getObject(int index) { if (isNull(index)) { @@ -525,6 +781,7 @@ protected final static byte[] emptyByteArray = new byte[]{}; return vAccessor.getObject(index); } } + </#if> <#if minor.class == "IntervalYear" || minor.class == "IntervalDay"> public StringBuilder getAsStringBuilder(int index) { @@ -538,7 +795,12 @@ protected final static byte[] emptyByteArray = new byte[]{}; @Override public int getValueCount(){ - return bits.getAccessor().getValueCount(); + <#if minor.class == "Int" || minor.class == "VarChar"> + /* DELEGATE TO NEW VECTOR */ + return values.getValueCount(); + <#else> + return bits.getAccessor().getValueCount(); + </#if> } public void reset(){} @@ -551,21 +813,35 @@ protected final static byte[] emptyByteArray = new byte[]{}; private Mutator(){ } - public ${valuesName} getVectorWithValues(){ + public ${valuesName} getVectorWithValues() { return values; } + @Override public void setIndexDefined(int index){ + <#if minor.class == "Int" || minor.class == "VarChar"> + values.setIndexDefined(index); + <#else> bits.getMutator().setToOne(index); + </#if> } + + /** * Set the variable length element at the specified index to the supplied byte array. 
* * @param index position of the bit to set * @param value array of bytes (or int if smaller than 4 bytes) to write */ + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) { + /* DELEGATE TO NEW VECTOR */ + values.set(index, value); + } + <#else> public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) { setCount++; final ${valuesName}.Mutator valuesMutator = values.getMutator(); @@ -579,9 +855,24 @@ protected final static byte[] emptyByteArray = new byte[]{}; valuesMutator.set(index, value); <#if type.major == "VarLen">lastSet = index;</#if> } + </#if> + + <#if type.major == "VarLen"> + <#if minor.class == "VarChar"> + public void fillEmpties(int index) { + /* DELEGATE TO NEW VECTOR */ + values.fillEmpties(index); + } + @Override + public void setValueLengthSafe(int index, int length) { + /* DELEGATE TO NEW VECTOR */ + values.setValueLengthSafe(index, length); + } + + <#else> public void fillEmpties(int index){ final ${valuesName}.Mutator valuesMutator = values.getMutator(); for (int i = lastSet + 1; i < index; i++) { @@ -599,7 +890,16 @@ protected final static byte[] emptyByteArray = new byte[]{}; lastSet = index; } </#if> + </#if> + + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void setSafe(int index, byte[] value, int start, int length) { + /* DELEGATE TO NEW VECTOR */ + values.setSafe(index, value, start, length); + } + <#else> public void setSafe(int index, byte[] value, int start, int length) { <#if type.major != "VarLen"> throw new UnsupportedOperationException(); @@ -612,7 +912,16 @@ protected final static byte[] emptyByteArray = new byte[]{}; <#if type.major == "VarLen">lastSet = index;</#if> </#if> } + </#if> + + + <#if minor.class == "VarChar"> + public void setSafe(int index, ByteBuffer value, int 
start, int length) { + /* DELEGATE TO NEW VECTOR */ + values.setSafe(index, value, start, length); + } + <#else> public void setSafe(int index, ByteBuffer value, int start, int length) { <#if type.major != "VarLen"> throw new UnsupportedOperationException(); @@ -625,11 +934,25 @@ protected final static byte[] emptyByteArray = new byte[]{}; <#if type.major == "VarLen">lastSet = index;</#if> </#if> } + </#if> + + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void setNull(int index) { + /* DELEGATE TO NEW VECTOR */ + values.setNull(index); + } + <#else> public void setNull(int index){ bits.getMutator().setSafe(index, 0); } + </#if> + + + <#if minor.class != "Int" && minor.class != "VarChar"> + /* these methods are probably not needed */ public void setSkipNull(int index, ${minor.class}Holder holder){ values.getMutator().set(index, holder); } @@ -637,8 +960,17 @@ protected final static byte[] emptyByteArray = new byte[]{}; public void setSkipNull(int index, Nullable${minor.class}Holder holder){ values.getMutator().set(index, holder); } + </#if> + - public void set(int index, Nullable${minor.class}Holder holder){ + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void set(int index, Nullable${minor.class}Holder holder) { + /* DELEGATE TO NEW VECTOR */ + values.set(index, holder); + } + <#else> + public void set(int index, Nullable${minor.class}Holder holder) { final ${valuesName}.Mutator valuesMutator = values.getMutator(); <#if type.major == "VarLen"> for (int i = lastSet + 1; i < index; i++) { @@ -649,8 +981,17 @@ protected final static byte[] emptyByteArray = new byte[]{}; valuesMutator.set(index, holder); <#if type.major == "VarLen">lastSet = index;</#if> } + </#if> + + - public void set(int index, ${minor.class}Holder holder){ + <#if minor.class == "Int" || minor.class == "VarChar"> + public void set(int index, ${minor.class}Holder holder) { + /* DELEGATE TO NEW VECTOR */ + values.set(index, holder); + } + <#else> + public void 
set(int index, ${minor.class}Holder holder) { final ${valuesName}.Mutator valuesMutator = values.getMutator(); <#if type.major == "VarLen"> for (int i = lastSet + 1; i < index; i++) { @@ -661,11 +1002,45 @@ protected final static byte[] emptyByteArray = new byte[]{}; valuesMutator.set(index, holder); <#if type.major == "VarLen">lastSet = index;</#if> } + </#if> + + + <#if minor.class == "Int" || minor.class == "VarChar"> + public boolean isSafe(int outIndex) { + /* DELEGATE TO NEW VECTOR */ + return values.isSafe(outIndex); + } + <#else> public boolean isSafe(int outIndex) { return outIndex < ${className}.this.getValueCapacity(); } + </#if> + + + <#if minor.class == "Int" || minor.class == "VarChar"> + <#if minor.class == "Int"> + public void set(int index, int isSet, int valueField) { + /* DELEGATE TO NEW VECTOR */ + values.set(index, isSet, valueField); + } + public void setSafe(int index, int isSet, int valueField) { + /* DELEGATE TO NEW VECTOR */ + values.setSafe(index, isSet, valueField); + } + </#if> + <#if minor.class == "VarChar"> + public void set(int index, int isSet, int startField, int endField, ArrowBuf bufferField ) { + /* DELEGATE TO NEW VECTOR */ + values.set(index, isSet, startField, endField, bufferField); + } + public void setSafe(int index, int isSet, int startField, int endField, ArrowBuf bufferField ) { + /* DELEGATE TO NEW VECTOR */ + values.setSafe(index, isSet, startField, endField, bufferField); + } + </#if> + <#else> <#assign fields = minor.fields!type.fields /> public void set(int index, int isSet<#list fields as field>, ${field.type} ${field.name}Field</#list> ){ final ${valuesName}.Mutator valuesMutator = values.getMutator(); @@ -688,8 +1063,21 @@ protected final static byte[] emptyByteArray = new byte[]{}; setCount++; <#if type.major == "VarLen">lastSet = index;</#if> } + </#if> + + <#if minor.class == "Int" || minor.class == "VarChar"> + public void setSafe(int index, Nullable${minor.class}Holder value) { + /* DELEGATE TO NEW VECTOR 
*/ + values.setSafe(index, value); + } + + public void setSafe(int index, ${minor.class}Holder value) { + /* DELEGATE TO NEW VECTOR */ + values.setSafe(index, value); + } + <#else> public void setSafe(int index, Nullable${minor.class}Holder value) { <#if type.major == "VarLen"> fillEmpties(index); @@ -709,15 +1097,25 @@ protected final static byte[] emptyByteArray = new byte[]{}; setCount++; <#if type.major == "VarLen">lastSet = index;</#if> } + </#if> + + <#if !(type.major == "VarLen" || minor.class == "IntervalDay")> public void setSafe(int index, ${minor.javaType!type.javaType} value) { + <#if minor.class == "Int"> + /* DELEGATE TO NEW VECTOR */ + values.setSafe(index, value); + <#else> bits.getMutator().setSafeToOne(index); values.getMutator().setSafe(index, value); setCount++; + </#if> } - </#if> + + + <#if minor.class == "Decimal"> public void set(int index, ${friendlyType} value) { bits.getMutator().setToOne(index); @@ -729,8 +1127,17 @@ protected final static byte[] emptyByteArray = new byte[]{}; values.getMutator().setSafe(index, value); setCount++; } - </#if> + + + + <#if minor.class == "Int" || minor.class == "VarChar"> + @Override + public void setValueCount(int valueCount) { + /* DELEGATE TO NEW VECTOR */ + values.setValueCount(valueCount); + } + <#else> @Override public void setValueCount(int valueCount) { assert valueCount >= 0; @@ -740,7 +1147,12 @@ protected final static byte[] emptyByteArray = new byte[]{}; values.getMutator().setValueCount(valueCount); bits.getMutator().setValueCount(valueCount); } + </#if> + + + <#if minor.class != "Int" && minor.class != "VarChar"> + /* THIS METHOD IS PROBABLY NOT NEEDED FOR NEW VECTORS */ @Override public void generateTestData(int valueCount){ bits.getMutator().generateTestDataAlt(valueCount); @@ -748,13 +1160,27 @@ protected final static byte[] emptyByteArray = new byte[]{}; <#if type.major = "VarLen">lastSet = valueCount;</#if> setValueCount(valueCount); } + </#if> + + + <#if minor.class != "Int" && 
minor.class != "VarChar"> + /* MUTATOR RESET IS NOT NEEDED FOR NEW VECTORS */ @Override public void reset(){ setCount = 0; <#if type.major = "VarLen">lastSet = -1;</#if> } + </#if> + + + <#if minor.class == "VarChar"> + public void setLastSet(int value) { + /* DELEGATE TO NEW VECTOR */ + values.setLastSet(value); + } + <#else> public void setLastSet(int value) { <#if type.major = "VarLen"> lastSet = value; @@ -762,7 +1188,16 @@ protected final static byte[] emptyByteArray = new byte[]{}; throw new UnsupportedOperationException(); </#if> } + </#if> + + + <#if minor.class == "VarChar"> + public int getLastSet() { + /* DELEGATE TO NEW VECTOR */ + return values.getLastSet(); + } + <#else> public int getLastSet() { <#if type.major != "VarLen"> throw new UnsupportedOperationException(); @@ -770,6 +1205,7 @@ protected final static byte[] emptyByteArray = new byte[]{}; return lastSet; </#if> } + </#if> } } </#list> http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index fe24a86..3c7ed01 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -436,7 +436,11 @@ public class UnionVector implements FieldVector { <#assign uncappedName = name?uncap_first/> <#if !minor.typeParams?? > case ${name?upper_case}: - return get${name}Vector().getAccessor().getObject(index); + <#if minor.class != "Int" && minor.class != "VarChar"> + return get${name}Vector().getAccessor().getObject(index); + <#else> + return get${name}Vector().getObject(index); + </#if> </#if> </#list> </#list> @@ -530,7 +534,11 @@ public class UnionVector implements FieldVector { <#if !minor.typeParams?? 
> public void setSafe(int index, Nullable${name}Holder holder) { setType(index, MinorType.${name?upper_case}); - get${name}Vector().getMutator().setSafe(index, holder); + <#if minor.class != "Int" && minor.class != "VarChar"> + get${name}Vector().getMutator().setSafe(index, holder); + <#else> + get${name}Vector().setSafe(index, holder); + </#if> } </#if> @@ -547,4 +555,10 @@ public class UnionVector implements FieldVector { @Override public void generateTestData(int values) { } } + + public int getValueCount() { return 0; } + + public void setValueCount(int valueCount) { } + + public Object getObject(int index) { return null; } } http://git-wip-us.apache.org/repos/asf/arrow/blob/7f45d86d/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java new file mode 100644 index 0000000..c5f7810 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java @@ -0,0 +1,701 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+import org.apache.arrow.vector.util.TransferPair;
+
+/**
+ * Base class for nullable fixed-width vectors. Each vector is backed by two
+ * buffers: a validity buffer holding one bit per element (bit set means the
+ * element is non-null; element i lives in bit (i % 8) of byte (i / 8)) and a
+ * value buffer holding {@code typeWidth} bytes per element.
+ */
+public abstract class BaseNullableFixedWidthVector extends BaseValueVector
+        implements FixedWidthVector, FieldVector {
+  /* width in bytes of a single element in the value buffer */
+  private final byte typeWidth;
+
+  /* sizes (in bytes) used by the next allocateNew() and as the base for reAlloc() */
+  private int valueAllocationSizeInBytes;
+  private int validityAllocationSizeInBytes;
+
+  protected final Field field;
+  /* > 0: capacity was repeatedly over-provisioned, < 0: repeatedly under-provisioned */
+  private int allocationMonitor;
+  protected ArrowBuf validityBuffer;
+  protected ArrowBuf valueBuffer;
+  protected int valueCount;
+
+  /**
+   * Create a new vector. The constructor allocates no memory; both buffers
+   * start out as the allocator's empty buffer.
+   *
+   * @param name name of the vector
+   * @param allocator allocator used for the vector's buffers
+   * @param fieldType type of the field backing this vector
+   * @param typeWidth width in bytes of a single element
+   */
+  public BaseNullableFixedWidthVector(final String name, final BufferAllocator allocator,
+                                      FieldType fieldType, final byte typeWidth) {
+    super(name, allocator);
+    this.typeWidth = typeWidth;
+    valueAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * typeWidth;
+    validityAllocationSizeInBytes = getSizeFromCount(INITIAL_VALUE_ALLOCATION);
+    field = new Field(name, fieldType, null);
+    valueCount = 0;
+    allocationMonitor = 0;
+    validityBuffer = allocator.getEmpty();
+    valueBuffer = allocator.getEmpty();
+  }
+
+  /* TODO:
+   * Determine how writerIndex and readerIndex need to be used. Right now we
+   * are setting the writerIndex and readerIndex in the call to getFieldBuffers
+   * using the valueCount -- this assumes that the caller of getFieldBuffers
+   * on the vector has already invoked setValueCount.
+   *
+   * Do we need to set them during vector transfer and splitAndTransfer?
+   */
+
+  /* TODO:
+   *
+   * see if getNullCount() can be made faster -- O(1)
+   */
+
+  /* TODO:
+   * Once the entire hierarchy has been refactored, move common functions
+   * like getNullCount(), splitAndTransferValidityBuffer to top level
+   * base class BaseValueVector.
+   *
+   * Along with this, some class members (validityBuffer) can also be
+   * abstracted out to top level base class.
+   *
+   * Right now BaseValueVector is the top level base class for other
+   * vector types in ValueVector hierarchy and those vectors have not
+   * yet been refactored so moving things to the top class as of now
+   * is not a good idea.
+   */
+
+  /* TODO:
+   * See if we need logger -- unnecessary object probably
+   */
+
+  protected abstract org.slf4j.Logger getLogger();
+
+  @Override
+  public Mutator getMutator() {
+    throw new UnsupportedOperationException("Mutator is not needed to write into vector");
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    throw new UnsupportedOperationException("Accessor is not needed to read from vector");
+  }
+
+  @Override
+  public long getValidityBufferAddress() {
+    return (validityBuffer.memoryAddress());
+  }
+
+  @Override
+  public long getDataBufferAddress() {
+    return (valueBuffer.memoryAddress());
+  }
+
+  @Override
+  public long getOffsetBufferAddress() {
+    throw new UnsupportedOperationException("not supported for fixed-width vectors");
+  }
+
+  @Override
+  public ArrowBuf getValidityBuffer() {
+    return validityBuffer;
+  }
+
+  @Override
+  public ArrowBuf getDataBuffer() {
+    return valueBuffer;
+  }
+
+  @Override
+  public ArrowBuf getOffsetBuffer() {
+    throw new UnsupportedOperationException("not supported for fixed-width vectors");
+  }
+
+  /**
+   * Set the number of elements the next allocateNew() call will provision for.
+   *
+   * @param valueCount desired value capacity
+   * @throws OversizedAllocationException if the value buffer would exceed MAX_ALLOCATION_SIZE
+   */
+  @Override
+  public void setInitialCapacity(int valueCount) {
+    final long size = (long)valueCount * typeWidth;
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+    }
+    valueAllocationSizeInBytes = (int)size;
+    validityAllocationSizeInBytes = getSizeFromCount(valueCount);
+  }
+
+  /**
+   * @return number of elements both buffers can currently hold
+   */
+  @Override
+  public int getValueCapacity(){
+    return Math.min(getValueBufferValueCapacity(), getValidityBufferValueCapacity());
+  }
+
+  /* for test purposes */
+  private int getValueBufferValueCapacity() {
+    return valueBuffer.capacity() / typeWidth;
+  }
+
+  /* for test purposes */
+  private int getValidityBufferValueCapacity() {
+    return (int)(validityBuffer.capacity() * 8L);
+  }
+
+  /* number of bytes for the validity buffer for the given valueCount */
+  protected int getSizeFromCount(int valueCount) {
+    return (int) Math.ceil(valueCount / 8.0);
+  }
+
+  @Override
+  public void zeroVector() {
+    initValidityBuffer();
+    initValueBuffer();
+  }
+
+  private void initValidityBuffer() {
+    validityBuffer.setZero(0, validityBuffer.capacity());
+  }
+
+  private void initValueBuffer() {
+    valueBuffer.setZero(0, valueBuffer.capacity());
+  }
+
+  /**
+   * Zero both buffers, keeping the current allocation.
+   */
+  public void reset() {
+    zeroVector();
+  }
+
+  @Override
+  public void close() { clear(); }
+
+  /**
+   * Release both buffers.
+   */
+  @Override
+  public void clear() {
+    validityBuffer = releaseBuffer(validityBuffer);
+    valueBuffer = releaseBuffer(valueBuffer);
+  }
+
+  /* used to step down the memory allocation */
+  protected void incrementAllocationMonitor() {
+    if (allocationMonitor < 0) {
+      allocationMonitor = 0;
+    }
+    allocationMonitor++;
+  }
+
+  /* used to step up the memory allocation */
+  protected void decrementAllocationMonitor() {
+    if (allocationMonitor > 0) {
+      allocationMonitor = 0;
+    }
+    allocationMonitor--;
+  }
+
+  /**
+   * Allocate buffers using the current allocation sizes.
+   *
+   * @throws OutOfMemoryException if the allocation fails
+   */
+  @Override
+  public void allocateNew() {
+    if (!allocateNewSafe()) {
+      throw new OutOfMemoryException("Failure while allocating memory.");
+    }
+  }
+
+  /**
+   * Allocate buffers using the current allocation sizes, releasing any
+   * buffers the vector currently holds.
+   *
+   * @return false if the allocation failed, true otherwise
+   * @throws OversizedAllocationException if the value buffer would exceed MAX_ALLOCATION_SIZE
+   */
+  public boolean allocateNewSafe() {
+    long curAllocationSizeValue = valueAllocationSizeInBytes;
+    long curAllocationSizeValidity = validityAllocationSizeInBytes;
+
+    if (curAllocationSizeValue > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory exceeds limit");
+    }
+
+    /* we are doing a new allocation -- release the current buffers */
+    clear();
+
+    try {
+      allocateBytes(curAllocationSizeValue, curAllocationSizeValidity);
+    } catch (Exception e) {
+      /* pass the exception itself so the cause and stack trace are not lost */
+      getLogger().error("ERROR: Failure in allocateNewSafe", e);
+      clear();
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Allocate buffers sized for the given number of elements. The size may be
+   * stepped down or up depending on the allocation history recorded by
+   * setValueCount()/handleSafe().
+   *
+   * @param valueCount desired number of elements
+   * @throws OversizedAllocationException if the value buffer would exceed MAX_ALLOCATION_SIZE
+   */
+  public void allocateNew(int valueCount) {
+    /* widen before multiplying so a large valueCount cannot overflow int
+     * and slip past the MAX_ALLOCATION_SIZE check below */
+    long valueBufferSize = (long)valueCount * typeWidth;
+    long validityBufferSize = getSizeFromCount(valueCount);
+
+    if (allocationMonitor > 10) {
+      /* step down the default memory allocation since we have observed
+       * multiple times that provisioned value capacity was much larger than
+       * actually needed. see setValueCount for more details.
+       */
+      valueBufferSize = Math.max(8, valueBufferSize / 2);
+      validityBufferSize = Math.max(8, validityBufferSize / 2);
+      allocationMonitor = 0;
+    } else if (allocationMonitor < -2) {
+      valueBufferSize = valueBufferSize * 2L;
+      validityBufferSize = validityBufferSize * 2L;
+      allocationMonitor = 0;
+    }
+
+    if (valueBufferSize > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+    }
+
+    /* we are doing a new allocation -- release the current buffers */
+    clear();
+
+    try {
+      allocateBytes(valueBufferSize, validityBufferSize);
+    } catch (Exception e) {
+      getLogger().error("ERROR: Failure in allocateNew", e);
+      clear();
+      throw e;
+    }
+  }
+
+  /**
+   * Actual memory allocation is done by this function. All the calculations
+   * and knowledge about what size to allocate is up to the callers of this
+   * method.
+   * Callers appropriately handle errors if memory allocation fails here.
+   * Callers should also take care of determining that desired size is
+   * within the bounds of max allocation allowed and any other error
+   * conditions.
+   */
+  private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
+    /* allocate data buffer */
+    int curSize = (int)valueBufferSize;
+    valueBuffer = allocator.buffer(curSize);
+    valueBuffer.readerIndex(0);
+    valueAllocationSizeInBytes = curSize;
+
+    /* allocate validity buffer (allocateValidityBuffer also zeroes it) */
+    allocateValidityBuffer((int)validityBufferSize);
+  }
+
+  /*
+   * during splitAndTransfer, if we are splitting from a random position within a byte,
+   * we can't just slice the source buffer so we have to explicitly allocate the
+   * validityBuffer of the target vector. This is unlike the databuffer which we can
+   * always slice for the target vector.
+   */
+  private void allocateValidityBuffer(final int validityBufferSize) {
+    validityBuffer = allocator.buffer(validityBufferSize);
+    validityBuffer.readerIndex(0);
+    validityAllocationSizeInBytes = validityBufferSize;
+    initValidityBuffer();
+  }
+
+  @Override
+  public int getBufferSizeFor(final int count) {
+    if (count == 0) { return 0; }
+    return (count * typeWidth) + getSizeFromCount(count);
+  }
+
+  @Override
+  public int getBufferSize() {
+    if (valueCount == 0) { return 0; }
+    return (valueCount * typeWidth) + getSizeFromCount(valueCount);
+  }
+
+  @Override
+  public Field getField() {
+    return field;
+  }
+
+  /**
+   * @param clear if true, the buffers are retained for the caller and the
+   *              vector is cleared
+   * @return the validity buffer followed by the value buffer
+   */
+  @Override
+  public ArrowBuf[] getBuffers(boolean clear) {
+    final ArrowBuf[] buffers = new ArrowBuf[2];
+    buffers[0] = validityBuffer;
+    buffers[1] = valueBuffer;
+    if (clear) {
+      for (final ArrowBuf buffer : buffers) {
+        buffer.retain(1);
+      }
+      clear();
+    }
+    return buffers;
+  }
+
+  /**
+   * Grow both buffers, preserving their current contents.
+   */
+  @Override
+  public void reAlloc() {
+    valueBuffer = reallocBufferHelper(valueBuffer, true);
+    validityBuffer = reallocBufferHelper(validityBuffer, false);
+  }
+
+  /* allocate a larger buffer, copy the old contents, zero the rest and release the old one */
+  private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) {
+    final int currentBufferCapacity = buffer.capacity();
+    long baseSize = (dataBuffer ? valueAllocationSizeInBytes
+                                : validityAllocationSizeInBytes);
+
+    if (baseSize < (long)currentBufferCapacity) {
+      baseSize = (long)currentBufferCapacity;
+    }
+
+    long newAllocationSize = baseSize * 2L;
+    newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
+    if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Unable to expand the buffer");
+    }
+
+    getLogger().debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]",
+        name, (dataBuffer ? valueAllocationSizeInBytes : validityAllocationSizeInBytes),
+        newAllocationSize);
+
+    final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
+    newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
+    /* Zero everything after the copied bytes. The new capacity is not
+     * necessarily exactly twice the old one (power-of-two rounding, or an old
+     * buffer with a non power-of-two capacity), so zeroing only the upper half
+     * could leave uninitialized bytes -- i.e. garbage validity bits. */
+    newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
+    buffer.release(1);
+    if (dataBuffer) {
+      valueAllocationSizeInBytes = (int)newAllocationSize;
+    } else {
+      validityAllocationSizeInBytes = (int)newAllocationSize;
+    }
+
+    return newBuf;
+  }
+
+  @Override
+  public List<BufferBacked> getFieldInnerVectors() { throw new UnsupportedOperationException(); }
+
+  @Override
+  public void initializeChildrenFromFields(List<Field> children) {
+    if (!children.isEmpty()) {
+      throw new IllegalArgumentException("primitive type vector can not have children");
+    }
+  }
+
+  @Override
+  public List<FieldVector> getChildrenFromFields() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Replace this vector's buffers with the given ones (validity buffer first,
+   * then value buffer) and take the value count from the field node.
+   *
+   * @throws IllegalArgumentException if ownBuffers does not contain exactly 2 buffers
+   */
+  @Override
+  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+    if (ownBuffers.size() != 2) {
+      throw new IllegalArgumentException("Illegal buffer count, expected " + 2 + ", got: " + ownBuffers.size());
+    }
+
+    ArrowBuf bitBuffer = ownBuffers.get(0);
+    ArrowBuf dataBuffer = ownBuffers.get(1);
+
+    validityBuffer.release();
+    validityBuffer = bitBuffer.retain(allocator);
+    valueBuffer.release();
+    valueBuffer = dataBuffer.retain(allocator);
+
+    valueCount = fieldNode.getLength();
+
+    valueAllocationSizeInBytes = valueBuffer.capacity();
+    validityAllocationSizeInBytes = validityBuffer.capacity();
+  }
+
+  /**
+   * @return the validity buffer and the value buffer, with reader/writer
+   *         indexes set according to the current value count
+   */
+  public List<ArrowBuf> getFieldBuffers() {
+    List<ArrowBuf> result = new ArrayList<>(2);
+
+    validityBuffer.readerIndex(0);
+    validityBuffer.writerIndex(getSizeFromCount(valueCount));
+    valueBuffer.readerIndex(0);
+    valueBuffer.writerIndex(valueCount * typeWidth);
+
+    result.add(validityBuffer);
+    result.add(valueBuffer);
+
+    return result;
+  }
+
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
+    return getTransferPair(ref, allocator);
+  }
+
+  @Override
+  public TransferPair getTransferPair(BufferAllocator allocator){
+    return getTransferPair(name, allocator);
+  }
+
+  public abstract TransferPair getTransferPair(String ref, BufferAllocator allocator);
+
+  /**
+   * Move both buffers and the value count to the target vector, then clear this vector.
+   */
+  public void transferTo(BaseNullableFixedWidthVector target){
+    compareTypes(target, "transferTo");
+    target.clear();
+    target.validityBuffer = validityBuffer.transferOwnership(target.allocator).buffer;
+    target.valueBuffer = valueBuffer.transferOwnership(target.allocator).buffer;
+    target.valueCount = valueCount;
+    clear();
+  }
+
+  /**
+   * Transfer the elements [startIndex, startIndex + length) to the target vector.
+   */
+  public void splitAndTransferTo(int startIndex, int length,
+                                 BaseNullableFixedWidthVector target) {
+    compareTypes(target, "splitAndTransferTo");
+    target.clear();
+    splitAndTransferValidityBuffer(startIndex, length, target);
+    splitAndTransferValueBuffer(startIndex, length, target);
+    target.setValueCount(length);
+  }
+
+  /* the value buffer can always be sliced since elements are byte aligned */
+  private void splitAndTransferValueBuffer(int startIndex, int length,
+                                           BaseNullableFixedWidthVector target) {
+    final int startPoint = startIndex * typeWidth;
+    final int sliceLength = length * typeWidth;
+    target.valueBuffer = valueBuffer.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
+  }
+
+  /* slice the validity buffer when startIndex is byte aligned, otherwise copy shifted bits */
+  private void splitAndTransferValidityBuffer(int startIndex, int length,
+                                              BaseNullableFixedWidthVector target) {
+    assert startIndex + length <= valueCount;
+    int firstByteSource = BitVectorHelper.byteIndex(startIndex);
+    int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
+    int byteSizeTarget = getSizeFromCount(length);
+    int offset = startIndex % 8;
+
+    if (length > 0) {
+      if (offset == 0) {
+        // slice
+        if (target.validityBuffer != null) {
+          target.validityBuffer.release();
+        }
+        target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
+        target.validityBuffer.retain(1);
+      } else {
+        /* Copy data
+         * When the first bit starts from the middle of a byte (offset != 0),
+         * copy data from src BitVector.
+         * Each byte in the target is composed by a part in i-th byte,
+         * another part in (i+1)-th byte.
+         */
+        target.allocateValidityBuffer(byteSizeTarget);
+
+        for (int i = 0; i < byteSizeTarget - 1; i++) {
+          byte b1 = getBitsFromCurrentByte(this.validityBuffer, firstByteSource + i, offset);
+          byte b2 = getBitsFromNextByte(this.validityBuffer, firstByteSource + i + 1, offset);
+
+          target.validityBuffer.setByte(i, (b1 + b2));
+        }
+
+        /* Copying the last piece is done in the following manner:
+         * if the source vector has 1 or more bytes remaining, we copy
+         * the last piece as a byte formed by shifting data
+         * from the current byte and the next byte.
+         *
+         * if the source vector has no more bytes remaining
+         * (we are at the last byte), we copy the last piece as a byte
+         * by shifting data from the current byte.
+         */
+        if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
+          byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                  firstByteSource + byteSizeTarget - 1, offset);
+          byte b2 = getBitsFromNextByte(this.validityBuffer,
+                  firstByteSource + byteSizeTarget, offset);
+
+          target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
+        } else {
+          byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                  firstByteSource + byteSizeTarget - 1, offset);
+          target.validityBuffer.setByte(byteSizeTarget - 1, b1);
+        }
+      }
+    }
+  }
+
+  /* high (8 - offset) bits of the byte at index, moved down to the low bits */
+  private static byte getBitsFromCurrentByte(ArrowBuf data, int index, int offset) {
+    return (byte)((data.getByte(index) & 0xFF) >>> offset);
+  }
+
+  /* low offset bits of the byte at index, moved up to the high bits */
+  private static byte getBitsFromNextByte(ArrowBuf data, int index, int offset) {
+    return (byte)((data.getByte(index) << (8 - offset)));
+  }
+
+
+  /******************************************************************
+   *                                                                *
+   *          common getters and setters                            *
+   *                                                                *
+   ******************************************************************/
+
+
+  /**
+   * Get the number of elements that are null in the vector.
+   *
+   * Only the first valueCount bits of the validity buffer are considered.
+   * Bits past valueCount in the last byte may be set (e.g. after
+   * splitAndTransferTo slices a byte-aligned range, or after the value count
+   * is reduced), so they are masked off rather than assumed to be zero.
+   *
+   * @return the number of null elements.
+   */
+  public int getNullCount() {
+    int setCount = 0;
+    final int fullBytes = valueCount / 8;
+
+    for (int i = 0; i < fullBytes; ++i) {
+      /* mask with 0xFF so a negative byte is not sign-extended */
+      setCount += Integer.bitCount(validityBuffer.getByte(i) & 0xFF);
+    }
+
+    final int remainder = valueCount % 8;
+    if (remainder != 0) {
+      /* only the low 'remainder' bits of the last byte belong to this vector */
+      final int mask = (1 << remainder) - 1;
+      setCount += Integer.bitCount(validityBuffer.getByte(fullBytes) & mask);
+    }
+
+    return valueCount - setCount;
+  }
+
+
+  /**
+   * Get the value count of vector. This will always be zero unless
+   * {@link #setValueCount(int)} has been called prior to calling this.
+   *
+   * @return valueCount for the vector
+   */
+  public int getValueCount(){
+    return valueCount;
+  }
+
+
+  /**
+   * Set value count for the vector, growing the buffers if needed.
+   *
+   * @param valueCount  value count to set
+   */
+  public void setValueCount(int valueCount) {
+    this.valueCount = valueCount;
+    final int currentValueCapacity = getValueCapacity();
+    while (valueCount > getValueCapacity()) {
+      reAlloc();
+    }
+    /*
+     * We are trying to understand the pattern of memory allocation.
+     * If initially, the user did vector.allocateNew(), we would have
+     * allocated memory of default size (INITIAL_VALUE_ALLOCATION * type width).
+     * Later on user invokes setValueCount(count).
+     *
+     * If the existing value capacity is twice as large as the
+     * valueCount, we know that we over-provisioned memory in the
+     * first place when default memory allocation was done because user
+     * really needs a much less value count in the vector.
+     *
+     * We record this by bumping up the allocationMonitor. If this pattern
+     * happens for certain number of times and allocationMonitor
+     * reaches the threshold (internal hardcoded) value, subsequent
+     * call to allocateNew() will take care of stepping down the
+     * default memory allocation size.
+     *
+     * Another case would be under-provisioning the initial memory and
+     * thus going through a lot of realloc(). Here the goal is to
+     * see if we can minimize the number of reallocations. Again the
+     * state is recorded in allocationMonitor by decrementing it
+     * (negative value). If a threshold is hit, realloc will try to
+     * allocate more memory in order to possibly avoid a future realloc.
+     * This case is also applicable to setSafe() methods which can trigger
+     * a realloc() and thus we record the state there as well.
+     */
+    if (valueCount > 0) {
+      if (currentValueCapacity >= (valueCount * 2)) {
+        incrementAllocationMonitor();
+      } else if (currentValueCapacity <= (valueCount / 2)) {
+        decrementAllocationMonitor();
+      }
+    }
+  }
+
+
+  /**
+   * Check if the given index is within the current value capacity
+   * of the vector
+   *
+   * @param index  position to check
+   * @return true if index is within the current value capacity
+   */
+  public boolean isSafe(int index) {
+    return index < getValueCapacity();
+  }
+
+
+  /**
+   * Check if element at given index is null.
+   *
+   * @param index  position of element
+   * @return true if element at given index is null, false otherwise
+   */
+  public boolean isNull(int index) {
+    return (isSet(index) == 0);
+  }
+
+
+  /**
+   * Check if element at given index is set (non-null); the inverse of
+   * {@link #isNull(int)}.
+   *
+   * @param index  position of element
+   * @return 1 if element at given index is not null, 0 otherwise
+   */
+  public int isSet(int index) {
+    final int byteIndex = index >> 3;
+    final byte b = validityBuffer.getByte(byteIndex);
+    final int bitIndex = index & 7;
+    return Long.bitCount(b & (1L << bitIndex));
+  }
+
+  /**
+   * Mark the element at the given index as non-null, growing the buffers if needed.
+   */
+  public void setIndexDefined(int index) {
+    handleSafe(index);
+    BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+  }
+
+  public void set(int index, byte[] value, int start, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void setSafe(int index, byte[] value, int start, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void set(int index, ByteBuffer value, int start, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void setSafe(int index, ByteBuffer value, int start, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+
+  /******************************************************************
+   *                                                                *
+   *                helper methods for setters                      *
+   *                                                                *
+   ******************************************************************/
+
+
+  /* grow the buffers until index fits, recording the under-provisioning */
+  protected void handleSafe(int index) {
+    while (index >= getValueCapacity()) {
+      decrementAllocationMonitor();
+      reAlloc();
+    }
+  }
+}
\ No newline at end of file