Repository: arrow Updated Branches: refs/heads/master 79344b335 -> 6996c17f7
ARROW-312: [Java] IPC file round trip tool for integration testing Author: Julien Le Dem <[email protected]> Author: Wes McKinney <[email protected]> Closes #186 from wesm/roundtrip-tool and squashes the following commits: aee552a [Julien Le Dem] missing file 9d5c078 [Julien Le Dem] fix read-write bug 7f20b36 [Julien Le Dem] simple roundtrip a04091f [Wes McKinney] Drafting file round trip helper executable Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6996c17f Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6996c17f Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6996c17f Branch: refs/heads/master Commit: 6996c17f70dc13659c37dfaa39bc28e7777ca6a6 Parents: 79344b3 Author: Julien Le Dem <[email protected]> Authored: Tue Nov 8 13:29:34 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Tue Nov 8 13:29:34 2016 -0500 ---------------------------------------------------------------------- .../src/main/java/io/netty/buffer/ArrowBuf.java | 7 +- .../apache/arrow/memory/TestBaseAllocator.java | 24 ++- java/pom.xml | 1 + java/tools/pom.xml | 73 +++++++++ .../org/apache/arrow/tools/FileRoundtrip.java | 135 ++++++++++++++++ .../apache/arrow/tools/TestFileRoundtrip.java | 159 +++++++++++++++++++ java/vector/pom.xml | 32 ++-- .../codegen/templates/NullableValueVectors.java | 2 +- .../org/apache/arrow/vector/VectorLoader.java | 21 ++- .../apache/arrow/vector/VectorSchemaRoot.java | 140 ++++++++++++++++ .../org/apache/arrow/vector/VectorUnloader.java | 13 +- .../apache/arrow/vector/schema/ArrowBuffer.java | 6 + .../arrow/vector/schema/ArrowRecordBatch.java | 8 + .../arrow/vector/TestVectorUnloadLoad.java | 42 +++-- .../apache/arrow/vector/file/TestArrowFile.java | 149 +++++++++-------- 15 files changed, 681 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index a5989c1..95d2be5 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -179,7 +179,10 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { historicalLog.recordEvent("retain(%s)", target.getName()); } final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target); - return otherLedger.newArrowBuf(offset, length, null); + ArrowBuf newArrowBuf = otherLedger.newArrowBuf(offset, length, null); + newArrowBuf.readerIndex(this.readerIndex); + newArrowBuf.writerIndex(this.writerIndex); + return newArrowBuf; } /** @@ -214,6 +217,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target); final ArrowBuf newBuf = otherLedger.newArrowBuf(offset, length, null); + newBuf.readerIndex(this.readerIndex); + newBuf.writerIndex(this.writerIndex); final boolean allocationFit = this.ledger.transferBalance(otherLedger); return new TransferResult(allocationFit, newBuf); } http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java ---------------------------------------------------------------------- diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index aa6b70c..3c96d57 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -22,16 +22,13 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import io.netty.buffer.ArrowBuf; -import io.netty.buffer.ArrowBuf.TransferResult; -import org.apache.arrow.memory.AllocationReservation; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.OutOfMemoryException; -import org.apache.arrow.memory.RootAllocator; import org.junit.Ignore; import org.junit.Test; +import io.netty.buffer.ArrowBuf; +import io.netty.buffer.ArrowBuf.TransferResult; + public class TestBaseAllocator { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class); @@ -134,6 +131,7 @@ public class TestBaseAllocator { final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4); rootAllocator.verify(); TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2); + assertEquiv(arrowBuf1, transferOwnership.buffer); final boolean allocationFit = transferOwnership.allocationFit; rootAllocator.verify(); assertTrue(allocationFit); @@ -160,6 +158,7 @@ public class TestBaseAllocator { rootAllocator.verify(); assertNotNull(arrowBuf2); assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); // release original buffer (thus transferring ownership to allocator 2. (should leave allocator 1 in empty state) arrowBuf1.release(); @@ -172,6 +171,7 @@ public class TestBaseAllocator { assertNotNull(arrowBuf3); assertNotEquals(arrowBuf3, arrowBuf1); assertNotEquals(arrowBuf3, arrowBuf2); + assertEquiv(arrowBuf1, arrowBuf3); rootAllocator.verify(); arrowBuf2.release(); @@ -452,8 +452,10 @@ public class TestBaseAllocator { rootAllocator.verify(); TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1); + assertEquiv(arrowBuf2s, result1.buffer); rootAllocator.verify(); TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2); + assertEquiv(arrowBuf1s, result2.buffer); rootAllocator.verify(); result1.buffer.release(); @@ -482,7 +484,9 @@ public class TestBaseAllocator { rootAllocator.verify(); final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1); + assertEquiv(arrowBuf2s, arrowBuf2s1); final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2); + assertEquiv(arrowBuf1s, arrowBuf1s2); rootAllocator.verify(); arrowBuf1s.release(); // releases arrowBuf1 @@ -512,11 +516,13 @@ public class TestBaseAllocator { rootAllocator.verify(); assertNotNull(arrowBuf2); assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); TransferResult result = arrowBuf1.transferOwnership(childAllocator3); allocationFit = result.allocationFit; final ArrowBuf arrowBuf3 = result.buffer; assertTrue(allocationFit); + assertEquiv(arrowBuf1, arrowBuf3); rootAllocator.verify(); // Since childAllocator3 now has childAllocator1's buffer, 1, can close @@ -533,6 +539,7 @@ public class TestBaseAllocator { allocationFit = result.allocationFit; final ArrowBuf arrowBuf4 = result2.buffer; assertTrue(allocationFit); + assertEquiv(arrowBuf3, arrowBuf4); rootAllocator.verify(); arrowBuf3.release(); @@ -645,4 +652,9 @@ public class TestBaseAllocator { } } + + public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { + assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); + assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); + } } http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/pom.xml ---------------------------------------------------------------------- diff --git a/java/pom.xml b/java/pom.xml index 0147de7..7221a14 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -467,5 +467,6 @@ <module>format</module> <module>memory</module> <module>vector</module> + <module>tools</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/tools/pom.xml ---------------------------------------------------------------------- diff --git a/java/tools/pom.xml b/java/tools/pom.xml new file mode 100644 index 0000000..84b0b5e --- /dev/null +++ b/java/tools/pom.xml @@ -0,0 +1,73 @@ +<?xml version="1.0"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-java-root</artifactId> + <version>0.1.1-SNAPSHOT</version> + </parent> + <artifactId>arrow-tools</artifactId> + <name>Arrow Tools</name> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-format</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.4</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.6</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java new file mode 100644 index 0000000..db7a1c2 --- /dev/null +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.file.ArrowBlock; +import org.apache.arrow.vector.file.ArrowFooter; +import org.apache.arrow.vector.file.ArrowReader; +import org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileRoundtrip { + private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class); + + public static void main(String[] args) { + System.exit(new FileRoundtrip(System.out, System.err).run(args)); + } + + private final Options options; + private final PrintStream out; + private final PrintStream err; + + FileRoundtrip(PrintStream out, PrintStream err) { + this.out = out; + this.err = err; + this.options = new Options(); + this.options.addOption("i", "in", true, "input file"); + this.options.addOption("o", "out", true, "output file"); + + } + + private File validateFile(String type, String fileName) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (!f.exists() || f.isDirectory()) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + return f; + } + + int run(String[] args) { + try { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + String inFileName = cmd.getOptionValue("in"); + String outFileName = cmd.getOptionValue("out"); + + File inFile = validateFile("input", inFileName); + File outFile = validateFile("output", outFileName); + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close + try( + FileInputStream fileInputStream = new FileInputStream(inFile); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) { + + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + LOGGER.debug("Input file size: " + inFile.length()); + LOGGER.debug("Found schema: " + schema); + + try ( + FileOutputStream fileOutputStream = new FileOutputStream(outFile); + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); + ) { + + // initialize vectors + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); + VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) { + + VectorLoader vectorLoader = new VectorLoader(root); + vectorLoader.load(inRecordBatch); + + VectorUnloader vectorUnloader = new VectorUnloader(root); + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + arrowWriter.writeRecordBatch(recordBatch); + } + } + } + LOGGER.debug("Output file size: " + outFile.length()); + } + } catch (ParseException e) { + return fatalError("Invalid parameters", e); + } catch (IOException e) { + return fatalError("Error accessing files", e); + } + return 0; + } + + private int fatalError(String message, Throwable e) { + err.println(message); + LOGGER.error(message, e); + return 1; + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java new file mode 100644 index 0000000..339725e --- /dev/null +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.arrow.tools; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.arrow.vector.file.ArrowBlock; +import org.apache.arrow.vector.file.ArrowFooter; +import org.apache.arrow.vector.file.ArrowReader; +import org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestFileRoundtrip { + private static final int COUNT = 10; + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + private void writeData(int count, MapVector parent) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + for (int i = 0; i < count; i++) { + intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + } + writer.setValueCount(count); + } + + @Test + public void test() throws Exception { + File testInFile = testFolder.newFile("testIn.arrow"); + File testOutFile = testFolder.newFile("testOut.arrow"); + + writeInput(testInFile); + + String[] args = { "-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; + int result = new FileRoundtrip(System.out, System.err).run(args); + assertEquals(0, result); + + validateOutput(testOutFile); + } + + private void validateOutput(File testOutFile) throws Exception { + // read + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(testOutFile); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); + BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + ) { + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + + // initialize vectors + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) { + VectorLoader vectorLoader = new VectorLoader(root); + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + vectorLoader.load(recordBatch); + } + validateContent(COUNT, root); + } + } + } + } + + private void validateContent(int count, VectorSchemaRoot root) { + Assert.assertEquals(count, root.getRowCount()); + for (int i = 0; i < count; i++) { + Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i)); + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i)); + } + } + + public void writeInput(File testInFile) throws FileNotFoundException, IOException { + int count = COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null)) { + writeData(count, parent); + write(parent.getChild("root"), testInFile); + } + } + + private void write(FieldVector parent, File file) throws FileNotFoundException, IOException { + Schema schema = new Schema(parent.getField().getChildren()); + int valueCount = parent.getAccessor().getValueCount(); + List<FieldVector> fields = parent.getChildrenFromFields(); + VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields); + try ( + FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + ) { + arrowWriter.writeRecordBatch(recordBatch); + } + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/pom.xml ---------------------------------------------------------------------- diff --git a/java/vector/pom.xml b/java/vector/pom.xml index 1d06bde..64b68bf 100644 --- a/java/vector/pom.xml +++ b/java/vector/pom.xml @@ -1,13 +1,13 @@ <?xml version="1.0"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -56,8 +56,6 @@ <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> - - </dependencies> <pluginRepositories> @@ -72,13 +70,13 @@ <enabled>false</enabled> </snapshots> </pluginRepository> - </pluginRepositories> - + </pluginRepositories> + <build> <resources> <resource> - <!-- Copy freemarker template and fmpp configuration files of Vector's + <!-- Copy freemarker template and fmpp configuration files of Vector's to allow clients to leverage definitions. --> <directory>${basedir}/src/main/codegen</directory> <targetPath>codegen</targetPath> @@ -129,7 +127,7 @@ </plugins> <pluginManagement> <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings + <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself. --> <plugin> <groupId>org.eclipse.m2e</groupId> @@ -160,8 +158,8 @@ </plugin> </plugins> </pluginManagement> - - + + </build> http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java index bafa317..48af7a2 100644 --- a/java/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java @@ -145,7 +145,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { org.apache.arrow.vector.BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers); - // TODO: do something with the sizes in fieldNode? + bits.valueCount = fieldNode.getLength(); } public List<ArrowBuf> getFieldBuffers() { http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index b7040da..4afd823 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -27,7 +27,6 @@ import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.schema.VectorLayout; import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; import com.google.common.collect.Iterators; @@ -37,22 +36,16 @@ import io.netty.buffer.ArrowBuf; * Loads buffers into vectors */ public class VectorLoader { - private final List<FieldVector> fieldVectors; - private final List<Field> fields; + private final VectorSchemaRoot root; /** * will create children in root based on schema * @param schema the expected schema * @param root the root to add vectors to based on schema */ - public VectorLoader(Schema schema, FieldVector root) { + public VectorLoader(VectorSchemaRoot root) { super(); - this.fields = schema.getFields(); - root.initializeChildrenFromFields(fields); - this.fieldVectors = root.getChildrenFromFields(); - if (this.fieldVectors.size() != fields.size()) { - throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + fields.size()); - } + this.root = root; } /** @@ -63,16 +56,19 @@ public class VectorLoader { public void load(ArrowRecordBatch recordBatch) { Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator(); Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator(); + List<Field> fields = root.getSchema().getFields(); for (int i = 0; i < fields.size(); ++i) { Field field = fields.get(i); - FieldVector fieldVector = fieldVectors.get(i); + FieldVector fieldVector = root.getVector(field.getName()); loadBuffers(fieldVector, field, buffers, nodes); } + root.setRowCount(recordBatch.getLength()); if (nodes.hasNext() || buffers.hasNext()) { throw new IllegalArgumentException("not all nodes and buffers where consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers)); } } + private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) { checkArgument(nodes.hasNext(), "no more field nodes for for field " + field + " and vector " + vector); @@ -85,7 +81,7 @@ public class VectorLoader { try { vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { - throw new IllegalArgumentException("Could not load buffers for field " + field); + throw new IllegalArgumentException("Could not load buffers for field " + field, e); } List<Field> children = field.getChildren(); if (children.size() > 0) { @@ -98,4 +94,5 @@ public class VectorLoader { } } } + } http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java new file mode 100644 index 0000000..1cbe187 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +public class VectorSchemaRoot implements AutoCloseable { + + private final Schema schema; + private int rowCount; + private final List<FieldVector> fieldVectors; + private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>(); + + public VectorSchemaRoot(FieldVector parent) { + this.schema = new Schema(parent.getField().getChildren()); + this.rowCount = parent.getAccessor().getValueCount(); + this.fieldVectors = parent.getChildrenFromFields(); + for (int i = 0; i < schema.getFields().size(); ++i) { + Field field = schema.getFields().get(i); + FieldVector vector = fieldVectors.get(i); + fieldVectorsMap.put(field.getName(), vector); + } + } + + public VectorSchemaRoot(Schema schema, BufferAllocator allocator) { + super(); + this.schema = schema; + List<FieldVector> fieldVectors = new ArrayList<>(); + for (Field field : schema.getFields()) { + MinorType minorType = Types.getMinorTypeForArrowType(field.getType()); + FieldVector vector = minorType.getNewVector(field.getName(), allocator, null); + vector.initializeChildrenFromFields(field.getChildren()); + fieldVectors.add(vector); + fieldVectorsMap.put(field.getName(), vector); + } + this.fieldVectors = Collections.unmodifiableList(fieldVectors); + if (this.fieldVectors.size() != schema.getFields().size()) { + throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + schema.getFields().size()); + } + } + + public List<FieldVector> getFieldVectors() { + return fieldVectors; + } + + public FieldVector getVector(String name) { + return fieldVectorsMap.get(name); + } + + public Schema getSchema() { + return schema; + } + + public int getRowCount() { + return rowCount; + } + + public void setRowCount(int rowCount) { + this.rowCount = rowCount; + } + + @Override + public void close() { + RuntimeException ex = null; + for (FieldVector fieldVector : fieldVectors) { + try { + fieldVector.close(); + } catch (RuntimeException e) { + ex = chain(ex, e); + } + } + if (ex!= null) { + throw ex; + } + } + + private RuntimeException chain(RuntimeException root, RuntimeException e) { + if (root == null) { + root = e; + } else { + root.addSuppressed(e); + } + return root; + } + + private void printRow(StringBuilder sb, List<Object> row) { + boolean first = true; + for (Object v : row) { + if (first) { + first = false; + } else { + sb.append("\t"); + } + sb.append(v); + } + sb.append("\n"); + } + + public String contentToTSVString() { + StringBuilder sb = new StringBuilder(); + List<Object> row = new ArrayList<>(schema.getFields().size()); + for (Field field : schema.getFields()) { + row.add(field.getName()); + } + printRow(sb, row); + for (int i = 0; i < rowCount; i++) { + row.clear(); + for (FieldVector v : fieldVectors) { + row.add(v.getAccessor().getObject(i)); + } + printRow(sb, row); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index 3375a7d..e246218 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -34,11 +34,15 @@ public class VectorUnloader { private final int valueCount; private final List<FieldVector> vectors; - public VectorUnloader(FieldVector parent) { + public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) { super(); - this.schema = new Schema(parent.getField().getChildren()); - this.valueCount = parent.getAccessor().getValueCount(); - this.vectors = parent.getChildrenFromFields(); + this.schema = schema; + this.valueCount = valueCount; + this.vectors = vectors; + } + + public VectorUnloader(VectorSchemaRoot root) { + this(root.getSchema(), root.getRowCount(), root.getFieldVectors()); } public Schema getSchema() { @@ -77,4 +81,5 @@ public class VectorUnloader { appendNodes(child, nodes, buffers); } } + } http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java index 3aa3e52..4e2e200 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java @@ -78,4 +78,10 @@ public class ArrowBuffer implements FBSerializable { public int writeTo(FlatBufferBuilder builder) { return Buffer.createBuffer(builder, page, offset, size); } + + @Override + public String toString() { + return "ArrowBuffer [page=" + page + ", offset=" + offset + ", size=" + size + "]"; + } + } http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java index 9162efd..adb99e2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java @@ -124,4 +124,12 @@ public class ArrowRecordBatch implements FBSerializable, AutoCloseable { } } + @Override + public String toString() { + return "ArrowRecordBatch [length=" + length + ", nodes=" + nodes + ", #buffers=" + buffers.size() + ", buffersLayout=" + + buffersLayout + ", closed=" + closed + "]"; + } + + + } http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index 7dcb897..78f69ee 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -18,20 +18,18 @@ package org.apache.arrow.vector; import java.io.IOException; +import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.complex.MapVector; -import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; -import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; -import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; +import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.IntWriter; import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.AfterClass; import org.junit.Assert; @@ -42,13 +40,15 @@ public class TestVectorUnloadLoad { static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); @Test - public void test() throws IOException { + public void testUnloadLoad() throws IOException { int count = 10000; Schema schema; try ( BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) { + + // write some data ComplexWriter writer = new ComplexWriterImpl("root", parent); MapWriter rootWriter = writer.rootAsMap(); IntWriter intWriter = rootWriter.integer("int"); @@ -61,28 +61,40 @@ public class TestVectorUnloadLoad { } writer.setValueCount(count); - VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root")); - schema = vectorUnloader.getSchema(); - + // unload it + FieldVector root = parent.getChild("root"); + schema = new Schema(root.getField().getChildren()); + VectorUnloader vectorUnloader = newVectorUnloader(root); try ( ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector newParent = new MapVector("parent", finalVectorsAllocator, null)) { - FieldVector root = newParent.addOrGet("root", MinorType.MAP, NullableMapVector.class); - VectorLoader vectorLoader = new VectorLoader(schema, root); + VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); + ) { + + // load it + VectorLoader vectorLoader = new VectorLoader(newRoot); vectorLoader.load(recordBatch); - MapReader rootReader = new SingleMapReaderImpl(newParent).reader("root"); + FieldReader intReader = newRoot.getVector("int").getReader(); + FieldReader bigIntReader = newRoot.getVector("bigInt").getReader(); for (int i = 0; i < count; i++) { - rootReader.setPosition(i); - Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); - Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); + intReader.setPosition(i); + Assert.assertEquals(i, intReader.readInteger().intValue()); + bigIntReader.setPosition(i); + Assert.assertEquals(i, bigIntReader.readLong().longValue()); } } } } + public static VectorUnloader newVectorUnloader(FieldVector root) { + Schema schema = new Schema(root.getField().getChildren()); + int valueCount = root.getAccessor().getValueCount(); + List<FieldVector> fields = root.getChildrenFromFields(); + return new VectorUnloader(schema, valueCount, fields); + } + @AfterClass public static void afterClass() { allocator.close(); http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java index 0f28d53..e97bc14 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -17,6 +17,8 @@ */ package org.apache.arrow.vector.file; +import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -29,12 +31,12 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector.Accessor; import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; -import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; -import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; +import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; @@ -43,7 +45,6 @@ import org.apache.arrow.vector.complex.writer.IntWriter; import org.apache.arrow.vector.holders.NullableTimeStampHolder; import org.apache.arrow.vector.schema.ArrowBuffer; import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Schema; import org.joda.time.DateTimeZone; import org.junit.After; @@ -94,8 +95,9 @@ public class TestArrowFile { BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) { writeComplexData(count, parent); - validateComplexContent(count, parent); - write(parent.getChild("root"), file); + FieldVector root = parent.getChild("root"); + validateComplexContent(count, new VectorSchemaRoot(root)); + write(root, file); } } @@ -174,33 +176,31 @@ public class TestArrowFile { // initialize vectors - NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); - - VectorLoader vectorLoader = new VectorLoader(schema, root); - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - Assert.assertEquals(0, rbBlock.getOffset() % 8); - Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { + VectorLoader vectorLoader = new VectorLoader(root); + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + Assert.assertEquals(0, rbBlock.getOffset() % 8); + Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + vectorLoader.load(recordBatch); } - vectorLoader.load(recordBatch); - } - validateContent(count, parent); + validateContent(count, root); + } } } } - private void validateContent(int count, MapVector parent) { - MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); + private void validateContent(int count, VectorSchemaRoot root) { for (int i = 0; i < count; i++) { - rootReader.setPosition(i); - Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); - Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); + Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i)); + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i)); } } @@ -231,15 +231,15 @@ public class TestArrowFile { // initialize vectors - NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); - VectorLoader vectorLoader = new VectorLoader(schema, root); - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - vectorLoader.load(recordBatch); + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) { + VectorLoader vectorLoader = new VectorLoader(root); + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + vectorLoader.load(recordBatch); + } + validateComplexContent(count, root); } - validateComplexContent(count, parent); } } } @@ -255,23 +255,23 @@ public class TestArrowFile { } } - private void validateComplexContent(int count, NullableMapVector parent) { - printVectors(parent.getChildrenFromFields()); - - MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); + private void validateComplexContent(int count, VectorSchemaRoot root) { + Assert.assertEquals(count, root.getRowCount()); + printVectors(root.getFieldVectors()); for (int i = 0; i < count; i++) { - rootReader.setPosition(i); - Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); - Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); - Assert.assertEquals(i % 3, rootReader.reader("list").size()); + Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i)); + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i)); + Assert.assertEquals(i % 3, ((List<?>)root.getVector("list").getAccessor().getObject(i)).size()); NullableTimeStampHolder h = new NullableTimeStampHolder(); - rootReader.reader("map").reader("timestamp").read(h); + FieldReader mapReader = root.getVector("map").getReader(); + mapReader.setPosition(i); + mapReader.reader("timestamp").read(h); Assert.assertEquals(i, h.value); } } private void write(FieldVector parent, File file) throws FileNotFoundException, IOException { - VectorUnloader vectorUnloader = new VectorUnloader(parent); + VectorUnloader vectorUnloader = newVectorUnloader(parent); Schema schema = vectorUnloader.getSchema(); LOGGER.debug("writing schema: " + schema); try ( @@ -294,7 +294,7 @@ public class TestArrowFile { MapVector parent = new MapVector("parent", originalVectorAllocator, null); FileOutputStream fileOutputStream = new FileOutputStream(file);) { writeData(count, parent); - VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root")); + VectorUnloader vectorUnloader = newVectorUnloader(parent.getChild("root")); Schema schema = vectorUnloader.getSchema(); Assert.assertEquals(2, schema.getFields().size()); try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) { @@ -320,20 +320,21 @@ public class TestArrowFile { ArrowFooter footer = arrowReader.readFooter(); Schema schema = footer.getSchema(); LOGGER.debug("reading schema: " + schema); - NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); - VectorLoader vectorLoader = new VectorLoader(schema, root); - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - Assert.assertEquals(2, recordBatches.size()); - for (ArrowBlock rbBlock : recordBatches) { - Assert.assertEquals(0, rbBlock.getOffset() % 8); - Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - for (ArrowBuffer arrowBuffer : buffersLayout) { - Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) { + VectorLoader vectorLoader = new VectorLoader(root); + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + Assert.assertEquals(2, recordBatches.size()); + for (ArrowBlock rbBlock : recordBatches) { + Assert.assertEquals(0, rbBlock.getOffset() % 8); + Assert.assertEquals(0, rbBlock.getMetadataLength() % 8); + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); + for (ArrowBuffer arrowBuffer : buffersLayout) { + Assert.assertEquals(0, arrowBuffer.getOffset() % 8); + } + vectorLoader.load(recordBatch); + validateContent(count, root); } - vectorLoader.load(recordBatch); - validateContent(count, parent); } } } @@ -351,7 +352,7 @@ public class TestArrowFile { printVectors(parent.getChildrenFromFields()); - validateUnionData(count, parent); + validateUnionData(count, new VectorSchemaRoot(parent.getChild("root"))); write(parent.getChild("root"), file); } @@ -361,44 +362,42 @@ public class TestArrowFile { FileInputStream fileInputStream = new FileInputStream(file); ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null) ) { ArrowFooter footer = arrowReader.readFooter(); Schema schema = footer.getSchema(); LOGGER.debug("reading schema: " + schema); // initialize vectors - - NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); - VectorLoader vectorLoader = new VectorLoader(schema, root); - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - vectorLoader.load(recordBatch); + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) { + VectorLoader vectorLoader = new VectorLoader(root); + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + vectorLoader.load(recordBatch); + } + validateUnionData(count, root); } - validateUnionData(count, parent); } } } - public void validateUnionData(int count, MapVector parent) { - MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); + public void validateUnionData(int count, VectorSchemaRoot root) { + FieldReader unionReader = root.getVector("union").getReader(); for (int i = 0; i < count; i++) { - rootReader.setPosition(i); + unionReader.setPosition(i); switch (i % 4) { case 0: - Assert.assertEquals(i, rootReader.reader("union").readInteger().intValue()); + Assert.assertEquals(i, unionReader.readInteger().intValue()); break; case 1: - Assert.assertEquals(i, rootReader.reader("union").readLong().longValue()); + Assert.assertEquals(i, unionReader.readLong().longValue()); break; case 2: - Assert.assertEquals(i % 3, rootReader.reader("union").size()); + Assert.assertEquals(i % 3, unionReader.size()); break; case 3: NullableTimeStampHolder h = new NullableTimeStampHolder(); - rootReader.reader("union").reader("timestamp").read(h); + unionReader.reader("timestamp").read(h); Assert.assertEquals(i, h.value); break; }
