Repository: arrow Updated Branches: refs/heads/master 78288b5fc -> 841709627
ARROW-367: converter json <=> Arrow file format for Integration tests Author: Julien Le Dem <jul...@dremio.com> Closes #203 from julienledem/integration and squashes the following commits: b3cd326 [Julien Le Dem] add license fdbe03f [Julien Le Dem] ARROW-367: converter json <=> Arrow file format for Integration tests Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/84170962 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/84170962 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/84170962 Branch: refs/heads/master Commit: 84170962712b976fd6f68f10ba55e219155a57db Parents: 78288b5 Author: Julien Le Dem <jul...@dremio.com> Authored: Fri Nov 18 11:09:28 2016 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Fri Nov 18 11:09:28 2016 -0500 ---------------------------------------------------------------------- .../org/apache/arrow/tools/Integration.java | 262 +++++++++++++++++++ .../arrow/tools/ArrowFileTestFixtures.java | 122 +++++++++ .../apache/arrow/tools/TestFileRoundtrip.java | 101 +------ .../org/apache/arrow/tools/TestIntegration.java | 143 ++++++++++ .../arrow/vector/file/json/JsonFileReader.java | 37 +-- .../arrow/vector/file/json/JsonFileWriter.java | 3 +- 6 files changed, 554 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/84170962/java/tools/src/main/java/org/apache/arrow/tools/Integration.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java new file mode 100644 index 0000000..29f0ee2 --- /dev/null +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.file.ArrowBlock; +import org.apache.arrow.vector.file.ArrowFooter; +import org.apache.arrow.vector.file.ArrowReader; +import org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.file.json.JsonFileReader; +import org.apache.arrow.vector.file.json.JsonFileWriter; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; + +public class Integration { + private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class); + + public static void main(String[] args) { + try { + new Integration().run(args); + } catch (ParseException e) { + fatalError("Invalid parameters", e); + } catch (IOException e) { + fatalError("Error accessing files", e); + } catch (RuntimeException e) { + fatalError("Incompatible files", e); + } + } + + private final Options options; + + enum Command { + ARROW_TO_JSON(true, false) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try( + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) { + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + LOGGER.debug("Input file size: " + arrowFile.length()); + LOGGER.debug("Found schema: " + schema); + try (JsonFileWriter writer = new JsonFileWriter(jsonFile);) { + writer.start(schema); + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); + VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) { + VectorLoader vectorLoader = new VectorLoader(root); + vectorLoader.load(inRecordBatch); + writer.write(root); + } + } + } + LOGGER.debug("Output file size: " + jsonFile.length()); + } + } + }, + JSON_TO_ARROW(false, true) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try ( + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader reader = new JsonFileReader(jsonFile, allocator); + ) { + Schema schema = reader.start(); + LOGGER.debug("Input file size: " + jsonFile.length()); + LOGGER.debug("Found schema: " + schema); + try ( + FileOutputStream fileOutputStream = new FileOutputStream(arrowFile); + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); + ) { + + // initialize vectors + VectorSchemaRoot root; + while ((root = reader.read()) != null) { + VectorUnloader vectorUnloader = new VectorUnloader(root); + try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) { + arrowWriter.writeRecordBatch(recordBatch); + } + root.close(); + } + } + LOGGER.debug("Output file size: " + arrowFile.length()); + } + } + }, + VALIDATE(true, true) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try ( + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator); + ) { + Schema jsonSchema = jsonReader.start(); + ArrowFooter footer = arrowReader.readFooter(); + Schema arrowSchema = footer.getSchema(); + LOGGER.debug("Arrow Input file size: " + arrowFile.length()); + LOGGER.debug("ARROW schema: " + arrowSchema); + LOGGER.debug("JSON Input file size: " + jsonFile.length()); + LOGGER.debug("JSON schema: " + jsonSchema); + compareSchemas(jsonSchema, arrowSchema); + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + Iterator<ArrowBlock> iterator = recordBatches.iterator(); + VectorSchemaRoot jsonRoot; + while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) { + ArrowBlock rbBlock = iterator.next(); + try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); + VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) { + VectorLoader vectorLoader = new VectorLoader(arrowRoot); + vectorLoader.load(inRecordBatch); + // TODO: compare + compare(arrowRoot, jsonRoot); + } + jsonRoot.close(); + } + boolean hasMoreJSON = jsonRoot != null; + boolean hasMoreArrow = iterator.hasNext(); + if (hasMoreJSON || hasMoreArrow) { + throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " A:" + hasMoreArrow); + } + } + } + }; + + public final boolean arrowExists; + public final boolean jsonExists; + + Command(boolean arrowExists, boolean jsonExists) { + this.arrowExists = arrowExists; + this.jsonExists = jsonExists; + } + + abstract public void execute(File arrowFile, File jsonFile) throws IOException; + + } + + Integration() { + this.options = new Options(); + this.options.addOption("a", "arrow", true, "arrow file"); + this.options.addOption("j", "json", true, "json file"); + this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command.values())); + } + + private File validateFile(String type, String fileName, boolean shouldExist) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (shouldExist && (!f.exists() || f.isDirectory())) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + if (!shouldExist && f.exists()) { + throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath()); + } + return f; + } + + void run(String[] args) throws ParseException, IOException { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + + Command command = toCommand(cmd.getOptionValue("command")); + File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists); + File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists); + command.execute(arrowFile, jsonFile); + } + + private Command toCommand(String commandName) { + try { + return Command.valueOf(commandName); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + Arrays.toString(Command.values())); + } + } + + private static void fatalError(String message, Throwable e) { + System.err.println(message); + LOGGER.error(message, e); + System.exit(1); + } + + + private static void compare(VectorSchemaRoot arrowRoot, VectorSchemaRoot jsonRoot) { + compareSchemas(jsonRoot.getSchema(), arrowRoot.getSchema()); + if (arrowRoot.getRowCount() != jsonRoot.getRowCount()) { + throw new IllegalArgumentException("Different row count:\n" + arrowRoot.getRowCount() + "\n" + jsonRoot.getRowCount()); + } + List<FieldVector> arrowVectors = arrowRoot.getFieldVectors(); + List<FieldVector> jsonVectors = jsonRoot.getFieldVectors(); + if (arrowVectors.size() != jsonVectors.size()) { + throw new IllegalArgumentException("Different column count:\n" + arrowVectors.size() + "\n" + jsonVectors.size()); + } + for (int i = 0; i < arrowVectors.size(); i++) { + Field field = arrowRoot.getSchema().getFields().get(i); + FieldVector arrowVector = arrowVectors.get(i); + FieldVector jsonVector = jsonVectors.get(i); + int valueCount = arrowVector.getAccessor().getValueCount(); + if (valueCount != jsonVector.getAccessor().getValueCount()) { + throw new IllegalArgumentException("Different value count for field " + field + " : " + valueCount + " != " + jsonVector.getAccessor().getValueCount()); + } + for (int j = 0; j < valueCount; j++) { + Object arrow = arrowVector.getAccessor().getObject(j); + Object json = jsonVector.getAccessor().getObject(j); + if (!Objects.equal(arrow, json)) { + throw new IllegalArgumentException( + "Different values in column:\n" + field + " at index " + j + ": " + arrow + " != " + json); + } + } + } + } + + private static void compareSchemas(Schema jsonSchema, Schema arrowSchema) { + if (!arrowSchema.equals(jsonSchema)) { + throw new IllegalArgumentException("Different schemas:\n" + arrowSchema + "\n" + jsonSchema); + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/84170962/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java new file mode 100644 index 0000000..4cfc52f --- /dev/null +++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.arrow.vector.file.ArrowBlock; +import org.apache.arrow.vector.file.ArrowFooter; +import org.apache.arrow.vector.file.ArrowReader; +import org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; + +public class ArrowFileTestFixtures { + static final int COUNT = 10; + + static void writeData(int count, MapVector parent) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + for (int i = 0; i < count; i++) { + intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + } + writer.setValueCount(count); + } + + static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception { + // read + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(testOutFile); + ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); + BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + ) { + ArrowFooter footer = arrowReader.readFooter(); + Schema schema = footer.getSchema(); + + // initialize vectors + try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) { + VectorLoader vectorLoader = new VectorLoader(root); + + List<ArrowBlock> recordBatches = footer.getRecordBatches(); + for (ArrowBlock rbBlock : recordBatches) { + try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { + vectorLoader.load(recordBatch); + } + validateContent(COUNT, root); + } + } + } + } + + static void validateContent(int count, VectorSchemaRoot root) { + Assert.assertEquals(count, root.getRowCount()); + for (int i = 0; i < count; i++) { + Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i)); + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i)); + } + } + + static void write(FieldVector parent, File file) throws FileNotFoundException, IOException { + Schema schema = new Schema(parent.getField().getChildren()); + int valueCount = parent.getAccessor().getValueCount(); + List<FieldVector> fields = parent.getChildrenFromFields(); + VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields); + try ( + FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + ) { + arrowWriter.writeRecordBatch(recordBatch); + } + } + + + static void writeInput(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null)) { + writeData(count, parent); + write(parent.getChild("root"), testInFile); + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/84170962/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java index 339725e..ee39f5e 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java @@ -18,42 +18,21 @@ */ package org.apache.arrow.tools; +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; import static org.junit.Assert.assertEquals; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.VectorLoader; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; -import org.apache.arrow.vector.complex.MapVector; -import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; -import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; -import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; -import org.apache.arrow.vector.complex.writer.BigIntWriter; -import org.apache.arrow.vector.complex.writer.IntWriter; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFooter; -import org.apache.arrow.vector.file.ArrowReader; -import org.apache.arrow.vector.file.ArrowWriter; -import org.apache.arrow.vector.schema.ArrowRecordBatch; -import org.apache.arrow.vector.types.pojo.Schema; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; public class TestFileRoundtrip { - private static final int COUNT = 10; @Rule public TemporaryFolder testFolder = new TemporaryFolder(); @@ -70,90 +49,18 @@ public class TestFileRoundtrip { allocator.close(); } - private void writeData(int count, MapVector parent) { - ComplexWriter writer = new ComplexWriterImpl("root", parent); - MapWriter rootWriter = writer.rootAsMap(); - IntWriter intWriter = rootWriter.integer("int"); - BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); - for (int i = 0; i < count; i++) { - intWriter.setPosition(i); - intWriter.writeInt(i); - bigIntWriter.setPosition(i); - bigIntWriter.writeBigInt(i); - } - writer.setValueCount(count); - } - @Test public void test() throws Exception { File testInFile = testFolder.newFile("testIn.arrow"); File testOutFile = testFolder.newFile("testOut.arrow"); - writeInput(testInFile); + writeInput(testInFile, allocator); String[] args = { "-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; int result = new FileRoundtrip(System.out, System.err).run(args); assertEquals(0, result); - validateOutput(testOutFile); - } - - private void validateOutput(File testOutFile) throws Exception { - // read - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(testOutFile); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - ) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); - - // initialize vectors - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - vectorLoader.load(recordBatch); - } - validateContent(COUNT, root); - } - } - } - } - - private void validateContent(int count, VectorSchemaRoot root) { - Assert.assertEquals(count, root.getRowCount()); - for (int i = 0; i < count; i++) { - Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i)); - Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i)); - } - } - - public void writeInput(File testInFile) throws FileNotFoundException, IOException { - int count = COUNT; - try ( - BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null)) { - writeData(count, parent); - write(parent.getChild("root"), testInFile); - } - } - - private void write(FieldVector parent, File file) throws FileNotFoundException, IOException { - Schema schema = new Schema(parent.getField().getChildren()); - int valueCount = parent.getAccessor().getValueCount(); - List<FieldVector> fields = parent.getChildrenFromFields(); - VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields); - try ( - FileOutputStream fileOutputStream = new FileOutputStream(file); - ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); - ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); - ) { - arrowWriter.writeRecordBatch(recordBatch); - } + validateOutput(testOutFile, allocator); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/84170962/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java new file mode 100644 index 0000000..bb69ed1 --- /dev/null +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.arrow.tools; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.write; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeData; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.tools.Integration.Command; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIntegration { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + @Test + public void testValid() throws Exception { + File testInFile = testFolder.newFile("testIn.arrow"); + File testJSONFile = testFolder.newFile("testOut.json"); + testJSONFile.delete(); + File testOutFile = testFolder.newFile("testOut.arrow"); + testOutFile.delete(); + + // generate an arow file + writeInput(testInFile, allocator); + + Integration integration = new Integration(); + + // convert it to json + String[] args1 = { "-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // convert back to arrow + String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args2); + + // check it is the same + validateOutput(testOutFile, allocator); + + // validate arrow against json + String[] args3 = { "-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.VALIDATE.name()}; + integration.run(args3); + } + + @Test + public void testInvalid() throws Exception { + File testValidInFile = testFolder.newFile("testValidIn.arrow"); + File testInvalidInFile = testFolder.newFile("testInvalidIn.arrow"); + File testJSONFile = testFolder.newFile("testInvalidOut.json"); + testJSONFile.delete(); + + // generate an arrow file + writeInput(testValidInFile, allocator); + // generate a different arrow file + writeInput2(testInvalidInFile, allocator); + + Integration integration = new Integration(); + + // convert the "valid" file to json + String[] args1 = { "-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // compare the "invalid" file to the "valid" json + String[] args3 = { "-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.VALIDATE.name()}; + // this should fail + try { + integration.run(args3); + fail("should have failed"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains("Different values in column")); + Assert.assertTrue(e.getMessage(), e.getMessage().contains("999")); + } + + } + + static void writeInput2(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", vectorAllocator, null)) { + writeData(count, parent); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + intWriter.setPosition(5); + intWriter.writeInt(999); + bigIntWriter.setPosition(4); + bigIntWriter.writeBigInt(777L); + writer.setValueCount(count); + write(parent.getChild("root"), testInFile); + } + } + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/84170962/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java index 859a3a0..f07b517 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java @@ -56,7 +56,7 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.MappingJsonFactory; import com.google.common.base.Objects; -public class JsonFileReader { +public class JsonFileReader implements AutoCloseable { private final File inputFile; private final JsonParser parser; private final BufferAllocator allocator; @@ -81,23 +81,29 @@ public class JsonFileReader { } public VectorSchemaRoot read() throws IOException { - VectorSchemaRoot recordBatch = new VectorSchemaRoot(schema, allocator); - readToken(START_OBJECT); - { - int count = readNextField("count", Integer.class); - recordBatch.setRowCount(count); - nextFieldIs("columns"); - readToken(START_ARRAY); + JsonToken t = parser.nextToken(); + if (t == START_OBJECT) { + VectorSchemaRoot recordBatch = new VectorSchemaRoot(schema, allocator); { - for (Field field : schema.getFields()) { - FieldVector vector = recordBatch.getVector(field.getName()); - readVector(field, vector); + int count = readNextField("count", Integer.class); + recordBatch.setRowCount(count); + nextFieldIs("columns"); + readToken(START_ARRAY); + { + for (Field field : schema.getFields()) { + FieldVector vector = recordBatch.getVector(field.getName()); + readVector(field, vector); + } } + readToken(END_ARRAY); } - readToken(END_ARRAY); + readToken(END_OBJECT); + return recordBatch; + } else if (t == END_ARRAY) { + return null; + } else { + throw new IllegalArgumentException("Invalid token: " + t); } - readToken(END_OBJECT); - return recordBatch; } private void readVector(Field field, FieldVector vector) throws JsonParseException, IOException { @@ -194,9 +200,8 @@ public class JsonFileReader { } } + @Override public void close() throws IOException { - readToken(END_ARRAY); - readToken(END_OBJECT); parser.close(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/84170962/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java index 47c1a7d..812b3da 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java @@ -38,7 +38,7 @@ import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; import com.fasterxml.jackson.databind.MappingJsonFactory; -public class JsonFileWriter { +public class JsonFileWriter implements AutoCloseable { public static final class JSONWriteConfig { private final boolean pretty; @@ -158,6 +158,7 @@ public class JsonFileWriter { } } + @Override public void close() throws IOException { generator.writeEndArray(); generator.writeEndObject();