http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index ab539d5..8f2d042 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -33,10 +33,10 @@ import org.apache.arrow.vector.NullableIntVector; import org.apache.arrow.vector.NullableIntervalDayVector; import org.apache.arrow.vector.NullableIntervalYearVector; import org.apache.arrow.vector.NullableSmallIntVector; -import org.apache.arrow.vector.NullableTimeStampSecVector; -import org.apache.arrow.vector.NullableTimeStampMilliVector; import org.apache.arrow.vector.NullableTimeStampMicroVector; +import org.apache.arrow.vector.NullableTimeStampMilliVector; import org.apache.arrow.vector.NullableTimeStampNanoVector; +import org.apache.arrow.vector.NullableTimeStampSecVector; import org.apache.arrow.vector.NullableTimeVector; import org.apache.arrow.vector.NullableTinyIntVector; import org.apache.arrow.vector.NullableUInt1Vector; @@ -61,10 +61,10 @@ import org.apache.arrow.vector.complex.impl.IntervalDayWriterImpl; import org.apache.arrow.vector.complex.impl.IntervalYearWriterImpl; import org.apache.arrow.vector.complex.impl.NullableMapWriter; import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl; -import org.apache.arrow.vector.complex.impl.TimeStampSecWriterImpl; -import org.apache.arrow.vector.complex.impl.TimeStampMilliWriterImpl; import org.apache.arrow.vector.complex.impl.TimeStampMicroWriterImpl; +import org.apache.arrow.vector.complex.impl.TimeStampMilliWriterImpl; import org.apache.arrow.vector.complex.impl.TimeStampNanoWriterImpl; +import org.apache.arrow.vector.complex.impl.TimeStampSecWriterImpl; import org.apache.arrow.vector.complex.impl.TimeWriterImpl; import org.apache.arrow.vector.complex.impl.TinyIntWriterImpl; import org.apache.arrow.vector.complex.impl.UInt1WriterImpl; @@ -92,6 +92,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Time; import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; import org.apache.arrow.vector.types.pojo.ArrowType.Union; import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.CallBack; @@ -129,7 +130,7 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { return ZeroVector.INSTANCE; } @@ -145,8 +146,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableMapVector(name, allocator, callBack); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableMapVector(name, allocator, dictionary, callBack); } @Override @@ -161,8 +162,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableTinyIntVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableTinyIntVector(name, allocator, dictionary); } @Override @@ -177,8 +178,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableSmallIntVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableSmallIntVector(name, allocator, dictionary); } @Override @@ -193,8 +194,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableIntVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableIntVector(name, allocator, dictionary); } @Override @@ -209,8 +210,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableBigIntVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableBigIntVector(name, allocator, dictionary); } @Override @@ -225,8 +226,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableDateVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableDateVector(name, allocator, dictionary); } @Override @@ -241,8 +242,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableTimeVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableTimeVector(name, allocator, dictionary); } @Override @@ -258,8 +259,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableTimeStampSecVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableTimeStampSecVector(name, allocator, dictionary); } @Override @@ -275,8 +276,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableTimeStampMilliVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableTimeStampMilliVector(name, allocator, dictionary); } @Override @@ -292,8 +293,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableTimeStampMicroVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableTimeStampMicroVector(name, allocator, dictionary); } @Override @@ -309,8 +310,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableTimeStampNanoVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableTimeStampNanoVector(name, allocator, dictionary); } @Override @@ -325,8 +326,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableIntervalDayVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableIntervalDayVector(name, allocator, dictionary); } @Override @@ -341,8 +342,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableIntervalDayVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableIntervalDayVector(name, allocator, dictionary); } @Override @@ -358,8 +359,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableFloat4Vector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableFloat4Vector(name, allocator, dictionary); } @Override @@ -375,8 +376,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableFloat8Vector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableFloat8Vector(name, allocator, dictionary); } @Override @@ -391,8 +392,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableBitVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableBitVector(name, allocator, dictionary); } @Override @@ -407,8 +408,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableVarCharVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableVarCharVector(name, allocator, dictionary); } @Override @@ -423,8 +424,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableVarBinaryVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableVarBinaryVector(name, allocator, dictionary); } @Override @@ -443,8 +444,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableDecimalVector(name, allocator, precisionScale[0], precisionScale[1]); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableDecimalVector(name, allocator, dictionary, precisionScale[0], precisionScale[1]); } @Override @@ -459,8 +460,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableUInt1Vector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableUInt1Vector(name, allocator, dictionary); } @Override @@ -475,8 +476,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableUInt2Vector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableUInt2Vector(name, allocator, dictionary); } @Override @@ -491,8 +492,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableUInt4Vector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableUInt4Vector(name, allocator, dictionary); } @Override @@ -507,8 +508,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new NullableUInt8Vector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new NullableUInt8Vector(name, allocator, dictionary); } @Override @@ -523,8 +524,8 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new ListVector(name, allocator, callBack); + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + return new ListVector(name, allocator, dictionary, callBack); } @Override @@ -539,7 +540,10 @@ public class Types { } @Override - public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) { + if (dictionary != null) { + throw new UnsupportedOperationException("Dictionary encoding not supported for complex types"); + } return new UnionVector(name, allocator, callBack); } @@ -561,7 +565,7 @@ public class Types { public abstract Field getField(); - public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale); + public abstract FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale); public abstract FieldWriter getNewFieldWriter(ValueVector vector); }
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java new file mode 100644 index 0000000..6d35cde --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java @@ -0,0 +1,51 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.arrow.vector.types.pojo; + +import org.apache.arrow.vector.types.pojo.ArrowType.Int; + +public class DictionaryEncoding { + + private final long id; + private final boolean ordered; + private final Int indexType; + + public DictionaryEncoding(long id, boolean ordered, Int indexType) { + this.id = id; + this.ordered = ordered; + this.indexType = indexType == null ? new Int(32, true) : indexType; + } + + public long getId() { + return id; + } + + public boolean isOrdered() { + return ordered; + } + + public Int getIndexType() { + return indexType; + } + + @Override + public String toString() { + return "DictionaryEncoding[id=" + id + ",ordered=" + ordered + ",indexType=" + indexType + "]"; + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java index f9b79ce..bbbd559 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java @@ -24,23 +24,27 @@ import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField; import java.util.List; import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; -import org.apache.arrow.flatbuf.DictionaryEncoding; -import org.apache.arrow.vector.schema.TypeLayout; -import org.apache.arrow.vector.schema.VectorLayout; - -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.flatbuffers.FlatBufferBuilder; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.schema.TypeLayout; +import org.apache.arrow.vector.schema.VectorLayout; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; + public class Field { private final String name; private final boolean nullable; private final ArrowType type; - private final Long dictionary; + private final DictionaryEncoding dictionary; private final List<Field> children; private final TypeLayout typeLayout; @@ -49,7 +53,7 @@ public class Field { @JsonProperty("name") String name, @JsonProperty("nullable") boolean nullable, @JsonProperty("type") ArrowType type, - @JsonProperty("dictionary") Long dictionary, + @JsonProperty("dictionary") DictionaryEncoding dictionary, @JsonProperty("children") List<Field> children, @JsonProperty("typeLayout") TypeLayout typeLayout) { this.name = name; @@ -68,18 +72,30 @@ public class Field { this(name, nullable, type, null, children, TypeLayout.getTypeLayout(checkNotNull(type))); } - public Field(String name, boolean nullable, ArrowType type, Long dictionary, List<Field> children) { + public Field(String name, boolean nullable, ArrowType type, DictionaryEncoding dictionary, List<Field> children) { this(name, nullable, type, dictionary, children, TypeLayout.getTypeLayout(checkNotNull(type))); } + public FieldVector createVector(BufferAllocator allocator) { + MinorType minorType = Types.getMinorTypeForArrowType(type); + FieldVector vector = minorType.getNewVector(name, allocator, dictionary, null); + vector.initializeChildrenFromFields(children); + return vector; + } + public static Field convertField(org.apache.arrow.flatbuf.Field field) { String name = field.name(); boolean nullable = field.nullable(); ArrowType type = getTypeForField(field); - DictionaryEncoding dictionaryEncoding = field.dictionary(); - Long dictionary = null; - if (dictionaryEncoding != null) { - dictionary = dictionaryEncoding.id(); + DictionaryEncoding dictionary = null; + org.apache.arrow.flatbuf.DictionaryEncoding dictionaryFB = field.dictionary(); + if (dictionaryFB != null) { + Int indexType = null; + org.apache.arrow.flatbuf.Int indexTypeFB = dictionaryFB.indexType(); + if (indexTypeFB != null) { + indexType = new Int(indexTypeFB.bitWidth(), indexTypeFB.isSigned()); + } + dictionary = new DictionaryEncoding(dictionaryFB.id(), dictionaryFB.isOrdered(), indexType); } ImmutableList.Builder<org.apache.arrow.vector.schema.VectorLayout> layout = ImmutableList.builder(); for (int i = 0; i < field.layoutLength(); ++i) { @@ -105,8 +121,11 @@ public class Field { int typeOffset = type.getType(builder); int dictionaryOffset = -1; if (dictionary != null) { - builder.addLong(dictionary); - dictionaryOffset = builder.offset(); + // TODO encode dictionary type - currently type is only signed 32 bit int (default null) + org.apache.arrow.flatbuf.DictionaryEncoding.startDictionaryEncoding(builder); + org.apache.arrow.flatbuf.DictionaryEncoding.addId(builder, dictionary.getId()); + org.apache.arrow.flatbuf.DictionaryEncoding.addIsOrdered(builder, dictionary.isOrdered()); + dictionaryOffset = org.apache.arrow.flatbuf.DictionaryEncoding.endDictionaryEncoding(builder); } int[] childrenData = new int[children.size()]; for (int i = 0; i < children.size(); i++) { @@ -126,11 +145,11 @@ public class Field { org.apache.arrow.flatbuf.Field.addNullable(builder, nullable); org.apache.arrow.flatbuf.Field.addTypeType(builder, type.getTypeID().getFlatbufID()); org.apache.arrow.flatbuf.Field.addType(builder, typeOffset); + org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset); + org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset); if (dictionary != null) { org.apache.arrow.flatbuf.Field.addDictionary(builder, dictionaryOffset); } - org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset); - org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset); return org.apache.arrow.flatbuf.Field.endField(builder); } @@ -147,7 +166,7 @@ public class Field { } @JsonInclude(Include.NON_NULL) - public Long getDictionary() { return dictionary; } + public DictionaryEncoding getDictionary() { return dictionary; } public List<Field> getChildren() { return children; @@ -168,8 +187,8 @@ public class Field { Objects.equals(this.type, that.type) && Objects.equals(this.dictionary, that.dictionary) && (Objects.equals(this.children, that.children) || - (this.children == null && that.children.size() == 0) || - (this.children.size() == 0 && that.children == null)); + (this.children == null || this.children.size() == 0) && + (that.children == null || that.children.size() == 0)); } @Override @@ -180,7 +199,7 @@ public class Field { } sb.append(type); if (dictionary != null) { - sb.append("[dictionary: ").append(dictionary).append("]"); + sb.append("[dictionary: ").append(dictionary.getId()).append("]"); } if (!children.isEmpty()) { sb.append("<").append(Joiner.on(", ").join(children)).append(">"); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java index cca35e4..20f4aa8 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java @@ -44,7 +44,7 @@ public class TestDecimalVector { @Test public void test() { BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - NullableDecimalVector decimalVector = new NullableDecimalVector("decimal", allocator, 10, scale); + NullableDecimalVector decimalVector = new NullableDecimalVector("decimal", allocator, null, 10, scale); decimalVector.allocateNew(); BigDecimal[] values = new BigDecimal[intValues.length]; for (int i = 0; i < intValues.length; i++) { http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java index 962950a..e3087ef 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java @@ -18,16 +18,16 @@ package org.apache.arrow.vector; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.complex.DictionaryVector; -import org.apache.arrow.vector.types.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryEncoder; +import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.nio.charset.StandardCharsets; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class TestDictionaryVector { @@ -49,65 +49,10 @@ public class TestDictionaryVector { } @Test - public void testEncodeStringsWithGeneratedDictionary() { + public void testEncodeStrings() { // Create a new value vector - try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null)) { - final NullableVarCharVector.Mutator m = vector.getMutator(); - vector.allocateNew(512, 5); - - // set some values - m.setSafe(0, zero, 0, zero.length); - m.setSafe(1, one, 0, one.length); - m.setSafe(2, one, 0, one.length); - m.setSafe(3, two, 0, two.length); - m.setSafe(4, zero, 0, zero.length); - m.setValueCount(5); - - DictionaryVector encoded = DictionaryVector.encode(vector); - - try { - // verify values in the dictionary - ValueVector dictionary = encoded.getDictionaryVector(); - assertEquals(vector.getClass(), dictionary.getClass()); - - NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary).getAccessor(); - assertEquals(3, dictionaryAccessor.getValueCount()); - assertArrayEquals(zero, dictionaryAccessor.get(0)); - assertArrayEquals(one, dictionaryAccessor.get(1)); - assertArrayEquals(two, dictionaryAccessor.get(2)); - - // verify indices - ValueVector indices = encoded.getIndexVector(); - assertEquals(NullableIntVector.class, indices.getClass()); - - NullableIntVector.Accessor indexAccessor = ((NullableIntVector) indices).getAccessor(); - assertEquals(5, indexAccessor.getValueCount()); - assertEquals(0, indexAccessor.get(0)); - assertEquals(1, indexAccessor.get(1)); - assertEquals(1, indexAccessor.get(2)); - assertEquals(2, indexAccessor.get(3)); - assertEquals(0, indexAccessor.get(4)); - - // now run through the decoder and verify we get the original back - try (ValueVector decoded = DictionaryVector.decode(indices, encoded.getDictionary())) { - assertEquals(vector.getClass(), decoded.getClass()); - assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount()); - for (int i = 0; i < 5; i++) { - assertEquals(vector.getAccessor().getObject(i), decoded.getAccessor().getObject(i)); - } - } - } finally { - encoded.getDictionaryVector().close(); - encoded.getIndexVector().close(); - } - } - } - - @Test - public void testEncodeStringsWithProvidedDictionary() { - // Create a new value vector - try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null); - final NullableVarCharVector dictionary = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("dict", allocator, null)) { + try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null, null); + final NullableVarCharVector dictionaryVector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("dict", allocator, null, null)) { final NullableVarCharVector.Mutator m = vector.getMutator(); vector.allocateNew(512, 5); @@ -120,19 +65,20 @@ public class TestDictionaryVector { m.setValueCount(5); // set some dictionary values - final NullableVarCharVector.Mutator m2 = dictionary.getMutator(); - dictionary.allocateNew(512, 3); + final NullableVarCharVector.Mutator m2 = dictionaryVector.getMutator(); + dictionaryVector.allocateNew(512, 3); m2.setSafe(0, zero, 0, zero.length); m2.setSafe(1, one, 0, one.length); m2.setSafe(2, two, 0, two.length); m2.setValueCount(3); - try(final DictionaryVector encoded = DictionaryVector.encode(vector, new Dictionary(dictionary, false))) { + Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null)); + + try(final ValueVector encoded = (FieldVector) DictionaryEncoder.encode(vector, dictionary)) { // verify indices - ValueVector indices = encoded.getIndexVector(); - assertEquals(NullableIntVector.class, indices.getClass()); + assertEquals(NullableIntVector.class, encoded.getClass()); - NullableIntVector.Accessor indexAccessor = ((NullableIntVector) indices).getAccessor(); + NullableIntVector.Accessor indexAccessor = ((NullableIntVector) encoded).getAccessor(); assertEquals(5, indexAccessor.getValueCount()); assertEquals(0, indexAccessor.get(0)); assertEquals(1, indexAccessor.get(1)); @@ -141,7 +87,7 @@ public class TestDictionaryVector { assertEquals(0, indexAccessor.get(4)); // now run through the decoder and verify we get the original back - try (ValueVector decoded = DictionaryVector.decode(indices, encoded.getDictionary())) { + try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) { assertEquals(vector.getClass(), decoded.getClass()); assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount()); for (int i = 0; i < 5; i++) { http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java index 1f0baae..18d93b6 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java @@ -42,8 +42,8 @@ public class TestListVector { @Test public void testCopyFrom() throws Exception { - try (ListVector inVector = new ListVector("input", allocator, null); - ListVector outVector = new ListVector("output", allocator, null)) { + try (ListVector inVector = new ListVector("input", allocator, null, null); + ListVector outVector = new ListVector("output", allocator, null, null)) { UnionListWriter writer = inVector.getWriter(); writer.allocate(); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java index 774b59e..6917638 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java @@ -86,7 +86,7 @@ public class TestValueVector { public void testNullableVarLen2() { // Create a new value vector for 1024 integers. - try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) { + try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator, null)) { final NullableVarCharVector.Mutator m = vector.getMutator(); vector.allocateNew(1024 * 10, 1024); @@ -116,7 +116,7 @@ public class TestValueVector { public void testNullableFixedType() { // Create a new value vector for 1024 integers. - try (final NullableUInt4Vector vector = new NullableUInt4Vector(EMPTY_SCHEMA_PATH, allocator)) { + try (final NullableUInt4Vector vector = new NullableUInt4Vector(EMPTY_SCHEMA_PATH, allocator, null)) { final NullableUInt4Vector.Mutator m = vector.getMutator(); vector.allocateNew(1024); @@ -186,7 +186,7 @@ public class TestValueVector { @Test public void testNullableFloat() { // Create a new value vector for 1024 integers - try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) { + try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) { final NullableFloat4Vector.Mutator m = vector.getMutator(); vector.allocateNew(1024); @@ -233,7 +233,7 @@ public class TestValueVector { @Test public void testNullableInt() { // Create a new value vector for 1024 integers - try (final NullableIntVector vector = (NullableIntVector) MinorType.INT.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) { + try (final NullableIntVector vector = (NullableIntVector) MinorType.INT.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) { final NullableIntVector.Mutator m = vector.getMutator(); vector.allocateNew(1024); @@ -403,7 +403,7 @@ public class TestValueVector { @Test public void testReAllocNullableFixedWidthVector() { // Create a new value vector for 1024 integers - try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) { + try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) { final NullableFloat4Vector.Mutator m = vector.getMutator(); vector.allocateNew(1024); @@ -436,7 +436,7 @@ public class TestValueVector { @Test public void testReAllocNullableVariableWidthVector() { // Create a new value vector for 1024 integers - try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) { + try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) { final NullableVarCharVector.Mutator m = vector.getMutator(); vector.allocateNew(); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index 79c9d50..372bcf0 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.complex.MapVector; @@ -46,8 +47,6 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; -import io.netty.buffer.ArrowBuf; - public class TestVectorUnloadLoad { static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); @@ -81,8 +80,8 @@ public class TestVectorUnloadLoad { try ( ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); - ) { + VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator); + ) { // load it VectorLoader vectorLoader = new VectorLoader(newRoot); @@ -131,8 +130,8 @@ public class TestVectorUnloadLoad { try ( ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); - ) { + VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator); + ) { List<ArrowBuf> oldBuffers = recordBatch.getBuffers(); List<ArrowBuf> newBuffers = new ArrayList<>(); for (ArrowBuf oldBuffer : oldBuffers) { @@ -185,7 +184,7 @@ public class TestVectorUnloadLoad { Schema schema = new Schema(asList( new Field("intDefined", true, new ArrowType.Int(32, true), Collections.<Field>emptyList()), new Field("intNull", true, new ArrowType.Int(32, true), Collections.<Field>emptyList()) - )); + )); int count = 10; ArrowBuf validity = allocator.buffer(10).slice(0, 0); ArrowBuf[] values = new ArrowBuf[2]; @@ -200,8 +199,8 @@ public class TestVectorUnloadLoad { try ( ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1])); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); - ) { + VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator); + ) { // load it VectorLoader vectorLoader = new VectorLoader(newRoot); @@ -244,11 +243,12 @@ public class TestVectorUnloadLoad { Schema schema = new Schema(root.getField().getChildren()); int valueCount = root.getAccessor().getValueCount(); List<FieldVector> fields = root.getChildrenFromFields(); - return new VectorUnloader(schema, valueCount, fields); + VectorSchemaRoot vsr = new VectorSchemaRoot(schema.getFields(), fields, valueCount); + return new VectorUnloader(vsr); } @AfterClass public static void afterClass() { allocator.close(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java index 58312b3..2b49d8e 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java @@ -53,7 +53,7 @@ public class TestPromotableWriter { public void testPromoteToUnion() throws Exception { try (final MapVector container = new MapVector(EMPTY_SCHEMA_PATH, allocator, null); - final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class); + final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class, null); final PromotableWriter writer = new PromotableWriter(v, container)) { container.allocateNew(); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java index 7a2d416..a8a2d51 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java @@ -181,7 +181,7 @@ public class TestComplexWriter { @Test public void listScalarType() { - ListVector listVector = new ListVector("list", allocator, null); + ListVector listVector = new ListVector("list", allocator, null, null); listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); for (int i = 0; i < COUNT; i++) { @@ -204,7 +204,7 @@ public class TestComplexWriter { @Test public void listScalarTypeNullable() { - ListVector listVector = new ListVector("list", allocator, null); + ListVector listVector = new ListVector("list", allocator, null, null); listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); for (int i = 0; i < COUNT; i++) { @@ -233,7 +233,7 @@ public class TestComplexWriter { @Test public void listMapType() { - ListVector listVector = new ListVector("list", allocator, null); + ListVector listVector = new ListVector("list", allocator, null, null); listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); MapWriter mapWriter = listWriter.map(); @@ -261,7 +261,7 @@ public class TestComplexWriter { @Test public void listListType() { - try (ListVector listVector = new ListVector("list", allocator, null)) { + try (ListVector listVector = new ListVector("list", allocator, null, null)) { listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); for (int i = 0; i < COUNT; i++) { @@ -286,7 +286,7 @@ public class TestComplexWriter { */ @Test public void listListType2() { - try (ListVector listVector = new ListVector("list", allocator, null)) { + try (ListVector listVector = new ListVector("list", allocator, null, null)) { listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); ListWriter innerListWriter = listWriter.list(); @@ -324,7 +324,7 @@ public class TestComplexWriter { @Test public void unionListListType() { - try (ListVector listVector = new ListVector("list", allocator, null)) { + try (ListVector listVector = new ListVector("list", allocator, null, null)) { listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); for (int i = 0; i < COUNT; i++) { @@ -353,7 +353,7 @@ public class TestComplexWriter { */ @Test public void unionListListType2() { - try (ListVector listVector = new ListVector("list", allocator, null)) { + try (ListVector listVector = new ListVector("list", allocator, null, null)) { listVector.allocateNew(); UnionListWriter listWriter = new UnionListWriter(listVector); ListWriter innerListWriter = listWriter.list(); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java index a83a283..75e5d2d 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -17,31 +17,44 @@ */ package org.apache.arrow.vector.file; -import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader; -import static org.junit.Assert.assertTrue; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; +import com.google.common.collect.ImmutableList; + import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.NullableTinyIntVector; +import org.apache.arrow.vector.NullableVarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; +import org.apache.arrow.vector.dictionary.DictionaryEncoder; import org.apache.arrow.vector.schema.ArrowBuffer; +import org.apache.arrow.vector.schema.ArrowMessage; import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.stream.ArrowStreamReader; import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -68,7 +81,7 @@ public class TestArrowFile extends BaseFileTest { int count = COUNT; try ( BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) { + NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) { writeComplexData(count, parent); FieldVector root = parent.getChild("root"); validateComplexContent(count, new VectorSchemaRoot(root)); @@ -83,71 +96,63 @@ public class TestArrowFile extends BaseFileTest { int count = COUNT; // write - try ( - BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { + try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { writeData(count, parent); write(parent.getChild("root"), file, stream); } // read - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(file); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null) - ) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); - LOGGER.debug("reading schema: " + schema); - - // initialize vectors - - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator){ + @Override + protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException { + ArrowMessage message = super.readMessage(in, allocator); + if (message != null) { + ArrowRecordBatch batch = (ArrowRecordBatch) message; + List<ArrowBuffer> buffersLayout = batch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + } + return message; } - vectorLoader.load(recordBatch); - } - - validateContent(count, root); - } + }) { + Schema schema = arrowReader.getVectorSchemaRoot().getSchema(); + LOGGER.debug("reading schema: " + schema); + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + arrowReader.loadRecordBatch(rbBlock); + Assert.assertEquals(count, root.getRowCount()); + validateContent(count, root); } } // Read from stream. - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); - ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null) - ) { - arrowReader.init(); - Schema schema = arrowReader.getSchema(); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator){ + @Override + protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException { + ArrowMessage message = super.readMessage(in, allocator); + if (message != null) { + ArrowRecordBatch batch = (ArrowRecordBatch) message; + List<ArrowBuffer> buffersLayout = batch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + } + return message; + } + }) { + + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); - - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - while (true) { - try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) { - if (recordBatch == null) break; - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); - } - vectorLoader.load(recordBatch); - } - } - validateContent(count, root); - } + arrowReader.loadNextBatch(); + Assert.assertEquals(count, root.getRowCount()); + validateContent(count, root); } } @@ -158,61 +163,37 @@ public class TestArrowFile extends BaseFileTest { int count = COUNT; // write - try ( - BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { + try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { writeComplexData(count, parent); write(parent.getChild("root"), file, stream); } // read - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(file); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null) - ) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); - // initialize vectors - - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - vectorLoader.load(recordBatch); - } - validateComplexContent(count, root); - } + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + arrowReader.loadRecordBatch(rbBlock); + Assert.assertEquals(count, root.getRowCount()); + validateComplexContent(count, root); } } // Read from stream. - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); - ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null) - ) { - arrowReader.init(); - Schema schema = arrowReader.getSchema(); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); - - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - while (true) { - try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) { - if (recordBatch == null) break; - vectorLoader.load(recordBatch); - } - } - validateComplexContent(count, root); - } + arrowReader.loadNextBatch(); + Assert.assertEquals(count, root.getRowCount()); + validateComplexContent(count, root); } } @@ -223,94 +204,70 @@ public class TestArrowFile extends BaseFileTest { int[] counts = { 10, 5 }; // write - try ( - BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", originalVectorAllocator, null); - FileOutputStream fileOutputStream = new FileOutputStream(file);) { + try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorAllocator, null); + FileOutputStream fileOutputStream = new FileOutputStream(file)){ writeData(counts[0], parent); - VectorUnloader vectorUnloader0 = newVectorUnloader(parent.getChild("root")); - Schema schema = vectorUnloader0.getSchema(); - Assert.assertEquals(2, schema.getFields().size()); - try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); - ArrowStreamWriter streamWriter = new ArrowStreamWriter(stream, schema)) { - try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) { - Assert.assertEquals("RB #0", counts[0], recordBatch.getLength()); - arrowWriter.writeRecordBatch(recordBatch); - streamWriter.writeRecordBatch(recordBatch); - } + VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root")); + + try(ArrowFileWriter fileWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel()); + ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, null, stream)) { + fileWriter.start(); + streamWriter.start(); + + fileWriter.writeBatch(); + streamWriter.writeBatch(); + parent.allocateNew(); writeData(counts[1], parent); // if we write the same data we don't catch that the metadata is stored in the wrong order. - VectorUnloader vectorUnloader1 = newVectorUnloader(parent.getChild("root")); - try (ArrowRecordBatch recordBatch = vectorUnloader1.getRecordBatch()) { - Assert.assertEquals("RB #1", counts[1], recordBatch.getLength()); - arrowWriter.writeRecordBatch(recordBatch); - streamWriter.writeRecordBatch(recordBatch); - } + root.setRowCount(counts[1]); + + fileWriter.writeBatch(); + streamWriter.writeBatch(); + + fileWriter.end(); + streamWriter.end(); } } - // read - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(file); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null); - ) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); + // read file + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); int i = 0; - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) { - VectorLoader vectorLoader = new VectorLoader(root); - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - Assert.assertEquals(2, recordBatches.size()); - long previousOffset = 0; - for (ArrowBlock rbBlock : recordBatches) { - Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset); - previousOffset = rbBlock.getOffset(); - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength()); - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); - } - vectorLoader.load(recordBatch); - validateContent(counts[i], root); - } - ++i; - } + List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks(); + Assert.assertEquals(2, recordBatches.size()); + long previousOffset = 0; + for (ArrowBlock rbBlock : recordBatches) { + Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset); + previousOffset = rbBlock.getOffset(); + arrowReader.loadRecordBatch(rbBlock); + Assert.assertEquals("RB #" + i, counts[i], root.getRowCount()); + validateContent(counts[i], root); + ++i; } } // read stream - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); - ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null) - ) { - arrowReader.init(); - Schema schema = arrowReader.getSchema(); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); int i = 0; - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) { - VectorLoader vectorLoader = new VectorLoader(root); - for (int n = 0; n < 2; n++) { - try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) { - assertTrue(recordBatch != null); - Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength()); - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); - } - vectorLoader.load(recordBatch); - validateContent(counts[i], root); - } - ++i; - } + + for (int n = 0; n < 2; n++) { + arrowReader.loadNextBatch(); + Assert.assertEquals("RB #" + i, counts[i], root.getRowCount()); + validateContent(counts[i], root); + ++i; } + arrowReader.loadNextBatch(); + Assert.assertEquals(0, root.getRowCount()); } } @@ -319,90 +276,326 @@ public class TestArrowFile extends BaseFileTest { File file = new File("target/mytest_write_union.arrow"); ByteArrayOutputStream stream = new ByteArrayOutputStream(); int count = COUNT; - try ( - BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) { + // write + try (BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) { writeUnionData(count, parent); - - printVectors(parent.getChildrenFromFields()); - validateUnionData(count, new VectorSchemaRoot(parent.getChild("root"))); - write(parent.getChild("root"), file, stream); } - // read - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(file); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - ) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); + + // read file + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateUnionData(count, root); + } + + // Read from stream. + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateUnionData(count, root); + } + } - // initialize vectors - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) { - VectorLoader vectorLoader = new VectorLoader(root); - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - vectorLoader.load(recordBatch); - } - validateUnionData(count, root); - } + @Test + public void testWriteReadTiny() throws IOException { + File file = new File("target/mytest_write_tiny.arrow"); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(MessageSerializerTest.testSchema(), allocator)) { + root.getFieldVectors().get(0).allocateNew(); + NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator(); + for (int i = 0; i < 16; i++) { + mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1)); + } + mutator.setValueCount(16); + root.setRowCount(16); + + // write file + try (FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) { + LOGGER.debug("writing schema: " + root.getSchema()); + arrowWriter.start(); + arrowWriter.writeBatch(); + arrowWriter.end(); + } + // write stream + try (ArrowStreamWriter arrowWriter = new ArrowStreamWriter(root, null, stream)) { + arrowWriter.start(); + arrowWriter.writeBatch(); + arrowWriter.end(); } } + // read file + try (BufferAllocator readerAllocator = allocator.newChildAllocator("fileReader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateTinyData(root); + } + // Read from stream. - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); - ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null) - ) { - arrowReader.init(); - Schema schema = arrowReader.getSchema(); + try (BufferAllocator readerAllocator = allocator.newChildAllocator("streamReader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateTinyData(root); + } + } + + private void validateTinyData(VectorSchemaRoot root) { + Assert.assertEquals(16, root.getRowCount()); + NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0); + for (int i = 0; i < 16; i++) { + if (i < 8) { + Assert.assertEquals((byte)(i + 1), vector.getAccessor().get(i)); + } else { + Assert.assertTrue(vector.getAccessor().isNull(i)); + } + } + } + + @Test + public void testWriteReadDictionary() throws IOException { + File file = new File("target/mytest_dict.arrow"); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + // write + try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + NullableVarCharVector vector = new NullableVarCharVector("varchar", originalVectorAllocator, null); + NullableVarCharVector dictionaryVector = new NullableVarCharVector("dict", originalVectorAllocator, null)) { + vector.allocateNewSafe(); + NullableVarCharVector.Mutator mutator = vector.getMutator(); + mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + mutator.set(3, "baz".getBytes(StandardCharsets.UTF_8)); + mutator.set(4, "bar".getBytes(StandardCharsets.UTF_8)); + mutator.set(5, "baz".getBytes(StandardCharsets.UTF_8)); + mutator.setValueCount(6); + + dictionaryVector.allocateNewSafe(); + mutator = dictionaryVector.getMutator(); + mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + mutator.set(2, "baz".getBytes(StandardCharsets.UTF_8)); + mutator.setValueCount(3); + + Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null)); + MapDictionaryProvider provider = new MapDictionaryProvider(); + provider.put(dictionary); + + FieldVector encodedVector = (FieldVector) DictionaryEncoder.encode(vector, dictionary); + + List<Field> fields = ImmutableList.of(encodedVector.getField()); + List<FieldVector> vectors = ImmutableList.of(encodedVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6); + + try (FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter fileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel()); + ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, provider, stream)) { + LOGGER.debug("writing schema: " + root.getSchema()); + fileWriter.start(); + streamWriter.start(); + fileWriter.writeBatch(); + streamWriter.writeBatch(); + fileWriter.end(); + streamWriter.end(); + } + + dictionaryVector.close(); + encodedVector.close(); + } + + // read from file + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateFlatDictionary(root.getFieldVectors().get(0), arrowReader); + } + + // Read from stream + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateFlatDictionary(root.getFieldVectors().get(0), arrowReader); + } + } + + private void validateFlatDictionary(FieldVector vector, DictionaryProvider provider) { + Assert.assertNotNull(vector); + + DictionaryEncoding encoding = vector.getField().getDictionary(); + Assert.assertNotNull(encoding); + Assert.assertEquals(1L, encoding.getId()); + + FieldVector.Accessor accessor = vector.getAccessor(); + Assert.assertEquals(6, accessor.getValueCount()); + Assert.assertEquals(0, accessor.getObject(0)); + Assert.assertEquals(1, accessor.getObject(1)); + Assert.assertEquals(null, accessor.getObject(2)); + Assert.assertEquals(2, accessor.getObject(3)); + Assert.assertEquals(1, accessor.getObject(4)); + Assert.assertEquals(2, accessor.getObject(5)); + + Dictionary dictionary = provider.lookup(1L); + Assert.assertNotNull(dictionary); + NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor(); + Assert.assertEquals(3, dictionaryAccessor.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); + Assert.assertEquals(new Text("baz"), dictionaryAccessor.getObject(2)); + } - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - while (true) { - try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) { - if (recordBatch == null) break; - vectorLoader.load(recordBatch); - } - } - validateUnionData(count, root); + @Test + public void testWriteReadNestedDictionary() throws IOException { + File file = new File("target/mytest_dict_nested.arrow"); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + DictionaryEncoding encoding = new DictionaryEncoding(2L, false, null); + + // data being written: + // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]] + + // write + try (NullableVarCharVector dictionaryVector = new NullableVarCharVector("dictionary", allocator, null); + ListVector listVector = new ListVector("list", allocator, null, null)) { + + Dictionary dictionary = new Dictionary(dictionaryVector, encoding); + MapDictionaryProvider provider = new MapDictionaryProvider(); + provider.put(dictionary); + + dictionaryVector.allocateNew(); + dictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8)); + dictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8)); + dictionaryVector.getMutator().setValueCount(2); + + listVector.addOrGetVector(MinorType.INT, encoding); + listVector.allocateNew(); + UnionListWriter listWriter = new UnionListWriter(listVector); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.setValueCount(3); + + List<Field> fields = ImmutableList.of(listVector.getField()); + List<FieldVector> vectors = ImmutableList.of((FieldVector) listVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3); + + try ( + FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter fileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel()); + ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, provider, stream)) { + LOGGER.debug("writing schema: " + root.getSchema()); + fileWriter.start(); + streamWriter.start(); + fileWriter.writeBatch(); + streamWriter.writeBatch(); + fileWriter.end(); + streamWriter.end(); } } + + // read from file + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateNestedDictionary((ListVector) root.getFieldVectors().get(0), arrowReader); + } + + // Read from stream + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + arrowReader.loadNextBatch(); + validateNestedDictionary((ListVector) root.getFieldVectors().get(0), arrowReader); + } + } + + private void validateNestedDictionary(ListVector vector, DictionaryProvider provider) { + Assert.assertNotNull(vector); + Assert.assertNull(vector.getField().getDictionary()); + Field nestedField = vector.getField().getChildren().get(0); + + DictionaryEncoding encoding = nestedField.getDictionary(); + Assert.assertNotNull(encoding); + Assert.assertEquals(2L, encoding.getId()); + Assert.assertEquals(new Int(32, true), encoding.getIndexType()); + + ListVector.Accessor accessor = vector.getAccessor(); + Assert.assertEquals(3, accessor.getValueCount()); + Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0)); + Assert.assertEquals(Arrays.asList(0), accessor.getObject(1)); + Assert.assertEquals(Arrays.asList(1), accessor.getObject(2)); + + Dictionary dictionary = provider.lookup(2L); + Assert.assertNotNull(dictionary); + NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor(); + Assert.assertEquals(2, dictionaryAccessor.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); } /** * Writes the contents of parents to file. If outStream is non-null, also writes it * to outStream in the streaming serialized format. */ - private void write(FieldVector parent, File file, OutputStream outStream) throws FileNotFoundException, IOException { - VectorUnloader vectorUnloader = newVectorUnloader(parent); - Schema schema = vectorUnloader.getSchema(); - LOGGER.debug("writing schema: " + schema); - try ( - FileOutputStream fileOutputStream = new FileOutputStream(file); - ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); - ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); - ) { - arrowWriter.writeRecordBatch(recordBatch); + private void write(FieldVector parent, File file, OutputStream outStream) throws IOException { + VectorSchemaRoot root = new VectorSchemaRoot(parent); + + try (FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel());) { + LOGGER.debug("writing schema: " + root.getSchema()); + arrowWriter.start(); + arrowWriter.writeBatch(); + arrowWriter.end(); } // Also try serializing to the stream writer. if (outStream != null) { - try ( - ArrowStreamWriter arrowWriter = new ArrowStreamWriter(outStream, schema); - ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); - ) { - arrowWriter.writeRecordBatch(recordBatch); + try (ArrowStreamWriter arrowWriter = new ArrowStreamWriter(root, null, outStream)) { + arrowWriter.start(); + arrowWriter.writeBatch(); + arrowWriter.end(); } } } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java index 13b04de..914dfe4 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java @@ -17,12 +17,15 @@ */ package org.apache.arrow.vector.file; +import static java.nio.channels.Channels.newChannel; import static java.util.Arrays.asList; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -34,8 +37,14 @@ import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.NullableIntVector; +import org.apache.arrow.vector.NullableTinyIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; @@ -69,12 +78,17 @@ public class TestArrowReaderWriter { @Test public void test() throws IOException { Schema schema = new Schema(asList(new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList()))); - byte[] validity = new byte[] { (byte)255, 0}; + MinorType minorType = Types.getMinorTypeForArrowType(schema.getFields().get(0).getType()); + FieldVector vector = minorType.getNewVector("testField", allocator, null,null); + vector.initializeChildrenFromFields(schema.getFields().get(0).getChildren()); + + byte[] validity = new byte[] { (byte) 255, 0}; // second half is "undefined" byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema)) { + try (VectorSchemaRoot root = new VectorSchemaRoot(schema.getFields(), asList(vector), 16); + ArrowFileWriter writer = new ArrowFileWriter(root, null, newChannel(out))) { ArrowBuf validityb = buf(validity); ArrowBuf valuesb = buf(values); writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))); @@ -82,15 +96,15 @@ public class TestArrowReaderWriter { byte[] byteArray = out.toByteArray(); - try (ArrowReader reader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(byteArray), allocator)) { - ArrowFooter footer = reader.readFooter(); - Schema readSchema = footer.getSchema(); + SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray)); + try (ArrowFileReader reader = new ArrowFileReader(channel, allocator)) { + Schema readSchema = reader.getVectorSchemaRoot().getSchema(); assertEquals(schema, readSchema); assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0); // TODO: dictionaries - List<ArrowBlock> recordBatches = footer.getRecordBatches(); + List<ArrowBlock> recordBatches = reader.getRecordBlocks(); assertEquals(1, recordBatches.size()); - ArrowRecordBatch recordBatch = reader.readRecordBatch(recordBatches.get(0)); + ArrowRecordBatch recordBatch = (ArrowRecordBatch) reader.readMessage(channel, allocator); List<ArrowFieldNode> nodes = recordBatch.getNodes(); assertEquals(1, nodes.size()); ArrowFieldNode node = nodes.get(0);
