Repository: arrow Updated Branches: refs/heads/master 3b650014f -> 49f666e74
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java new file mode 100644 index 0000000..e7cdf3f --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.file; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import io.netty.buffer.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.NullableTinyIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.schema.ArrowMessage; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.stream.ArrowStreamReader; +import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Test; + +public class TestArrowStream extends BaseFileTest { + @Test + public void testEmptyStream() throws IOException { + Schema schema = MessageSerializerTest.testSchema(); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + + // Write the stream. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { + } + + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { + assertEquals(schema, reader.getVectorSchemaRoot().getSchema()); + // Empty should return nothing. Can be called repeatedly. + reader.loadNextBatch(); + assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); + reader.loadNextBatch(); + assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); + } + } + + @Test + public void testReadWrite() throws IOException { + Schema schema = MessageSerializerTest.testSchema(); + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + int numBatches = 1; + + root.getFieldVectors().get(0).allocateNew(); + NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator(); + for (int i = 0; i < 16; i++) { + mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1)); + } + mutator.setValueCount(16); + root.setRowCount(16); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + long bytesWritten = 0; + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { + writer.start(); + for (int i = 0; i < numBatches; i++) { + writer.writeBatch(); + } + writer.end(); + bytesWritten = writer.bytesWritten(); + } + + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { + Schema readSchema = reader.getVectorSchemaRoot().getSchema(); + assertEquals(schema, readSchema); + for (int i = 0; i < numBatches; i++) { + reader.loadNextBatch(); + } + // TODO figure out why reader isn't getting padding bytes + assertEquals(bytesWritten, reader.bytesRead() + 4); + reader.loadNextBatch(); + assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java new file mode 100644 index 0000000..46d4679 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.file; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.channels.Pipe; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.NullableTinyIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.schema.ArrowMessage; +import org.apache.arrow.vector.stream.ArrowStreamReader; +import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.stream.MessageSerializerTest; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; +import org.junit.Test; + +public class TestArrowStreamPipe { + Schema schema = MessageSerializerTest.testSchema(); + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + + private final class WriterThread extends Thread { + + private final int numBatches; + private final ArrowStreamWriter writer; + private final VectorSchemaRoot root; + + public WriterThread(int numBatches, WritableByteChannel sinkChannel) + throws IOException { + this.numBatches = numBatches; + BufferAllocator allocator = alloc.newChildAllocator("writer thread", 0, Integer.MAX_VALUE); + root = VectorSchemaRoot.create(schema, allocator); + writer = new ArrowStreamWriter(root, null, sinkChannel); + } + + @Override + public void run() { + try { + writer.start(); + for (int j = 0; j < numBatches; j++) { + root.getFieldVectors().get(0).allocateNew(); + NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator(); + // Send a changing batch id first + mutator.set(0, j); + for (int i = 1; i < 16; i++) { + mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1)); + } + mutator.setValueCount(16); + root.setRowCount(16); + + writer.writeBatch(); + } + writer.close(); + root.close(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread + } + } + + public long bytesWritten() { return writer.bytesWritten(); } + } + + private final class ReaderThread extends Thread { + private int batchesRead = 0; + private final ArrowStreamReader reader; + private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + private boolean done = false; + + public ReaderThread(ReadableByteChannel sourceChannel) + throws IOException { + reader = new ArrowStreamReader(sourceChannel, alloc) { + @Override + protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException { + // Read all the batches. Each batch contains an incrementing id and then some + // constant data. Verify both. + ArrowMessage message = super.readMessage(in, allocator); + if (message == null) { + done = true; + } else { + batchesRead++; + } + return message; + } + @Override + public void loadNextBatch() throws IOException { + super.loadNextBatch(); + if (!done) { + VectorSchemaRoot root = getVectorSchemaRoot(); + Assert.assertEquals(16, root.getRowCount()); + NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0); + Assert.assertEquals((byte)(batchesRead - 1), vector.getAccessor().get(0)); + for (int i = 1; i < 16; i++) { + if (i < 8) { + Assert.assertEquals((byte)(i + 1), vector.getAccessor().get(i)); + } else { + Assert.assertTrue(vector.getAccessor().isNull(i)); + } + } + } + } + }; + } + + @Override + public void run() { + try { + assertEquals(schema, reader.getVectorSchemaRoot().getSchema()); + assertTrue( + reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(), + reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0); + while (!done) { + reader.loadNextBatch(); + } + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread + } + } + + public int getBatchesRead() { return batchesRead; } + public long bytesRead() { return reader.bytesRead(); } + } + + // Starts up a producer and consumer thread to read/write batches. + @Test + public void pipeTest() throws IOException, InterruptedException { + int NUM_BATCHES = 10; + Pipe pipe = Pipe.open(); + WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink()); + ReaderThread reader = new ReaderThread(pipe.source()); + + writer.start(); + reader.start(); + reader.join(); + writer.join(); + + assertEquals(NUM_BATCHES, reader.getBatchesRead()); + assertEquals(writer.bytesWritten(), reader.bytesRead()); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java index 3720a13..c88958c 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java @@ -70,7 +70,7 @@ public class TestJSONFile extends BaseFileTest { int count = COUNT; try ( BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) { + NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) { writeComplexData(count, parent); VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root")); validateComplexContent(root.getRowCount(), root); @@ -92,7 +92,7 @@ public class TestJSONFile extends BaseFileTest { int count = COUNT; try ( BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) { + NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) { writeUnionData(count, parent); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java index 7b4de80..bb2ccf8 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java @@ -34,6 +34,7 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.file.ReadChannel; import org.apache.arrow.vector.file.WriteChannel; import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.schema.ArrowMessage; import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -88,9 +89,10 @@ public class MessageSerializerTest { MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); - ArrowRecordBatch deserialized = MessageSerializer.deserializeRecordBatch( - new ReadChannel(Channels.newChannel(in)), alloc); - verifyBatch(deserialized, validity, values); + ReadChannel channel = new ReadChannel(Channels.newChannel(in)); + ArrowMessage deserialized = MessageSerializer.deserializeMessageBatch(channel, alloc); + assertEquals(ArrowRecordBatch.class, deserialized.getClass()); + verifyBatch((ArrowRecordBatch) deserialized, validity, values); } public static Schema testSchema() { http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java deleted file mode 100644 index 725272a..0000000 --- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.arrow.vector.stream; - -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.file.BaseFileTest; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.types.pojo.Schema; -import org.junit.Test; - -import io.netty.buffer.ArrowBuf; - -public class TestArrowStream extends BaseFileTest { - @Test - public void testEmptyStream() throws IOException { - Schema schema = MessageSerializerTest.testSchema(); - - // Write the stream. - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) { - } - - ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); - try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { - reader.init(); - assertEquals(schema, reader.getSchema()); - // Empty should return null. Can be called repeatedly. - assertTrue(reader.nextRecordBatch() == null); - assertTrue(reader.nextRecordBatch() == null); - } - } - - @Test - public void testReadWrite() throws IOException { - Schema schema = MessageSerializerTest.testSchema(); - byte[] validity = new byte[] { (byte)255, 0}; - // second half is "undefined" - byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; - - int numBatches = 5; - BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - long bytesWritten = 0; - try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) { - ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity); - ArrowBuf valuesb = MessageSerializerTest.buf(alloc, values); - for (int i = 0; i < numBatches; i++) { - writer.writeRecordBatch(new ArrowRecordBatch( - 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))); - } - bytesWritten = writer.bytesWritten(); - } - - ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); - try (ArrowStreamReader reader = new ArrowStreamReader(in, alloc)) { - reader.init(); - Schema readSchema = reader.getSchema(); - for (int i = 0; i < numBatches; i++) { - assertEquals(schema, readSchema); - assertTrue( - readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), - readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0); - ArrowRecordBatch recordBatch = reader.nextRecordBatch(); - MessageSerializerTest.verifyBatch(recordBatch, validity, values); - assertTrue(recordBatch != null); - } - assertTrue(reader.nextRecordBatch() == null); - assertEquals(bytesWritten, reader.bytesRead()); - } - } -} http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java deleted file mode 100644 index aa0b77e..0000000 --- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.arrow.vector.stream; - -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.channels.Pipe; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; - -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.types.pojo.Schema; -import org.junit.Test; - -import io.netty.buffer.ArrowBuf; - -public class TestArrowStreamPipe { - Schema schema = MessageSerializerTest.testSchema(); - // second half is "undefined" - byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; - - private final class WriterThread extends Thread { - private final int numBatches; - private final ArrowStreamWriter writer; - - public WriterThread(int numBatches, WritableByteChannel sinkChannel) - throws IOException { - this.numBatches = numBatches; - writer = new ArrowStreamWriter(sinkChannel, schema); - } - - @Override - public void run() { - BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); - try { - ArrowBuf valuesb = MessageSerializerTest.buf(alloc, values); - for (int i = 0; i < numBatches; i++) { - // Send a changing byte id first. - byte[] validity = new byte[] { (byte)i, 0}; - ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity); - writer.writeRecordBatch(new ArrowRecordBatch( - 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))); - } - writer.close(); - } catch (IOException e) { - e.printStackTrace(); - assertTrue(false); - } - } - - public long bytesWritten() { return writer.bytesWritten(); } - } - - private final class ReaderThread extends Thread { - private int batchesRead = 0; - private final ArrowStreamReader reader; - private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); - - public ReaderThread(ReadableByteChannel sourceChannel) - throws IOException { - reader = new ArrowStreamReader(sourceChannel, alloc); - } - - @Override - public void run() { - try { - reader.init(); - assertEquals(schema, reader.getSchema()); - assertTrue( - reader.getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(), - reader.getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0); - - // Read all the batches. Each batch contains an incrementing id and then some - // constant data. Verify both. - while (true) { - ArrowRecordBatch batch = reader.nextRecordBatch(); - if (batch == null) break; - byte[] validity = new byte[] { (byte)batchesRead, 0}; - MessageSerializerTest.verifyBatch(batch, validity, values); - batchesRead++; - } - } catch (IOException e) { - e.printStackTrace(); - assertTrue(false); - } - } - - public int getBatchesRead() { return batchesRead; } - public long bytesRead() { return reader.bytesRead(); } - } - - // Starts up a producer and consumer thread to read/write batches. - @Test - public void pipeTest() throws IOException, InterruptedException { - int NUM_BATCHES = 10; - Pipe pipe = Pipe.open(); - WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink()); - ReaderThread reader = new ReaderThread(pipe.source()); - - writer.start(); - reader.start(); - reader.join(); - writer.join(); - - assertEquals(NUM_BATCHES, reader.getBatchesRead()); - assertEquals(writer.bytesWritten(), reader.bytesRead()); - } -}
