Repository: arrow Updated Branches: refs/heads/master ec51d5667 -> 803afeb50
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java new file mode 100644 index 0000000..9881a22 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.file; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.vector.schema.ArrowBuffer; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.schema.FBSerializable; +import org.apache.arrow.vector.types.pojo.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.flatbuffers.FlatBufferBuilder; + +import io.netty.buffer.ArrowBuf; + +public class ArrowWriter implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class); + + private static final byte[] MAGIC = "ARROW1".getBytes(); + + private final WritableByteChannel out; + + private final Schema schema; + + private final List<ArrowBlock> recordBatches = new ArrayList<>(); + + private long currentPosition = 0; + + private boolean started = false; + + public ArrowWriter(WritableByteChannel out, Schema schema) { + this.out = out; + this.schema = schema; + } + + private void start() throws IOException { + writeMagic(); + } + + private long write(byte[] buffer) throws IOException { + return write(ByteBuffer.wrap(buffer)); + } + + private long writeZeros(int zeroCount) throws IOException { + return write(new byte[zeroCount]); + } + + private long align() throws IOException { + if (currentPosition % 8 != 0) { // align on 8 byte boundaries + return writeZeros(8 - (int)(currentPosition % 8)); + } + return 0; + } + + private long write(ByteBuffer buffer) throws IOException { + long length = buffer.remaining(); + out.write(buffer); + currentPosition += length; + return length; + } + + private static byte[] intToBytes(int value) { + byte[] outBuffer = new byte[4]; + outBuffer[3] = (byte)(value >>> 24); + outBuffer[2] = (byte)(value >>> 16); + outBuffer[1] = (byte)(value >>> 8); + outBuffer[0] = (byte)(value >>> 0); + 
return outBuffer; + } + + private long writeIntLittleEndian(int v) throws IOException { + return write(intToBytes(v)); + } + + // TODO: write dictionaries + + public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { + checkStarted(); + align(); + // write metadata header + long offset = currentPosition; + write(recordBatch); + align(); + // write body + long bodyOffset = currentPosition; + List<ArrowBuf> buffers = recordBatch.getBuffers(); + List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); + if (buffers.size() != buffersLayout.size()) { + throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size()); + } + for (int i = 0; i < buffers.size(); i++) { + ArrowBuf buffer = buffers.get(i); + ArrowBuffer layout = buffersLayout.get(i); + long startPosition = bodyOffset + layout.getOffset(); + if (startPosition != currentPosition) { + writeZeros((int)(startPosition - currentPosition)); + } + write(buffer); + if (currentPosition != startPosition + layout.getSize()) { + throw new IllegalStateException("wrong buffer size: " + currentPosition + " != " + startPosition + layout.getSize()); + } + } + int metadataLength = (int)(bodyOffset - offset); + if (metadataLength <= 0) { + throw new InvalidArrowFileException("invalid recordBatch"); + } + long bodyLength = currentPosition - bodyOffset; + LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength)); + // add metadata to footer + recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength)); + } + + private void write(ArrowBuf buffer) throws IOException { + write(buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes())); + } + + private void checkStarted() throws IOException { + if (!started) { + started = true; + start(); + } + } + + public void close() throws IOException { + try { + long footerStart = currentPosition; + writeFooter(); + int footerLength = (int)(currentPosition 
- footerStart); + if (footerLength <= 0 ) { + throw new InvalidArrowFileException("invalid footer"); + } + writeIntLittleEndian(footerLength); + LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, footerLength)); + writeMagic(); + } finally { + out.close(); + } + } + + private void writeMagic() throws IOException { + write(MAGIC); + LOGGER.debug(String.format("magic written, now at %d", currentPosition)); + } + + private void writeFooter() throws IOException { + // TODO: dictionaries + write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches)); + } + + private long write(FBSerializable writer) throws IOException { + FlatBufferBuilder builder = new FlatBufferBuilder(); + int root = writer.writeTo(builder); + builder.finish(root); + return write(builder.dataBuffer()); + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java b/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java new file mode 100644 index 0000000..3ec75dc --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Unchecked exception signalling that the Arrow file content being produced or read is not valid.
 */
public class InvalidArrowFileException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  /**
   * @param message description of what is invalid
   */
  public InvalidArrowFileException(String message) {
    super(message);
  }

  /**
   * @param message description of what is invalid
   * @param cause the underlying failure, kept so the original stack trace is not lost
   */
  public InvalidArrowFileException(String message, Throwable cause) {
    super(message, cause);
  }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.schema; + +import org.apache.arrow.flatbuf.Buffer; + +import com.google.flatbuffers.FlatBufferBuilder; + +public class ArrowBuffer implements FBSerializable { + + private int page; + private long offset; + private long size; + + public ArrowBuffer(int page, long offset, long size) { + super(); + this.page = page; + this.offset = offset; + this.size = size; + } + + public int getPage() { + return page; + } + + public long getOffset() { + return offset; + } + + public long getSize() { + return size; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (offset ^ (offset >>> 32)); + result = prime * result + page; + result = prime * result + (int) (size ^ (size >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ArrowBuffer other = (ArrowBuffer) obj; + if (offset != other.offset) + return false; + if (page != other.page) + return false; + if (size != other.size) + return false; + return true; + } + + @Override + public int writeTo(FlatBufferBuilder builder) { + return Buffer.createBuffer(builder, page, offset, size); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java new file mode 100644 index 0000000..71dd0ab --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.schema; + +import org.apache.arrow.flatbuf.FieldNode; + +import com.google.flatbuffers.FlatBufferBuilder; + +public class ArrowFieldNode implements FBSerializable { + + private final int length; + private final int nullCount; + + public ArrowFieldNode(int length, int nullCount) { + super(); + this.length = length; + this.nullCount = nullCount; + } + + @Override + public int writeTo(FlatBufferBuilder builder) { + return FieldNode.createFieldNode(builder, length, nullCount); + } + + public int getNullCount() { + return nullCount; + } + + public int getLength() { + return length; + } + + @Override + public String toString() { + return "ArrowFieldNode [length=" + length + ", nullCount=" + nullCount + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java new file mode 100644 index 0000000..9162efd --- /dev/null +++ 
b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.schema; + +import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.flatbuf.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.flatbuffers.FlatBufferBuilder; + +import io.netty.buffer.ArrowBuf; + +public class ArrowRecordBatch implements FBSerializable, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ArrowRecordBatch.class); + + /** number of records */ + private final int length; + + /** Nodes correspond to the pre-ordered flattened logical schema */ + private final List<ArrowFieldNode> nodes; + + private final List<ArrowBuf> buffers; + + private final List<ArrowBuffer> buffersLayout; + + private boolean closed = false; + + /** + * @param length how many rows in this batch + * @param nodes field level info + * @param buffers will be retained until this recordBatch is closed + */ + public ArrowRecordBatch(int length, 
List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) { + super(); + this.length = length; + this.nodes = nodes; + this.buffers = buffers; + List<ArrowBuffer> arrowBuffers = new ArrayList<>(); + long offset = 0; + for (ArrowBuf arrowBuf : buffers) { + arrowBuf.retain(); + long size = arrowBuf.readableBytes(); + arrowBuffers.add(new ArrowBuffer(0, offset, size)); + LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", offset, size)); + offset += size; + if (offset % 8 != 0) { // align on 8 byte boundaries + offset += 8 - (offset % 8); + } + } + this.buffersLayout = Collections.unmodifiableList(arrowBuffers); + } + + public int getLength() { + return length; + } + + /** + * @return the FieldNodes corresponding to the schema + */ + public List<ArrowFieldNode> getNodes() { + return nodes; + } + + /** + * @return the buffers containing the data + */ + public List<ArrowBuf> getBuffers() { + if (closed) { + throw new IllegalStateException("already closed"); + } + return buffers; + } + + /** + * @return the serialized layout if we send the buffers on the wire + */ + public List<ArrowBuffer> getBuffersLayout() { + return buffersLayout; + } + + @Override + public int writeTo(FlatBufferBuilder builder) { + RecordBatch.startNodesVector(builder, nodes.size()); + int nodesOffset = writeAllStructsToVector(builder, nodes); + RecordBatch.startBuffersVector(builder, buffers.size()); + int buffersOffset = writeAllStructsToVector(builder, buffersLayout); + RecordBatch.startRecordBatch(builder); + RecordBatch.addLength(builder, length); + RecordBatch.addNodes(builder, nodesOffset); + RecordBatch.addBuffers(builder, buffersOffset); + return RecordBatch.endRecordBatch(builder); + } + + /** + * releases the buffers + */ + public void close() { + if (!closed) { + closed = true; + for (ArrowBuf arrowBuf : buffers) { + arrowBuf.release(); + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java new file mode 100644 index 0000000..e3d3e34 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.schema; + +import org.apache.arrow.flatbuf.VectorType; + +public class ArrowVectorType { + + public static final ArrowVectorType VALUES = new ArrowVectorType(VectorType.VALUES); + public static final ArrowVectorType OFFSET = new ArrowVectorType(VectorType.OFFSET); + public static final ArrowVectorType VALIDITY = new ArrowVectorType(VectorType.VALIDITY); + public static final ArrowVectorType TYPE = new ArrowVectorType(VectorType.TYPE); + + private final short type; + + public ArrowVectorType(short type) { + this.type = type; + } + + public short getType() { + return type; + } + + @Override + public String toString() { + try { + return VectorType.name(type); + } catch (ArrayIndexOutOfBoundsException e) { + return "Unlnown type " + type; + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java new file mode 100644 index 0000000..d23ed91 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.schema; + +import com.google.flatbuffers.FlatBufferBuilder; + +/** Implemented by objects that can write themselves into a FlatBufferBuilder. */ +public interface FBSerializable { + /** Writes this object with the given builder; ArrowWriter passes the returned int to builder.finish() as the root. */ + int writeTo(FlatBufferBuilder builder); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java new file mode 100644 index 0000000..31c17ad --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.schema; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.google.flatbuffers.FlatBufferBuilder; + +public class FBSerializables { + + public static int writeAllStructsToVector(FlatBufferBuilder builder, List<? extends FBSerializable> all) { + // struct vectors have to be created in reverse order + List<? extends FBSerializable> reversed = new ArrayList<>(all); + Collections.reverse(reversed); + for (FBSerializable element : reversed) { + element.writeTo(builder); + } + return builder.endVector(); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java new file mode 100644 index 0000000..1275e0e --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.schema; + +import static java.util.Arrays.asList; +import static org.apache.arrow.flatbuf.Precision.DOUBLE; +import static org.apache.arrow.flatbuf.Precision.SINGLE; +import static org.apache.arrow.vector.schema.VectorLayout.booleanVector; +import static org.apache.arrow.vector.schema.VectorLayout.byteVector; +import static org.apache.arrow.vector.schema.VectorLayout.dataVector; +import static org.apache.arrow.vector.schema.VectorLayout.offsetVector; +import static org.apache.arrow.vector.schema.VectorLayout.typeVector; +import static org.apache.arrow.vector.schema.VectorLayout.validityVector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.flatbuf.UnionMode; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeVisitor; +import org.apache.arrow.vector.types.pojo.ArrowType.Binary; +import org.apache.arrow.vector.types.pojo.ArrowType.Bool; +import org.apache.arrow.vector.types.pojo.ArrowType.Date; +import org.apache.arrow.vector.types.pojo.ArrowType.Decimal; +import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.ArrowType.IntervalDay; +import org.apache.arrow.vector.types.pojo.ArrowType.IntervalYear; +import org.apache.arrow.vector.types.pojo.ArrowType.Null; +import org.apache.arrow.vector.types.pojo.ArrowType.Time; +import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; +import org.apache.arrow.vector.types.pojo.ArrowType.Tuple; +import org.apache.arrow.vector.types.pojo.ArrowType.Union; +import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; + +/** + * The layout of vectors for a given type + * It defines its own vectors followed by the vectors for the children + * if it is a nested type (Tuple, List, Union) + */ +public class TypeLayout { + + public static TypeLayout 
getTypeLayout(final ArrowType arrowType) { + TypeLayout layout = arrowType.accept(new ArrowTypeVisitor<TypeLayout>() { + + @Override public TypeLayout visit(Int type) { + return newFixedWidthTypeLayout(dataVector(type.getBitWidth())); + } + + @Override public TypeLayout visit(Union type) { + List<VectorLayout> vectors; + switch (type.getMode()) { + case UnionMode.Dense: + vectors = asList( + // TODO: validate this + validityVector(), + typeVector(), + offsetVector() // offset to find the vector + ); + break; + case UnionMode.Sparse: + vectors = asList( + validityVector(), + typeVector() + ); + break; + default: + throw new UnsupportedOperationException("Unsupported Union Mode: " + type.getMode()); + } + return new TypeLayout(vectors); + } + + @Override public TypeLayout visit(Tuple type) { + List<VectorLayout> vectors = asList( + // TODO: add validity vector in Map +// validityVector() + ); + return new TypeLayout(vectors); + } + + @Override public TypeLayout visit(Timestamp type) { + return newFixedWidthTypeLayout(dataVector(64)); + } + + @Override public TypeLayout visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) { + List<VectorLayout> vectors = asList( + validityVector(), + offsetVector() + ); + return new TypeLayout(vectors); + } + + @Override public TypeLayout visit(FloatingPoint type) { + int bitWidth; + switch (type.getPrecision()) { + case SINGLE: + bitWidth = 32; + break; + case DOUBLE: + bitWidth = 64; + break; + default: + throw new UnsupportedOperationException("Unsupported Precision: " + type.getPrecision()); + } + return newFixedWidthTypeLayout(dataVector(bitWidth)); + } + + @Override public TypeLayout visit(Decimal type) { + // TODO: check size + return newFixedWidthTypeLayout(dataVector(64)); // actually depends on the type fields + } + + @Override public TypeLayout visit(Bool type) { + return newFixedWidthTypeLayout(booleanVector()); + } + + @Override public TypeLayout visit(Binary type) { + return newVariableWidthTypeLayout(); + } + + 
@Override public TypeLayout visit(Utf8 type) { + return newVariableWidthTypeLayout(); + } + + private TypeLayout newVariableWidthTypeLayout() { + return newPrimitiveTypeLayout(validityVector(), offsetVector(), byteVector()); + } + + private TypeLayout newPrimitiveTypeLayout(VectorLayout... vectors) { + return new TypeLayout(asList(vectors)); + } + + public TypeLayout newFixedWidthTypeLayout(VectorLayout dataVector) { + return newPrimitiveTypeLayout(validityVector(), dataVector); + } + + @Override + public TypeLayout visit(Null type) { + return new TypeLayout(Collections.<VectorLayout>emptyList()); + } + + @Override + public TypeLayout visit(Date type) { + return newFixedWidthTypeLayout(dataVector(64)); + } + + @Override + public TypeLayout visit(Time type) { + return newFixedWidthTypeLayout(dataVector(64)); + } + + @Override + public TypeLayout visit(IntervalDay type) { // TODO: check size + return newFixedWidthTypeLayout(dataVector(64)); + } + + @Override + public TypeLayout visit(IntervalYear type) { // TODO: check size + return newFixedWidthTypeLayout(dataVector(64)); + } + }); + return layout; + } + + private final List<VectorLayout> vectors; + + public TypeLayout(List<VectorLayout> vectors) { + super(); + this.vectors = vectors; + } + + public TypeLayout(VectorLayout... 
vectors) { + this(asList(vectors)); + } + + + public List<VectorLayout> getVectors() { + return vectors; + } + + public List<ArrowVectorType> getVectorTypes() { + List<ArrowVectorType> types = new ArrayList<>(vectors.size()); + for (VectorLayout vector : vectors) { + types.add(vector.getType()); + } + return types; + } + + public String toString() { + return "TypeLayout{" + vectors + "}"; + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java new file mode 100644 index 0000000..421ebcb --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.schema; + +import static org.apache.arrow.vector.schema.ArrowVectorType.OFFSET; +import static org.apache.arrow.vector.schema.ArrowVectorType.TYPE; +import static org.apache.arrow.vector.schema.ArrowVectorType.VALIDITY; +import static org.apache.arrow.vector.schema.ArrowVectorType.VALUES; + +public class VectorLayout { + + private static final VectorLayout VALIDITY_VECTOR = new VectorLayout(VALIDITY, 1); + private static final VectorLayout OFFSET_VECTOR = new VectorLayout(OFFSET, 32); + private static final VectorLayout TYPE_VECTOR = new VectorLayout(TYPE, 32); + private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(VALUES, 1); + private static final VectorLayout VALUES_64 = new VectorLayout(VALUES, 64); + private static final VectorLayout VALUES_32 = new VectorLayout(VALUES, 32); + private static final VectorLayout VALUES_16 = new VectorLayout(VALUES, 16); + private static final VectorLayout VALUES_8 = new VectorLayout(VALUES, 8); + + public static VectorLayout typeVector() { + return TYPE_VECTOR; + } + + public static VectorLayout offsetVector() { + return OFFSET_VECTOR; + } + + public static VectorLayout dataVector(int typeBitWidth) { + switch (typeBitWidth) { + case 8: + return VALUES_8; + case 16: + return VALUES_16; + case 32: + return VALUES_32; + case 64: + return VALUES_64; + default: + throw new IllegalArgumentException("only 8, 16, 32, or 64 bits supported"); + } + } + + public static VectorLayout booleanVector() { + return BOOLEAN_VECTOR; + } + + public static VectorLayout validityVector() { + return VALIDITY_VECTOR; + } + + public static VectorLayout byteVector() { + return dataVector(8); + } + + private final int typeBitWidth; + + private final ArrowVectorType type; + + private VectorLayout(ArrowVectorType type, int typeBitWidth) { + super(); + this.type = type; + this.typeBitWidth = typeBitWidth; + } + + public int getTypeBitWidth() { + return typeBitWidth; + } + + public ArrowVectorType getType() { 
+ return type; + } + + @Override + public String toString() { + return String.format("{width=%s,type=%s}", typeBitWidth, type); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index c34882a..4d0d9ee 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -17,8 +17,14 @@ */ package org.apache.arrow.vector.types; +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.flatbuf.Precision; import org.apache.arrow.flatbuf.Type; +import org.apache.arrow.flatbuf.UnionMode; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.NullableBigIntVector; import org.apache.arrow.vector.NullableBitVector; import org.apache.arrow.vector.NullableDateVector; @@ -38,7 +44,6 @@ import org.apache.arrow.vector.NullableUInt4Vector; import org.apache.arrow.vector.NullableUInt8Vector; import org.apache.arrow.vector.NullableVarBinaryVector; import org.apache.arrow.vector.NullableVarCharVector; -import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.complex.ListVector; @@ -85,9 +90,6 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.CallBack; -import java.util.HashMap; -import java.util.Map; - public class Types { public static final Field NULL_FIELD = new Field("", true, Null.INSTANCE, null); @@ -104,8 +106,8 @@ public class Types { public static final Field TIMESTAMP_FIELD = new Field("", true, new 
Timestamp(""), null); public static final Field INTERVALDAY_FIELD = new Field("", true, IntervalDay.INSTANCE, null); public static final Field INTERVALYEAR_FIELD = new Field("", true, IntervalYear.INSTANCE, null); - public static final Field FLOAT4_FIELD = new Field("", true, new FloatingPoint(0), null); - public static final Field FLOAT8_FIELD = new Field("", true, new FloatingPoint(1), null); + public static final Field FLOAT4_FIELD = new Field("", true, new FloatingPoint(Precision.SINGLE), null); + public static final Field FLOAT8_FIELD = new Field("", true, new FloatingPoint(Precision.DOUBLE), null); public static final Field LIST_FIELD = new Field("", true, List.INSTANCE, null); public static final Field VARCHAR_FIELD = new Field("", true, Utf8.INSTANCE, null); public static final Field VARBINARY_FIELD = new Field("", true, Binary.INSTANCE, null); @@ -120,7 +122,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return ZeroVector.INSTANCE; } @@ -136,7 +138,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new MapVector(name, allocator, callBack); } @@ -153,7 +155,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... 
precisionScale) { return new NullableTinyIntVector(name, allocator); } @@ -169,8 +171,8 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new SmallIntVector(name, allocator); + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + return new NullableSmallIntVector(name, allocator); } @Override @@ -185,7 +187,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableIntVector(name, allocator); } @@ -201,7 +203,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableBigIntVector(name, allocator); } @@ -217,7 +219,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableDateVector(name, allocator); } @@ -233,7 +235,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableTimeVector(name, allocator); } @@ -249,7 +251,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... 
precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableTimeStampVector(name, allocator); } @@ -265,7 +267,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableIntervalDayVector(name, allocator); } @@ -281,7 +283,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableIntervalDayVector(name, allocator); } @@ -290,14 +292,14 @@ public class Types { return new IntervalYearWriterImpl((NullableIntervalYearVector) vector); } }, - FLOAT4(new FloatingPoint(0)) { + FLOAT4(new FloatingPoint(Precision.SINGLE)) { @Override public Field getField() { return FLOAT4_FIELD; } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableFloat4Vector(name, allocator); } @@ -306,14 +308,14 @@ public class Types { return new Float4WriterImpl((NullableFloat4Vector) vector); } }, // 4 byte ieee 754 - FLOAT8(new FloatingPoint(1)) { + FLOAT8(new FloatingPoint(Precision.DOUBLE)) { @Override public Field getField() { return FLOAT8_FIELD; } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... 
precisionScale) { return new NullableFloat8Vector(name, allocator); } @@ -329,7 +331,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableBitVector(name, allocator); } @@ -345,7 +347,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableVarCharVector(name, allocator); } @@ -361,7 +363,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableVarBinaryVector(name, allocator); } @@ -381,7 +383,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableDecimalVector(name, allocator, precisionScale[0], precisionScale[1]); } @@ -397,7 +399,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableUInt1Vector(name, allocator); } @@ -413,7 +415,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... 
precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableUInt2Vector(name, allocator); } @@ -429,7 +431,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableUInt4Vector(name, allocator); } @@ -445,7 +447,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new NullableUInt8Vector(name, allocator); } @@ -461,7 +463,7 @@ public class Types { } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new ListVector(name, allocator, callBack); } @@ -470,14 +472,14 @@ public class Types { return new UnionListWriter((ListVector) vector); } }, - UNION(Union.INSTANCE) { + UNION(new Union(UnionMode.Sparse)) { @Override public Field getField() { throw new UnsupportedOperationException("Cannot get simple field for Union type"); } @Override - public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { + public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { return new UnionVector(name, allocator, callBack); } @@ -499,7 +501,7 @@ public class Types { public abstract Field getField(); - public abstract ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... 
precisionScale); + public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale); public abstract FieldWriter getNewFieldWriter(ValueVector vector); } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java index 49d0503..36712b9 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java @@ -18,19 +18,24 @@ package org.apache.arrow.vector.types.pojo; -import com.google.common.collect.ImmutableList; -import com.google.flatbuffers.FlatBufferBuilder; +import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField; +import java.util.ArrayList; import java.util.List; import java.util.Objects; -import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField; +import org.apache.arrow.vector.schema.ArrowVectorType; +import org.apache.arrow.vector.schema.TypeLayout; + +import com.google.common.collect.ImmutableList; +import com.google.flatbuffers.FlatBufferBuilder; public class Field { private final String name; private final boolean nullable; private final ArrowType type; private final List<Field> children; + private final TypeLayout typeLayout; public Field(String name, boolean nullable, ArrowType type, List<Field> children) { this.name = name; @@ -41,18 +46,32 @@ public class Field { } else { this.children = children; } + this.typeLayout = TypeLayout.getTypeLayout(type); } public static Field convertField(org.apache.arrow.flatbuf.Field field) { String name = field.name(); boolean nullable = field.nullable(); ArrowType type = getTypeForField(field); + List<ArrowVectorType> buffers = new 
ArrayList<>(); + for (int i = 0; i < field.buffersLength(); ++i) { + buffers.add(new ArrowVectorType(field.buffers(i))); + } ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); for (int i = 0; i < field.childrenLength(); i++) { childrenBuilder.add(convertField(field.children(i))); } List<Field> children = childrenBuilder.build(); - return new Field(name, nullable, type, children); + Field result = new Field(name, nullable, type, children); + TypeLayout typeLayout = result.getTypeLayout(); + if (typeLayout.getVectors().size() != field.buffersLength()) { + List<ArrowVectorType> types = new ArrayList<>(); + for (int i = 0; i < field.buffersLength(); i++) { + types.add(new ArrowVectorType(field.buffers(i))); + } + throw new IllegalArgumentException("Deserialized field does not match expected vectors. expected: " + typeLayout.getVectorTypes() + " got " + types); + } + return result; } public int getField(FlatBufferBuilder builder) { @@ -63,12 +82,18 @@ public class Field { childrenData[i] = children.get(i).getField(builder); } int childrenOffset = org.apache.arrow.flatbuf.Field.createChildrenVector(builder, childrenData); + short[] buffersData = new short[typeLayout.getVectors().size()]; + for (int i = 0; i < buffersData.length; i++) { + buffersData[i] = typeLayout.getVectors().get(i).getType().getType(); + } + int buffersOffset = org.apache.arrow.flatbuf.Field.createBuffersVector(builder, buffersData ); org.apache.arrow.flatbuf.Field.startField(builder); org.apache.arrow.flatbuf.Field.addName(builder, nameOffset); org.apache.arrow.flatbuf.Field.addNullable(builder, nullable); org.apache.arrow.flatbuf.Field.addTypeType(builder, type.getTypeType()); org.apache.arrow.flatbuf.Field.addType(builder, typeOffset); org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset); + org.apache.arrow.flatbuf.Field.addBuffers(builder, buffersOffset); return org.apache.arrow.flatbuf.Field.endField(builder); } @@ -88,6 +113,10 @@ public class Field { return 
children; } + public TypeLayout getTypeLayout() { + return typeLayout; + } + @Override public boolean equals(Object obj) { if (!(obj instanceof Field)) { @@ -102,4 +131,9 @@ public class Field { (this.children.size() == 0 && that.children == null)); } + + @Override + public String toString() { + return String.format("Field{name=%s, type=%s, children=%s, layout=%s}", name, type, children, typeLayout); + } } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java index 9e28941..231be9b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java @@ -18,15 +18,13 @@ package org.apache.arrow.vector.types.pojo; -import com.google.common.collect.ImmutableList; -import com.google.flatbuffers.FlatBufferBuilder; +import static org.apache.arrow.vector.types.pojo.Field.convertField; -import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; -import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField; -import static org.apache.arrow.vector.types.pojo.Field.convertField; +import com.google.common.collect.ImmutableList; +import com.google.flatbuffers.FlatBufferBuilder; public class Schema { private List<Field> fields; @@ -71,4 +69,9 @@ public class Schema { List<Field> fields = childrenBuilder.build(); return new Schema(fields); } + + @Override + public String toString() { + return "Schema" + fields; + } } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java ---------------------------------------------------------------------- diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java new file mode 100644 index 0000000..85bb2cf --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector; + +import java.io.IOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; +import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +public class TestVectorUnloadLoad { + + static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + + @Test + public void test() throws IOException { + int count = 10000; + Schema schema; + + try ( + BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + for (int i = 0; i < count; i++) { + intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + } + writer.setValueCount(count); + + VectorUnloader vectorUnloader = new VectorUnloader((MapVector)parent.getChild("root")); + schema = vectorUnloader.getSchema(); + + try ( + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + BufferAllocator finalVectorsAllocator = 
allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + MapVector newParent = new MapVector("parent", finalVectorsAllocator, null)) { + MapVector root = newParent.addOrGet("root", MinorType.MAP, MapVector.class); + VectorLoader vectorLoader = new VectorLoader(schema, root); + + vectorLoader.load(recordBatch); + + MapReader rootReader = new SingleMapReaderImpl(newParent).reader("root"); + for (int i = 0; i < count; i++) { + rootReader.setPosition(i); + Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); + Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); + } + } + } + } + + @AfterClass + public static void afterClass() { + allocator.close(); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java b/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java new file mode 100644 index 0000000..7c423d5 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;

/**
 * Read-only {@link SeekableByteChannel} backed by an in-memory byte array.
 *
 * <p>The array passed to the constructor is used directly (it is not copied).
 * {@link #write(ByteBuffer)} and {@link #truncate(long)} always throw
 * {@link UnsupportedOperationException}. Closing the channel drops the reference
 * to the array; subsequent reads, seeks and size queries fail with
 * {@link ClosedChannelException}.
 */
public class ByteArrayReadableSeekableByteChannel implements SeekableByteChannel {
  /** Backing data; {@code null} once the channel has been closed. */
  private byte[] byteArray;
  /** Index in {@link #byteArray} of the next byte to read; may exceed the array length. */
  private int position = 0;

  /**
   * Creates a channel positioned at the start of {@code byteArray}.
   *
   * @param byteArray the bytes to expose through the channel
   * @throws NullPointerException if {@code byteArray} is {@code null}
   */
  public ByteArrayReadableSeekableByteChannel(byte[] byteArray) {
    if (byteArray == null) {
      throw new NullPointerException("byteArray");
    }
    this.byteArray = byteArray;
  }

  @Override
  public boolean isOpen() {
    return byteArray != null;
  }

  @Override
  public void close() throws IOException {
    byteArray = null;
  }

  /**
   * Copies as many bytes as fit into {@code dst}, starting at the current position.
   *
   * @param dst destination buffer
   * @return the number of bytes copied; {@code 0} if {@code dst} has no room;
   *         {@code -1} if the position is at or past the end of the array
   * @throws ClosedChannelException if the channel has been closed
   */
  @Override
  public int read(final ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!dst.hasRemaining()) {
      return 0;
    }
    // Signal end-of-stream explicitly: returning 0 here makes "read until -1" loops spin forever,
    // and a position past the end would otherwise yield a negative length for dst.put().
    if (position >= byteArray.length) {
      return -1;
    }
    int length = Math.min(dst.remaining(), byteArray.length - position);
    dst.put(byteArray, position, length);
    position += length;
    return length;
  }

  /**
   * @return the current read position
   * @throws ClosedChannelException if the channel has been closed
   */
  @Override
  public long position() throws IOException {
    ensureOpen();
    return position;
  }

  /**
   * Moves the read position. Positions beyond the end of the array are accepted;
   * the next read then reports end-of-stream.
   *
   * @param newPosition the new position, in {@code [0, Integer.MAX_VALUE]}
   * @return this channel
   * @throws IllegalArgumentException if {@code newPosition} is negative or does not fit in an int
   * @throws ClosedChannelException if the channel has been closed
   */
  @Override
  public SeekableByteChannel position(final long newPosition) throws IOException {
    ensureOpen();
    // Reject values the int-backed position cannot represent instead of silently truncating them.
    if (newPosition < 0 || newPosition > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Invalid position: " + newPosition);
    }
    position = (int) newPosition;
    return this;
  }

  /**
   * @return the length of the backing array
   * @throws ClosedChannelException if the channel has been closed
   */
  @Override
  public long size() throws IOException {
    ensureOpen();
    return byteArray.length;
  }

  /** Always fails: this channel is read only. */
  @Override
  public int write(final ByteBuffer src) throws IOException {
    throw new UnsupportedOperationException("Read only");
  }

  /** Always fails: this channel is read only. */
  @Override
  public SeekableByteChannel truncate(final long size) throws IOException {
    throw new UnsupportedOperationException("Read only");
  }

  /** Throws {@link ClosedChannelException} if {@link #close()} has been called. */
  private void ensureOpen() throws ClosedChannelException {
    if (byteArray == null) {
      throw new ClosedChannelException();
    }
  }

}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java new file mode 100644 index 0000000..11de0a2 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.file; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector.Accessor; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; +import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.arrow.vector.schema.ArrowBuffer; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import io.netty.buffer.ArrowBuf; + +public class TestArrowFile { + private static final int COUNT = 10; + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + @Test + public void testWrite() throws IOException { + File file = new File("target/mytest_write.arrow"); + int count = COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new 
MapVector("parent", vectorAllocator, null)) { + writeData(count, parent); + write((MapVector)parent.getChild("root"), file); + } + } + + @Test + public void testWriteComplex() throws IOException { + File file = new File("target/mytest_write_complex.arrow"); + int count = COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null)) { + writeComplexData(count, parent); + validateComplexContent(count, parent); + write((MapVector)parent.getChild("root"), file); + } + } + + private void writeComplexData(int count, MapVector parent) { + ArrowBuf varchar = allocator.buffer(3); + varchar.readerIndex(0); + varchar.setByte(0, 'a'); + varchar.setByte(1, 'b'); + varchar.setByte(2, 'c'); + varchar.writerIndex(3); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + ListWriter listWriter = rootWriter.list("list"); + MapWriter mapWriter = rootWriter.map("map"); + for (int i = 0; i < count; i++) { + intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + listWriter.setPosition(i); + listWriter.startList(); + for (int j = 0; j < i % 3; j++) { + listWriter.varChar().writeVarChar(0, 3, varchar); + } + listWriter.endList(); + mapWriter.setPosition(i); + mapWriter.start(); + mapWriter.timeStamp("timestamp").writeTimeStamp(i); + mapWriter.end(); + } + writer.setValueCount(count); + varchar.release(); + } + + + private void writeData(int count, MapVector parent) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + for (int i = 0; i < count; i++) { + 
intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + } + writer.setValueCount(count); + } + + @Test + public void testWriteRead() throws IOException { + File file = new File("target/mytest.arrow"); + int count = COUNT; + + // write + try ( + BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { + writeData(count, parent); + write((MapVector)parent.getChild("root"), file); + } + + // read + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); + BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null) + ) { + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + System.out.println("reading schema: " + schema); + + // initialize vectors + + MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class); + + VectorLoader vectorLoader = new VectorLoader(schema, root); + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + Assert.assertEquals(0, rbBlock.getOffset() % 8); + Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + vectorLoader.load(recordBatch); + } + + validateContent(count, parent); + } + } + } + + private void validateContent(int count, MapVector parent) { + MapReader rootReader = new 
SingleMapReaderImpl(parent).reader("root"); + for (int i = 0; i < count; i++) { + rootReader.setPosition(i); + Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); + Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); + } + } + + @Test + public void testWriteReadComplex() throws IOException { + File file = new File("target/mytest_complex.arrow"); + int count = COUNT; + + // write + try ( + BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { + writeComplexData(count, parent); + write((MapVector)parent.getChild("root"), file); + } + + // read + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); + BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null) + ) { + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + System.out.println("reading schema: " + schema); + + // initialize vectors + + MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class); + + VectorLoader vectorLoader = new VectorLoader(schema, root); + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + vectorLoader.load(recordBatch); + } + validateComplexContent(count, parent); + } + } + } + + public void printVectors(List<FieldVector> vectors) { + for (FieldVector vector : vectors) { + System.out.println(vector.getField().getName()); + Accessor accessor = vector.getAccessor(); + int valueCount = accessor.getValueCount(); + for 
(int i = 0; i < valueCount; i++) { + System.out.println(accessor.getObject(i)); + } + } + } + + private void validateComplexContent(int count, MapVector parent) { + printVectors(parent.getChildrenFromFields()); + + MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); + for (int i = 0; i < count; i++) { + rootReader.setPosition(i); + Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); + Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); + Assert.assertEquals(i % 3, rootReader.reader("list").size()); + Assert.assertEquals(i, rootReader.reader("map").reader("timestamp").readDateTime().getMillis() % COUNT); + } + } + + private void write(MapVector parent, File file) throws FileNotFoundException, IOException { + VectorUnloader vectorUnloader = new VectorUnloader(parent); + Schema schema = vectorUnloader.getSchema(); + System.out.println("writing schema: " + schema); + try ( + FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + ) { + arrowWriter.writeRecordBatch(recordBatch); + } + } + + @Test + public void testWriteReadMultipleRBs() throws IOException { + File file = new File("target/mytest_multiple.arrow"); + int count = COUNT; + + // write + try ( + BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorAllocator, null); + FileOutputStream fileOutputStream = new FileOutputStream(file);) { + writeData(count, parent); + VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root")); + Schema schema = vectorUnloader.getSchema(); + Assert.assertEquals(2, schema.getFields().size()); + try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) { + try (ArrowRecordBatch recordBatch = 
vectorUnloader.getRecordBatch()) { + arrowWriter.writeRecordBatch(recordBatch); + } + parent.allocateNew(); + writeData(count, parent); + try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) { + arrowWriter.writeRecordBatch(recordBatch); + } + } + } + + // read + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); + BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null); + ) { + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + System.out.println("reading schema: " + schema); + MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class); + VectorLoader vectorLoader = new VectorLoader(schema, root); + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + Assert.assertEquals(2, recordBatches.size()); + for (ArrowBlock rbBlock : recordBatches) { + Assert.assertEquals(0, rbBlock.getOffset() % 8); + Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + vectorLoader.load(recordBatch); + validateContent(count, parent); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java new file mode 100644 
index 0000000..707dba2 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.file; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Collections; + +import org.apache.arrow.flatbuf.Footer; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Test; + +import com.google.flatbuffers.FlatBufferBuilder; + +public class TestArrowFooter { + + @Test + public void test() { + Schema schema = new Schema(asList( + new Field("a", true, new ArrowType.Int(8, true), Collections.<Field>emptyList()) + )); + ArrowFooter footer = new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), Collections.<ArrowBlock>emptyList()); + ArrowFooter newFooter = roundTrip(footer); + assertEquals(footer, newFooter); + } + + + private ArrowFooter roundTrip(ArrowFooter footer) { + FlatBufferBuilder builder = new FlatBufferBuilder(); + int i = footer.writeTo(builder); + builder.finish(i); + ByteBuffer 
dataBuffer = builder.dataBuffer(); + ArrowFooter newFooter = new ArrowFooter(Footer.getRootAsFooter(dataBuffer)); + return newFooter; + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java new file mode 100644 index 0000000..f90329a --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.file; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.file.ArrowBlock; +import org.apache.arrow.vector.file.ArrowFooter; +import org.apache.arrow.vector.file.ArrowReader; +import org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Before; +import org.junit.Test; + +import io.netty.buffer.ArrowBuf; + +public class TestArrowReaderWriter { + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + ArrowBuf buf(byte[] bytes) { + ArrowBuf buffer = allocator.buffer(bytes.length); + buffer.writeBytes(bytes); + return buffer; + } + + byte[] array(ArrowBuf buf) { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + return bytes; + } + + @Test + public void test() throws IOException { + Schema schema = new Schema(asList(new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList()))); + byte[] validity = new byte[] { (byte)255, 0}; + // second half is "undefined" + byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema)) { + ArrowBuf validityb = buf(validity); + 
ArrowBuf valuesb = buf(values); + writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))); + } + + byte[] byteArray = out.toByteArray(); + + try (ArrowReader reader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(byteArray), allocator)) { + ArrowFooter footer = reader.readFooter(); + Schema readSchema = footer.getSchema(); + assertEquals(schema, readSchema); + assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0); + // TODO: dictionaries + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + assertEquals(1, recordBatches.size()); + ArrowRecordBatch recordBatch = reader.readRecordBatch(recordBatches.get(0)); + List<ArrowFieldNode> nodes = recordBatch.getNodes(); + assertEquals(1, nodes.size()); + ArrowFieldNode node = nodes.get(0); + assertEquals(16, node.getLength()); + assertEquals(8, node.getNullCount()); + List<ArrowBuf> buffers = recordBatch.getBuffers(); + assertEquals(2, buffers.size()); + assertArrayEquals(validity, array(buffers.get(0))); + assertArrayEquals(values, array(buffers.get(1))); + + } + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java b/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java index 06a1149..61327f1 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java @@ -17,19 +17,24 @@ */ package org.apache.arrow.vector.pojo; -import com.google.common.collect.ImmutableList; -import com.google.flatbuffers.FlatBufferBuilder; +import static org.apache.arrow.flatbuf.Precision.DOUBLE; +import static 
org.apache.arrow.flatbuf.Precision.SINGLE; +import static org.junit.Assert.assertEquals; + +import org.apache.arrow.flatbuf.UnionMode; import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.ArrowType.List; +import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; import org.apache.arrow.vector.types.pojo.ArrowType.Tuple; +import org.apache.arrow.vector.types.pojo.ArrowType.Union; import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.Test; -import java.util.List; - -import static org.junit.Assert.assertEquals; +import com.google.common.collect.ImmutableList; +import com.google.flatbuffers.FlatBufferBuilder; /** * Test conversion between Flatbuf and Pojo field representations @@ -46,7 +51,7 @@ public class TestConvert { public void complex() { ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); childrenBuilder.add(new Field("child1", true, Utf8.INSTANCE, null)); - childrenBuilder.add(new Field("child2", true, new FloatingPoint(0), ImmutableList.<Field>of())); + childrenBuilder.add(new Field("child2", true, new FloatingPoint(SINGLE), ImmutableList.<Field>of())); Field initialField = new Field("a", true, Tuple.INSTANCE, childrenBuilder.build()); run(initialField); @@ -56,10 +61,29 @@ public class TestConvert { public void schema() { ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); childrenBuilder.add(new Field("child1", true, Utf8.INSTANCE, null)); - childrenBuilder.add(new Field("child2", true, new FloatingPoint(0), ImmutableList.<Field>of())); + childrenBuilder.add(new Field("child2", true, new FloatingPoint(SINGLE), ImmutableList.<Field>of())); Schema initialSchema = new Schema(childrenBuilder.build()); run(initialSchema); + } + @Test + public void nestedSchema() { + 
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + childrenBuilder.add(new Field("child1", true, Utf8.INSTANCE, null)); + childrenBuilder.add(new Field("child2", true, new FloatingPoint(SINGLE), ImmutableList.<Field>of())); + childrenBuilder.add(new Field("child3", true, new Tuple(), ImmutableList.<Field>of( + new Field("child3.1", true, Utf8.INSTANCE, null), + new Field("child3.2", true, new FloatingPoint(DOUBLE), ImmutableList.<Field>of()) + ))); + childrenBuilder.add(new Field("child4", true, new List(), ImmutableList.<Field>of( + new Field("child4.1", true, Utf8.INSTANCE, null) + ))); + childrenBuilder.add(new Field("child5", true, new Union(UnionMode.Sparse), ImmutableList.<Field>of( + new Field("child5.1", true, new Timestamp("UTC"), null), + new Field("child5.2", true, new FloatingPoint(DOUBLE), ImmutableList.<Field>of()) + ))); + Schema initialSchema = new Schema(childrenBuilder.build()); + run(initialSchema); } private void run(Field initialField) {