Repository: arrow Updated Branches: refs/heads/master edb8252c7 -> 0f9c88f71
ARROW-725: [Formats/Java] FixedSizeList message and java implementation ~Currently only added minor type for 2-tuples~ Author: Emilio Lahr-Vivaz <elahrvi...@ccri.com> Closes #452 from elahrvivaz/ARROW-725 and squashes the following commits: b139d3d [Emilio Lahr-Vivaz] adding reAlloc to FixedSizeListVector 229e24a [Emilio Lahr-Vivaz] re-ordering imports 594c0a2 [Emilio Lahr-Vivaz] simplifying writing of list vectors through mutator 7cb2324 [Emilio Lahr-Vivaz] reverting writer changes, adding examples of writing fixed size list using vector mutators 756dc8a [Emilio Lahr-Vivaz] ARROW-725: [Formats/Java] FixedSizeList message and java implementation Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0f9c88f7 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0f9c88f7 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0f9c88f7 Branch: refs/heads/master Commit: 0f9c88f71bc64ec3288e381c8a4edb48c696b182 Parents: edb8252 Author: Emilio Lahr-Vivaz <elahrvi...@ccri.com> Authored: Sat Apr 15 17:15:07 2017 -0400 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Sat Apr 15 17:15:07 2017 -0400 ---------------------------------------------------------------------- format/Schema.fbs | 8 +- .../vector/src/main/codegen/data/ArrowTypes.tdd | 5 + .../main/codegen/templates/ComplexCopier.java | 2 + .../vector/complex/BaseRepeatedValueVector.java | 6 +- .../vector/complex/FixedSizeListVector.java | 387 +++++++++++++++++++ .../apache/arrow/vector/complex/ListVector.java | 18 +- .../arrow/vector/complex/NullableMapVector.java | 8 +- .../arrow/vector/complex/Positionable.java | 1 + .../arrow/vector/complex/PromotableVector.java | 32 ++ .../vector/complex/RepeatedValueVector.java | 6 +- .../vector/complex/impl/AbstractBaseReader.java | 5 + .../vector/complex/impl/AbstractBaseWriter.java | 5 + .../complex/impl/UnionFixedSizeListReader.java | 103 +++++ .../apache/arrow/vector/schema/TypeLayout.java | 8 + .../org/apache/arrow/vector/types/Types.java | 23 ++ .../arrow/vector/util/JsonStringArrayList.java | 8 + .../arrow/vector/TestFixedSizeListVector.java | 156 ++++++++ .../apache/arrow/vector/file/TestArrowFile.java | 69 +++- 18 files changed, 838 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/format/Schema.fbs ---------------------------------------------------------------------- diff --git a/format/Schema.fbs b/format/Schema.fbs index badc7ea..ff61199 100644 --- a/format/Schema.fbs +++ b/format/Schema.fbs @@ -39,6 +39,11 @@ table Struct_ { table List { } +table FixedSizeList { + /// Number of list items per value + listSize: int; +} + enum UnionMode:short { Sparse, Dense } /// A union is a complex type with children in Field @@ -159,7 +164,8 @@ union Type { List, Struct_, Union, - FixedSizeBinary + FixedSizeBinary, + FixedSizeList } /// ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/codegen/data/ArrowTypes.tdd ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/data/ArrowTypes.tdd b/java/vector/src/main/codegen/data/ArrowTypes.tdd index e1fb5e0..ce92c13 100644 --- a/java/vector/src/main/codegen/data/ArrowTypes.tdd +++ b/java/vector/src/main/codegen/data/ArrowTypes.tdd @@ -28,6 +28,11 @@ complex: true }, { + name: "FixedSizeList", + fields: [{name: "listSize", type: int}], + complex: true + }, + { name: "Union", fields: [{name: "mode", type: short, valueType: UnionMode}, {name: "typeIds", type: "int[]"}], complex: true http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/codegen/templates/ComplexCopier.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/ComplexCopier.java b/java/vector/src/main/codegen/templates/ComplexCopier.java index 0dffe5e..89368ce 100644 --- a/java/vector/src/main/codegen/templates/ComplexCopier.java +++ b/java/vector/src/main/codegen/templates/ComplexCopier.java @@ -55,6 +55,8 @@ public class ComplexCopier { writer.endList(); } break; + case FIXED_SIZE_LIST: + throw new UnsupportedOperationException("Copy fixed size list"); case MAP: if (reader.isSet()) { writer.start(); http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java index da221e3..c9a9319 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java @@ -213,12 +213,14 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements public abstract class BaseRepeatedMutator extends BaseValueVector.BaseMutator implements RepeatedMutator { @Override - public void startNewValue(int index) { + public int startNewValue(int index) { while (offsets.getValueCapacity() <= index) { offsets.reAlloc(); } - offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); + int offset = offsets.getAccessor().get(index); + offsets.getMutator().setSafe(index+1, offset); setValueCount(index+1); + return offset; } @Override http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java new file mode 100644 index 0000000..7ac9f3b --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java @@ -0,0 +1,387 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.arrow.vector.complex; + +import static java.util.Collections.singletonList; +import static org.apache.arrow.vector.complex.BaseRepeatedValueVector.DATA_VECTOR_NAME; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ObjectArrays; + +import io.netty.buffer.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.vector.AddOrGetResult; +import org.apache.arrow.vector.BaseDataValueVector; +import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.BufferBacked; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.complex.impl.UnionFixedSizeListReader; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.util.JsonStringArrayList; +import org.apache.arrow.vector.util.SchemaChangeRuntimeException; +import org.apache.arrow.vector.util.TransferPair; + +public class FixedSizeListVector extends BaseValueVector implements FieldVector, PromotableVector { + + private FieldVector vector; + private final BitVector bits; + private final int listSize; + private final DictionaryEncoding dictionary; + private final List<BufferBacked> innerVectors; + + private UnionFixedSizeListReader reader; + + private Mutator mutator = new Mutator(); + private Accessor accessor = new Accessor(); + + public FixedSizeListVector(String name, + BufferAllocator allocator, + int listSize, + DictionaryEncoding dictionary, + CallBack schemaChangeCallback) { + super(name, allocator); + Preconditions.checkArgument(listSize > 0, "list size must be positive"); + this.bits = new BitVector("$bits$", allocator); + this.vector = ZeroVector.INSTANCE; + this.listSize = listSize; + this.dictionary = dictionary; + this.innerVectors = Collections.singletonList((BufferBacked) bits); + this.reader = new UnionFixedSizeListReader(this); + } + + @Override + public Field getField() { + List<Field> children = ImmutableList.of(getDataVector().getField()); + return new Field(name, true, new ArrowType.FixedSizeList(listSize), children); + } + + @Override + public MinorType getMinorType() { + return MinorType.FIXED_SIZE_LIST; + } + + public int getListSize() { + return listSize; + } + + @Override + public void initializeChildrenFromFields(List<Field> children) { + if (children.size() != 1) { + throw new IllegalArgumentException("Lists have only one child. Found: " + children); + } + Field field = children.get(0); + FieldType type = new FieldType(field.isNullable(), field.getType(), field.getDictionary()); + AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(type); + if (!addOrGetVector.isCreated()) { + throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector()); + } + addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren()); + } + + @Override + public List<FieldVector> getChildrenFromFields() { + return singletonList(vector); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + BaseDataValueVector.load(fieldNode, innerVectors, ownBuffers); + } + + @Override + public List<ArrowBuf> getFieldBuffers() { + return BaseDataValueVector.unload(innerVectors); + } + + @Override + public List<BufferBacked> getFieldInnerVectors() { + return innerVectors; + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + @Override + public Mutator getMutator() { + return mutator; + } + + @Override + public UnionFixedSizeListReader getReader() { + return reader; + } + + @Override + public void allocateNew() throws OutOfMemoryException { + allocateNewSafe(); + } + + @Override + public boolean allocateNewSafe() { + /* boolean to keep track if all the memory allocation were successful + * Used in the case of composite vectors when we need to allocate multiple + * buffers for multiple vectors. If one of the allocations failed we need to + * clear all the memory that we allocated + */ + boolean success = false; + try { + success = bits.allocateNewSafe() && vector.allocateNewSafe(); + } finally { + if (!success) { + clear(); + } + } + if (success) { + bits.zeroVector(); + } + return success; + } + + @Override + public void reAlloc() { + bits.reAlloc(); + vector.reAlloc(); + } + + public FieldVector getDataVector() { + return vector; + } + + @Override + public void setInitialCapacity(int numRecords) { + bits.setInitialCapacity(numRecords); + vector.setInitialCapacity(numRecords * listSize); + } + + @Override + public int getValueCapacity() { + if (vector == ZeroVector.INSTANCE) { + return 0; + } + return vector.getValueCapacity() / listSize; + } + + @Override + public int getBufferSize() { + if (accessor.getValueCount() == 0) { + return 0; + } + return bits.getBufferSize() + vector.getBufferSize(); + } + + @Override + public int getBufferSizeFor(int valueCount) { + if (valueCount == 0) { + return 0; + } + return bits.getBufferSizeFor(valueCount) + vector.getBufferSizeFor(valueCount * listSize); + } + + @Override + public Iterator<ValueVector> iterator() { + return Collections.<ValueVector>singleton(vector).iterator(); + } + + @Override + public void clear() { + bits.clear(); + vector.clear(); + super.clear(); + } + + @Override + public ArrowBuf[] getBuffers(boolean clear) { + final ArrowBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), vector.getBuffers(false), ArrowBuf.class); + if (clear) { + for (ArrowBuf buffer: buffers) { + buffer.retain(); + } + clear(); + } + return buffers; + } + + /** + * Returns 1 if inner vector is explicitly set via #addOrGetVector else 0 + */ + public int size() { + return vector == ZeroVector.INSTANCE ? 0 : 1; + } + + @Override + @SuppressWarnings("unchecked") + public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(FieldType type) { + boolean created = false; + if (vector instanceof ZeroVector) { + vector = type.createNewSingleVector(DATA_VECTOR_NAME, allocator, null); + this.reader = new UnionFixedSizeListReader(this); + created = true; + } + // returned vector must have the same field + if (!Objects.equals(vector.getField().getType(), type.getType())) { + final String msg = String.format("Inner vector type mismatch. Requested type: [%s], actual type: [%s]", + type.getType(), vector.getField().getType()); + throw new SchemaChangeRuntimeException(msg); + } + + return new AddOrGetResult<>((T) vector, created); + } + + public void copyFromSafe(int inIndex, int outIndex, FixedSizeListVector from) { + copyFrom(inIndex, outIndex, from); + } + + public void copyFrom(int inIndex, int outIndex, FixedSizeListVector from) { + throw new UnsupportedOperationException("FixedSizeListVector.copyFrom"); + } + + @Override + public UnionVector promoteToUnion() { + UnionVector vector = new UnionVector(name, allocator, null); + this.vector.clear(); + this.vector = vector; + this.reader = new UnionFixedSizeListReader(this); + return vector; + } + + public class Accessor extends BaseValueVector.BaseAccessor { + + @Override + public Object getObject(int index) { + if (isNull(index)) { + return null; + } + final List<Object> vals = new JsonStringArrayList<>(listSize); + final ValueVector.Accessor valuesAccessor = vector.getAccessor(); + for(int i = 0; i < listSize; i++) { + vals.add(valuesAccessor.getObject(index * listSize + i)); + } + return vals; + } + + @Override + public boolean isNull(int index) { + return bits.getAccessor().get(index) == 0; + } + + @Override + public int getNullCount() { + return bits.getAccessor().getNullCount(); + } + + @Override + public int getValueCount() { + return bits.getAccessor().getValueCount(); + } + } + + public class Mutator extends BaseValueVector.BaseMutator { + + public void setNull(int index) { + bits.getMutator().setSafe(index, 0); + } + + public void setNotNull(int index) { + bits.getMutator().setSafe(index, 1); + } + + @Override + public void setValueCount(int valueCount) { + bits.getMutator().setValueCount(valueCount); + vector.getMutator().setValueCount(valueCount * listSize); + } + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator) { + return getTransferPair(ref, allocator, null); + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + return new TransferImpl(ref, allocator, callBack); + } + + @Override + public TransferPair makeTransferPair(ValueVector target) { + return new TransferImpl((FixedSizeListVector) target); + } + + private class TransferImpl implements TransferPair { + + FixedSizeListVector to; + TransferPair pairs[] = new TransferPair[2]; + + public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { + this(new FixedSizeListVector(name, allocator, listSize, dictionary, callBack)); + } + + public TransferImpl(FixedSizeListVector to) { + this.to = to; + Field field = vector.getField(); + FieldType type = new FieldType(field.isNullable(), field.getType(), field.getDictionary()); + to.addOrGetVector(type); + pairs[0] = bits.makeTransferPair(to.bits); + pairs[1] = getDataVector().makeTransferPair(to.getDataVector()); + } + + @Override + public void transfer() { + for (TransferPair pair : pairs) { + pair.transfer(); + } + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + to.allocateNew(); + for (int i = 0; i < length; i++) { + copyValueSafe(startIndex + i, i); + } + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public void copyValueSafe(int from, int to) { + this.to.copyFrom(from, to, FixedSizeListVector.this); + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index 63235df..9392afb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -53,7 +53,7 @@ import com.google.common.collect.ObjectArrays; import io.netty.buffer.ArrowBuf; -public class ListVector extends BaseRepeatedValueVector implements FieldVector { +public class ListVector extends BaseRepeatedValueVector implements FieldVector, PromotableVector { final UInt4Vector offsets; final BitVector bits; @@ -220,7 +220,7 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { } @Override - public FieldReader getReader() { + public UnionListReader getReader() { return reader; } @@ -297,6 +297,7 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { return buffers; } + @Override public UnionVector promoteToUnion() { UnionVector vector = new UnionVector(name, allocator, callBack); replaceDataVector(vector); @@ -345,12 +346,23 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { } @Override - public void startNewValue(int index) { + public int startNewValue(int index) { for (int i = lastSet; i <= index; i++) { offsets.getMutator().setSafe(i + 1, offsets.getAccessor().get(i)); } setNotNull(index); lastSet = index + 1; + return offsets.getAccessor().get(lastSet); + } + + /** + * End the current value + * + * @param index index of the value to end + * @param size number of elements in the list that was written + */ + public void endValue(int index, int size) { + offsets.getMutator().set(index + 1, offsets.getAccessor().get(index + 1) + size); } @Override http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java index 647ab28..6456efb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java @@ -31,6 +31,7 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.NullableVectorDefinitionSetter; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl; +import org.apache.arrow.vector.complex.impl.NullableMapWriter; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.holders.ComplexHolder; import org.apache.arrow.vector.schema.ArrowFieldNode; @@ -45,6 +46,7 @@ import io.netty.buffer.ArrowBuf; public class NullableMapVector extends MapVector implements FieldVector { private final NullableMapReaderImpl reader = new NullableMapReaderImpl(this); + private final NullableMapWriter writer = new NullableMapWriter(this); protected final BitVector bits; @@ -84,10 +86,14 @@ public class NullableMapVector extends MapVector implements FieldVector { } @Override - public FieldReader getReader() { + public NullableMapReaderImpl getReader() { return reader; } + public NullableMapWriter getWriter() { + return writer; + } + @Override public TransferPair getTransferPair(BufferAllocator allocator) { return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, dictionary, null), false); http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/Positionable.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/Positionable.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/Positionable.java index 9345118..e1a4f36 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/Positionable.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/Positionable.java @@ -18,5 +18,6 @@ package org.apache.arrow.vector.complex; public interface Positionable { + public int getPosition(); public void setPosition(int index); } http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/PromotableVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/PromotableVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/PromotableVector.java new file mode 100644 index 0000000..8b528b4 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/PromotableVector.java @@ -0,0 +1,32 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.arrow.vector.complex; + +import org.apache.arrow.vector.AddOrGetResult; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.FieldType; + +public interface PromotableVector { + + <T extends ValueVector> AddOrGetResult<T> addOrGetVector(FieldType type); + + UnionVector promoteToUnion(); +} http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java index 54db393..b01a4e7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java @@ -73,13 +73,13 @@ public interface RepeatedValueVector extends ValueVector { } interface RepeatedMutator extends ValueVector.Mutator { + /** * Starts a new value that is a container of cells. * * @param index index of new value to start + * @return index into the child vector */ - void startNewValue(int index); - - + int startNewValue(int index); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java index e7c3c8c..7c73c27 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java @@ -35,6 +35,11 @@ abstract class AbstractBaseReader implements FieldReader{ super(); } + @Override + public int getPosition() { + return index; + } + public void setPosition(int index){ this.index = index; } http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java index e6cf098..13a0a6b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java @@ -35,6 +35,11 @@ abstract class AbstractBaseWriter implements FieldWriter { } @Override + public int getPosition() { + return index; + } + + @Override public void setPosition(int index) { this.index = index; } http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionFixedSizeListReader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionFixedSizeListReader.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionFixedSizeListReader.java new file mode 100644 index 0000000..515d4ab --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionFixedSizeListReader.java @@ -0,0 +1,103 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.arrow.vector.complex.impl; + +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.complex.FixedSizeListVector; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.arrow.vector.complex.writer.FieldWriter; +import org.apache.arrow.vector.holders.UnionHolder; +import org.apache.arrow.vector.types.Types.MinorType; + +/** + * Reader for fixed size list vectors + */ +public class UnionFixedSizeListReader extends AbstractFieldReader { + + private final FixedSizeListVector vector; + private final ValueVector data; + private final int listSize; + + private int currentOffset; + + public UnionFixedSizeListReader(FixedSizeListVector vector) { + this.vector = vector; + this.data = vector.getDataVector(); + this.listSize = vector.getListSize(); + } + + @Override + public boolean isSet() { + return !vector.getAccessor().isNull(idx()); + } + + @Override + public FieldReader reader() { + return data.getReader(); + } + + @Override + public Object readObject() { + return vector.getAccessor().getObject(idx()); + } + + @Override + public MinorType getMinorType() { + return vector.getMinorType(); + } + + @Override + public void setPosition(int index) { + super.setPosition(index); + data.getReader().setPosition(index * listSize); + currentOffset = 0; + } + + @Override + public void read(int index, UnionHolder holder) { + setPosition(idx()); + for (int i = -1; i < index; i++) { + if (!next()) { + throw new IndexOutOfBoundsException("Requested " + index + ", size " + listSize); + } + } + holder.reader = data.getReader(); + holder.isSet = vector.getAccessor().isNull(idx()) ? 0 : 1; + } + + @Override + public int size() { + return listSize; + } + + @Override + public boolean next() { + if (currentOffset < listSize) { + data.getReader().setPosition(idx() * listSize + currentOffset++); + return true; + } else { + return false; + } + } + + public void copyAsValue(ListWriter writer) { + ComplexCopier.copy(this, (FieldWriter) writer); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java index 69d550f..24840ec 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java @@ -35,6 +35,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Binary; import org.apache.arrow.vector.types.pojo.ArrowType.Bool; import org.apache.arrow.vector.types.pojo.ArrowType.Date; import org.apache.arrow.vector.types.pojo.ArrowType.Decimal; +import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList; import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; import org.apache.arrow.vector.types.pojo.ArrowType.Int; import org.apache.arrow.vector.types.pojo.ArrowType.Interval; @@ -105,6 +106,13 @@ public class TypeLayout { return new TypeLayout(vectors); } + @Override public TypeLayout visit(FixedSizeList type) { + List<VectorLayout> vectors = asList( + validityVector() + ); + return new TypeLayout(vectors); + } + @Override public TypeLayout visit(FloatingPoint type) { int bitWidth; switch (type.getPrecision()) { http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index b0455fa..6023f1c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -51,6 +51,7 @@ import org.apache.arrow.vector.NullableVarBinaryVector; import org.apache.arrow.vector.NullableVarCharVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.complex.FixedSizeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.UnionVector; @@ -90,6 +91,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Binary; import org.apache.arrow.vector.types.pojo.ArrowType.Bool; import org.apache.arrow.vector.types.pojo.ArrowType.Date; import org.apache.arrow.vector.types.pojo.ArrowType.Decimal; +import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList; import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; import org.apache.arrow.vector.types.pojo.ArrowType.Int; import org.apache.arrow.vector.types.pojo.ArrowType.Interval; @@ -436,6 +438,23 @@ public class Types { return new UnionListWriter((ListVector) vector); } }, + FIXED_SIZE_LIST(null) { + @Override + public ArrowType getType() { + throw new UnsupportedOperationException("Cannot get simple type for FixedSizeList type"); + } + + @Override + public FieldVector getNewVector(String name, FieldType fieldType, BufferAllocator allocator, CallBack schemaChangeCallback) { + int size = ((FixedSizeList)fieldType.getType()).getListSize(); + return new FixedSizeListVector(name, allocator, size, fieldType.getDictionary(), schemaChangeCallback); + } + + @Override + public FieldWriter getNewFieldWriter(ValueVector vector) { + throw new UnsupportedOperationException("FieldWriter not implemented for FixedSizeList type"); + } + }, UNION(new Union(Sparse, null)) { @Override public FieldVector getNewVector(String name, FieldType fieldType, BufferAllocator allocator, CallBack schemaChangeCallback) { @@ -480,6 +499,10 @@ public class Types { return MinorType.LIST; } + @Override public MinorType visit(FixedSizeList type) { + return MinorType.FIXED_SIZE_LIST; + } + @Override public MinorType visit(Union type) { return MinorType.UNION; } http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/main/java/org/apache/arrow/vector/util/JsonStringArrayList.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/JsonStringArrayList.java b/java/vector/src/main/java/org/apache/arrow/vector/util/JsonStringArrayList.java index 6291bfe..c598069 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/JsonStringArrayList.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/JsonStringArrayList.java @@ -31,6 +31,14 @@ public class JsonStringArrayList<E> extends ArrayList<E> { mapper = new ObjectMapper(); } + public JsonStringArrayList() { + super(); + } + + public JsonStringArrayList(int size) { + super(size); + } + @Override public boolean equals(Object obj) { if (this == obj) { http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java new file mode 100644 index 0000000..cfb7b3d --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import com.google.common.collect.Lists; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.complex.FixedSizeListVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionFixedSizeListReader; +import org.apache.arrow.vector.complex.impl.UnionListReader; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestFixedSizeListVector { + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new DirtyRootAllocator(Long.MAX_VALUE, (byte) 100); + } + + @After + public void terminate() throws Exception { + allocator.close(); + } + + @Test + public void testIntType() { + try (FixedSizeListVector vector = new FixedSizeListVector("list", allocator, 2, null, null)) { + NullableIntVector nested = (NullableIntVector) vector.addOrGetVector(FieldType.nullable(MinorType.INT.getType())).getVector(); + NullableIntVector.Mutator mutator = nested.getMutator(); + vector.allocateNew(); + + for (int i = 0; i < 10; i++) { + vector.getMutator().setNotNull(i); + mutator.set(i * 2, i); + mutator.set(i * 2 + 1, i + 10); + } + vector.getMutator().setValueCount(10); + + UnionFixedSizeListReader reader = vector.getReader(); + for (int i = 0; i < 10; i++) { + reader.setPosition(i); + Assert.assertTrue(reader.isSet()); + Assert.assertTrue(reader.next()); + Assert.assertEquals(i, reader.reader().readInteger().intValue()); + Assert.assertTrue(reader.next()); + Assert.assertEquals(i + 10, reader.reader().readInteger().intValue()); + Assert.assertFalse(reader.next()); + Assert.assertEquals(Lists.newArrayList(i, i + 10), reader.readObject()); + } + } + } + + @Test + public void testFloatTypeNullable() { + try (FixedSizeListVector vector = new FixedSizeListVector("list", allocator, 2, null, null)) { + NullableFloat4Vector nested = (NullableFloat4Vector) vector.addOrGetVector(FieldType.nullable(MinorType.FLOAT4.getType())).getVector(); + NullableFloat4Vector.Mutator mutator = nested.getMutator(); + vector.allocateNew(); + + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + vector.getMutator().setNotNull(i); + mutator.set(i * 2, i + 0.1f); + mutator.set(i * 2 + 1, i + 10.1f); + } + } + vector.getMutator().setValueCount(10); + + UnionFixedSizeListReader reader = vector.getReader(); + for (int i = 0; i < 10; i++) { + reader.setPosition(i); + if (i % 2 == 0) { + Assert.assertTrue(reader.isSet()); + Assert.assertTrue(reader.next()); + Assert.assertEquals(i + 0.1f, reader.reader().readFloat(), 0.00001); + Assert.assertTrue(reader.next()); + Assert.assertEquals(i + 10.1f, reader.reader().readFloat(), 0.00001); + Assert.assertFalse(reader.next()); + Assert.assertEquals(Lists.newArrayList(i + 0.1f, i + 10.1f), reader.readObject()); + } else { + Assert.assertFalse(reader.isSet()); + Assert.assertNull(reader.readObject()); + } + } + } + } + + @Test + public void testNestedInList() { + try (ListVector vector = new ListVector("list", allocator, null, null)) { + ListVector.Mutator mutator = vector.getMutator(); + FixedSizeListVector tuples = (FixedSizeListVector) vector.addOrGetVector(FieldType.nullable(new ArrowType.FixedSizeList(2))).getVector(); + FixedSizeListVector.Mutator tupleMutator = tuples.getMutator(); + NullableIntVector.Mutator innerMutator = (NullableIntVector.Mutator) tuples.addOrGetVector(FieldType.nullable(MinorType.INT.getType())).getVector().getMutator(); + vector.allocateNew(); + + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + int position = mutator.startNewValue(i); + for (int j = 0; j < i % 7; j++) { + tupleMutator.setNotNull(position + j); + innerMutator.set((position + j) * 2, j); + innerMutator.set((position + j) * 2 + 1, j + 1); + } + mutator.endValue(i, i % 7); + } + } + mutator.setValueCount(10); + + UnionListReader reader = vector.getReader(); + for (int i = 0; i < 10; i++) { + reader.setPosition(i); + if (i % 2 == 0) { + for (int j = 0; j < i % 7; j++) { + Assert.assertTrue(reader.next()); + FieldReader innerListReader = reader.reader(); + for (int k = 0; k < 2; k++) { + Assert.assertTrue(innerListReader.next()); + Assert.assertEquals(k + j, innerListReader.reader().readInteger().intValue()); + } + Assert.assertFalse(innerListReader.next()); + } + Assert.assertFalse(reader.next()); + } else { + Assert.assertFalse(reader.isSet()); + Assert.assertNull(reader.readObject()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/0f9c88f7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java index 11730af..3bed453 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -30,11 +30,17 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.NullableFloat4Vector; +import org.apache.arrow.vector.NullableIntVector; import org.apache.arrow.vector.NullableTinyIntVector; import org.apache.arrow.vector.NullableVarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.FixedSizeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; @@ -49,6 +55,8 @@ import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.stream.ArrowStreamReader; import org.apache.arrow.vector.stream.ArrowStreamWriter; import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList; import org.apache.arrow.vector.types.pojo.ArrowType.Int; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.types.pojo.Field; @@ -60,8 +68,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; - public class TestArrowFile extends BaseFileTest { private static final Logger LOGGER = LoggerFactory.getLogger(TestArrowFile.class); @@ -576,6 +582,65 @@ public class TestArrowFile extends BaseFileTest { Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); } + @Test + public void testWriteReadFixedSizeList() throws IOException { + File file = new File("target/mytest_fixed_list.arrow"); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + int count = COUNT; + + // write + try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + NullableMapVector parent = new NullableMapVector("parent", originalVectorAllocator, null, null)) { + FixedSizeListVector tuples = parent.addOrGet("float-pairs", new FieldType(true, new FixedSizeList(2), null), FixedSizeListVector.class); + NullableFloat4Vector floats = (NullableFloat4Vector) tuples.addOrGetVector(new FieldType(true, MinorType.FLOAT4.getType(), null)).getVector(); + NullableIntVector ints = parent.addOrGet("ints", new FieldType(true, new Int(32, true), null), NullableIntVector.class); + parent.allocateNew(); + + for (int i = 0; i < 10; i++) { + tuples.getMutator().setNotNull(i); + floats.getMutator().set(i * 2, i + 0.1f); + floats.getMutator().set(i * 2 + 1, i + 10.1f); + ints.getMutator().set(i, i); + } + + parent.getMutator().setValueCount(10); + write(parent, file, stream); + } + + // read + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + arrowReader.loadRecordBatch(rbBlock); + Assert.assertEquals(count, root.getRowCount()); + for (int i = 0; i < 10; i++) { + Assert.assertEquals(Lists.newArrayList(i + 0.1f, i + 10.1f), root.getVector("float-pairs").getAccessor().getObject(i)); + Assert.assertEquals(i, root.getVector("ints").getAccessor().getObject(i)); + } + } + } + + // read from stream + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + Assert.assertEquals(count, root.getRowCount()); + for (int i = 0; i < 10; i++) { + Assert.assertEquals(Lists.newArrayList(i + 0.1f, i + 10.1f), root.getVector("float-pairs").getAccessor().getObject(i)); + Assert.assertEquals(i, root.getVector("ints").getAccessor().getObject(i)); + } + } + } + /** * Writes the contents of parents to file. If outStream is non-null, also writes it * to outStream in the streaming serialized format.