ARROW-542: Adding dictionary encoding to FileWriter (WIP, for comments)
Author: Emilio Lahr-Vivaz <[email protected]> Author: Wes McKinney <[email protected]> Closes #334 from elahrvivaz/ARROW-542 and squashes the following commits: 5339730 [Emilio Lahr-Vivaz] fixing bitvector load of value count, adding struct integration test 00d78d3 [Emilio Lahr-Vivaz] fixing set bit validity value in NullableMapVector load 1679934 [Emilio Lahr-Vivaz] cleaning up license 70639e0 [Emilio Lahr-Vivaz] restoring vector loader test bde4eee [Wes McKinney] Handle 0-length message indicator for EOS in C++ StreamReader a24854b [Emilio Lahr-Vivaz] fixing StreamToFile conversion 2ee7cfb [Emilio Lahr-Vivaz] fixing FileToStream conversion adec200 [Emilio Lahr-Vivaz] making arrow magic static, cleanup 8366288 [Emilio Lahr-Vivaz] making magic array private 127937f [Emilio Lahr-Vivaz] removing qualifier for magic db9a007 [Emilio Lahr-Vivaz] adding dictionary tests to echo server 95c7b2a [Emilio Lahr-Vivaz] cleanup 45caa02 [Emilio Lahr-Vivaz] reverting basewriter dictionary methods 682db6f [Emilio Lahr-Vivaz] cleanup a1508b9 [Emilio Lahr-Vivaz] removing dictionary vector method (instead use field.dictionary) 43c28af [Emilio Lahr-Vivaz] adding test for nested dictionary encoded list 92a1e6f [Emilio Lahr-Vivaz] fixing imports e567564 [Emilio Lahr-Vivaz] adding field size check in vectorschemaroot 568fda5 [Emilio Lahr-Vivaz] imports, formatting 363308e [Emilio Lahr-Vivaz] fixing tests 2f69be1 [Emilio Lahr-Vivaz] not passing around dictionary vectors with dictionary fields, adding dictionary encoding to fields, restoring vector loader/unloader e5c8e02 [Emilio Lahr-Vivaz] Merging dictionary unloader/loader with arrow writer/reader Creating base class for stream/file writer Creating base class with visitors for arrow messages Indentation fixes Other cleanup d095f3f [Emilio Lahr-Vivaz] ARROW-542: Adding dictionary encoding to file and stream writing Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: 
http://git-wip-us.apache.org/repos/asf/arrow/commit/49f666e7 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/49f666e7 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/49f666e7 Branch: refs/heads/master Commit: 49f666e740208d1e6167537f141f27b6b78b77cb Parents: 3b65001 Author: Emilio Lahr-Vivaz <[email protected]> Authored: Thu Mar 16 13:59:53 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Thu Mar 16 13:59:53 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/ipc/reader.cc | 6 + integration/integration_test.py | 4 + .../java/org/apache/arrow/tools/EchoServer.java | 48 +- .../org/apache/arrow/tools/FileRoundtrip.java | 48 +- .../org/apache/arrow/tools/FileToStream.java | 27 +- .../org/apache/arrow/tools/Integration.java | 83 +-- .../org/apache/arrow/tools/StreamToFile.java | 19 +- .../arrow/tools/ArrowFileTestFixtures.java | 51 +- .../org/apache/arrow/tools/EchoServerTest.java | 280 ++++++-- .../org/apache/arrow/tools/TestIntegration.java | 38 +- java/tools/tmptestfilesio | Bin 0 -> 628 bytes .../src/main/codegen/templates/MapWriters.java | 8 +- .../codegen/templates/NullableValueVectors.java | 40 +- .../src/main/codegen/templates/UnionVector.java | 10 +- .../java/org/apache/arrow/vector/BitVector.java | 2 +- .../org/apache/arrow/vector/FieldVector.java | 4 +- .../org/apache/arrow/vector/VectorLoader.java | 13 +- .../apache/arrow/vector/VectorSchemaRoot.java | 32 +- .../org/apache/arrow/vector/VectorUnloader.java | 27 +- .../vector/complex/AbstractContainerVector.java | 3 +- .../arrow/vector/complex/AbstractMapVector.java | 9 +- .../vector/complex/BaseRepeatedValueVector.java | 5 +- .../arrow/vector/complex/DictionaryVector.java | 229 ------- .../apache/arrow/vector/complex/ListVector.java | 26 +- .../apache/arrow/vector/complex/MapVector.java | 5 +- .../arrow/vector/complex/NullableMapVector.java | 9 +- .../vector/complex/impl/ComplexWriterImpl.java | 6 +- 
.../vector/complex/impl/PromotableWriter.java | 5 +- .../arrow/vector/dictionary/Dictionary.java | 66 ++ .../vector/dictionary/DictionaryEncoder.java | 144 ++++ .../vector/dictionary/DictionaryProvider.java | 47 ++ .../arrow/vector/file/ArrowFileReader.java | 142 ++++ .../arrow/vector/file/ArrowFileWriter.java | 59 ++ .../apache/arrow/vector/file/ArrowFooter.java | 1 - .../apache/arrow/vector/file/ArrowMagic.java | 37 ++ .../apache/arrow/vector/file/ArrowReader.java | 222 +++++-- .../apache/arrow/vector/file/ArrowWriter.java | 173 +++-- .../apache/arrow/vector/file/ReadChannel.java | 11 +- .../arrow/vector/file/SeekableReadChannel.java | 39 ++ .../apache/arrow/vector/file/WriteChannel.java | 7 +- .../arrow/vector/file/json/JsonFileReader.java | 26 +- .../vector/schema/ArrowDictionaryBatch.java | 60 ++ .../arrow/vector/schema/ArrowMessage.java | 30 + .../arrow/vector/schema/ArrowRecordBatch.java | 8 +- .../arrow/vector/stream/ArrowStreamReader.java | 88 +-- .../arrow/vector/stream/ArrowStreamWriter.java | 75 +-- .../arrow/vector/stream/MessageSerializer.java | 164 ++++- .../apache/arrow/vector/types/Dictionary.java | 40 -- .../org/apache/arrow/vector/types/Types.java | 114 ++-- .../vector/types/pojo/DictionaryEncoding.java | 51 ++ .../apache/arrow/vector/types/pojo/Field.java | 59 +- .../apache/arrow/vector/TestDecimalVector.java | 2 +- .../arrow/vector/TestDictionaryVector.java | 82 +-- .../org/apache/arrow/vector/TestListVector.java | 4 +- .../apache/arrow/vector/TestValueVector.java | 12 +- .../arrow/vector/TestVectorUnloadLoad.java | 22 +- .../complex/impl/TestPromotableWriter.java | 2 +- .../complex/writer/TestComplexWriter.java | 14 +- .../apache/arrow/vector/file/TestArrowFile.java | 665 ++++++++++++------- .../vector/file/TestArrowReaderWriter.java | 28 +- .../arrow/vector/file/TestArrowStream.java | 102 +++ .../arrow/vector/file/TestArrowStreamPipe.java | 163 +++++ .../arrow/vector/file/json/TestJSONFile.java | 4 +- 
.../vector/stream/MessageSerializerTest.java | 8 +- .../arrow/vector/stream/TestArrowStream.java | 96 --- .../vector/stream/TestArrowStreamPipe.java | 129 ---- 66 files changed, 2522 insertions(+), 1511 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 9734166..4cb5f6c 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -78,6 +78,12 @@ class StreamReader::StreamReaderImpl { int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data()); + if (message_length == 0) { + // Optional 0 EOS control message + *message = nullptr; + return Status::OK(); + } + RETURN_NOT_OK(stream_->Read(message_length, &buffer)); if (buffer->size() != message_length) { return Status::IOError("Unexpected end of stream trying to read message"); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/integration/integration_test.py ---------------------------------------------------------------------- diff --git a/integration/integration_test.py b/integration/integration_test.py index 049436a..5cd63c5 100644 --- a/integration/integration_test.py +++ b/integration/integration_test.py @@ -680,12 +680,16 @@ class JavaTester(Tester): cmd = ['java', '-cp', self.ARROW_TOOLS_JAR, 'org.apache.arrow.tools.StreamToFile', stream_path, file_path] + if self.debug: + print(' '.join(cmd)) run_cmd(cmd) def file_to_stream(self, file_path, stream_path): cmd = ['java', '-cp', self.ARROW_TOOLS_JAR, 'org.apache.arrow.tools.FileToStream', file_path, stream_path] + if self.debug: + print(' '.join(cmd)) run_cmd(cmd) http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java ---------------------------------------------------------------------- diff 
--git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java index c00620e..7c0cadd 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -18,23 +18,19 @@ package org.apache.arrow.tools; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; + +import com.google.common.base.Preconditions; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.stream.ArrowStreamReader; import org.apache.arrow.vector.stream.ArrowStreamWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - public class EchoServer { private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class); @@ -57,30 +53,28 @@ public class EchoServer { public void run() throws IOException { BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>(); - try ( - InputStream in = socket.getInputStream(); - OutputStream out = socket.getOutputStream(); - ArrowStreamReader reader = new ArrowStreamReader(in, allocator); - ) { - // Read the entire input stream. 
- reader.init(); - while (true) { - ArrowRecordBatch batch = reader.nextRecordBatch(); - if (batch == null) break; - batches.add(batch); - } - LOGGER.info(String.format("Received %d batches", batches.size())); - - // Write it back - try (ArrowStreamWriter writer = new ArrowStreamWriter(out, reader.getSchema())) { - for (ArrowRecordBatch batch: batches) { - writer.writeRecordBatch(batch); + // Read the entire input stream and write it back + try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + reader.loadNextBatch(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) { + writer.start(); + int echoed = 0; + while (true) { + int rowCount = reader.getVectorSchemaRoot().getRowCount(); + if (rowCount == 0) { + break; + } else { + writer.writeBatch(); + echoed += rowCount; + reader.loadNextBatch(); + } } writer.end(); Preconditions.checkState(reader.bytesRead() == writer.bytesWritten()); + LOGGER.info(String.format("Echoed %d records", echoed)); } - LOGGER.info("Done writing stream back."); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java index db7a1c2..9fa7b76 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -23,18 +23,12 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; -import java.util.List; import org.apache.arrow.memory.BufferAllocator; import 
org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFooter; -import org.apache.arrow.vector.file.ArrowReader; -import org.apache.arrow.vector.file.ArrowWriter; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.file.ArrowFileReader; +import org.apache.arrow.vector.file.ArrowFileWriter; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -86,35 +80,27 @@ public class FileRoundtrip { File inFile = validateFile("input", inFileName); File outFile = validateFile("output", outFileName); BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close - try( - FileInputStream fileInputStream = new FileInputStream(inFile); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) { + try (FileInputStream fileInputStream = new FileInputStream(inFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("Input file size: " + inFile.length()); LOGGER.debug("Found schema: " + schema); - try ( - FileOutputStream fileOutputStream = new FileOutputStream(outFile); - ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); - ) { - - // initialize vectors - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); - VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) { - - VectorLoader 
vectorLoader = new VectorLoader(root); - vectorLoader.load(inRecordBatch); - - VectorUnloader vectorUnloader = new VectorUnloader(root); - ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); - arrowWriter.writeRecordBatch(recordBatch); + try (FileOutputStream fileOutputStream = new FileOutputStream(outFile); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) { + arrowWriter.start(); + while (true) { + arrowReader.loadNextBatch(); + int loaded = root.getRowCount(); + if (loaded == 0) { + break; + } else { + arrowWriter.writeBatch(); } } + arrowWriter.end(); } LOGGER.debug("Output file size: " + outFile.length()); } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java index ba6505c..d534553 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -25,10 +25,8 @@ import java.io.OutputStream; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFooter; -import org.apache.arrow.vector.file.ArrowReader; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.file.ArrowFileReader; import org.apache.arrow.vector.stream.ArrowStreamWriter; /** @@ -36,19 +34,20 @@ import org.apache.arrow.vector.stream.ArrowStreamWriter; * first argument and the output is written to standard out. 
*/ public class FileToStream { + public static void convert(FileInputStream in, OutputStream out) throws IOException { BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - try( - ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) { - ArrowFooter footer = reader.readFooter(); - try ( - ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema()); - ) { - for (ArrowBlock block: footer.getRecordBatches()) { - try (ArrowRecordBatch batch = reader.readRecordBatch(block)) { - writer.writeRecordBatch(batch); - } + try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + reader.loadNextBatch(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) { + writer.start(); + while (root.getRowCount() > 0) { + writer.writeBatch(); + reader.loadNextBatch(); } + writer.end(); } } } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/Integration.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index 36d4ee5..5d4849c 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -28,16 +28,12 @@ import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFooter; -import org.apache.arrow.vector.file.ArrowReader; -import 
org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.file.ArrowFileReader; +import org.apache.arrow.vector.file.ArrowFileWriter; import org.apache.arrow.vector.file.json.JsonFileReader; import org.apache.arrow.vector.file.json.JsonFileWriter; -import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.Validator; import org.apache.commons.cli.CommandLine; @@ -69,24 +65,18 @@ public class Integration { ARROW_TO_JSON(true, false) { @Override public void execute(File arrowFile, File jsonFile) throws IOException { - try( - BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); FileInputStream fileInputStream = new FileInputStream(arrowFile); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = footer.getSchema(); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); LOGGER.debug("Input file size: " + arrowFile.length()); LOGGER.debug("Found schema: " + schema); - try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true));) { + try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) { writer.start(schema); - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); - VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) { - VectorLoader vectorLoader = new VectorLoader(root); - vectorLoader.load(inRecordBatch); - writer.write(root); - } + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + arrowReader.loadRecordBatch(rbBlock); + 
writer.write(root); } } LOGGER.debug("Output file size: " + jsonFile.length()); @@ -96,27 +86,22 @@ public class Integration { JSON_TO_ARROW(false, true) { @Override public void execute(File arrowFile, File jsonFile) throws IOException { - try ( - BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - JsonFileReader reader = new JsonFileReader(jsonFile, allocator); - ) { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) { Schema schema = reader.start(); LOGGER.debug("Input file size: " + jsonFile.length()); LOGGER.debug("Found schema: " + schema); - try ( - FileOutputStream fileOutputStream = new FileOutputStream(arrowFile); - ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); - ) { - - // initialize vectors - VectorSchemaRoot root; - while ((root = reader.read()) != null) { - VectorUnloader vectorUnloader = new VectorUnloader(root); - try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) { - arrowWriter.writeRecordBatch(recordBatch); - } - root.close(); + try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + // TODO json dictionaries + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) { + arrowWriter.start(); + reader.read(root); + while (root.getRowCount() != 0) { + arrowWriter.writeBatch(); + reader.read(root); } + arrowWriter.end(); } LOGGER.debug("Output file size: " + arrowFile.length()); } @@ -125,32 +110,26 @@ public class Integration { VALIDATE(true, true) { @Override public void execute(File arrowFile, File jsonFile) throws IOException { - try ( - BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); - FileInputStream fileInputStream = new FileInputStream(arrowFile); - ArrowReader arrowReader 
= new ArrowReader(fileInputStream.getChannel(), allocator); - ) { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { Schema jsonSchema = jsonReader.start(); - ArrowFooter footer = arrowReader.readFooter(); - Schema arrowSchema = footer.getSchema(); + VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot(); + Schema arrowSchema = arrowRoot.getSchema(); LOGGER.debug("Arrow Input file size: " + arrowFile.length()); LOGGER.debug("ARROW schema: " + arrowSchema); LOGGER.debug("JSON Input file size: " + jsonFile.length()); LOGGER.debug("JSON schema: " + jsonSchema); Validator.compareSchemas(jsonSchema, arrowSchema); - List<ArrowBlock> recordBatches = footer.getRecordBatches(); + List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks(); Iterator<ArrowBlock> iterator = recordBatches.iterator(); VectorSchemaRoot jsonRoot; while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) { ArrowBlock rbBlock = iterator.next(); - try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock); - VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) { - VectorLoader vectorLoader = new VectorLoader(arrowRoot); - vectorLoader.load(inRecordBatch); - Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot); - } + arrowReader.loadRecordBatch(rbBlock); + Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot); jsonRoot.close(); } boolean hasMoreJSON = jsonRoot != null; http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java 
b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java index c8a5c89..3b79d5b 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -27,8 +27,8 @@ import java.nio.channels.Channels; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.file.ArrowWriter; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.file.ArrowFileWriter; import org.apache.arrow.vector.stream.ArrowStreamReader; /** @@ -38,13 +38,16 @@ public class StreamToFile { public static void convert(InputStream in, OutputStream out) throws IOException { BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { - reader.init(); - try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) { - while (true) { - ArrowRecordBatch batch = reader.nextRecordBatch(); - if (batch == null) break; - writer.writeRecordBatch(batch); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + reader.loadNextBatch(); + try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) { + writer.start(); + while (root.getRowCount() > 0) { + writer.writeBatch(); + reader.loadNextBatch(); } + writer.end(); } } } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java index 4cfc52f..f752f7e 100644 --- 
a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -23,13 +23,10 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; @@ -37,10 +34,8 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.IntWriter; import org.apache.arrow.vector.file.ArrowBlock; -import org.apache.arrow.vector.file.ArrowFooter; -import org.apache.arrow.vector.file.ArrowReader; -import org.apache.arrow.vector.file.ArrowWriter; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.file.ArrowFileReader; +import org.apache.arrow.vector.file.ArrowFileWriter; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.Assert; @@ -63,26 +58,14 @@ public class ArrowFileTestFixtures { static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception { // read - try ( - BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); - FileInputStream fileInputStream = new FileInputStream(testOutFile); - ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); - BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - ) { - ArrowFooter footer = arrowReader.readFooter(); - Schema schema = 
footer.getSchema(); - - // initialize vectors - try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) { - VectorLoader vectorLoader = new VectorLoader(root); - - List<ArrowBlock> recordBatches = footer.getRecordBatches(); - for (ArrowBlock rbBlock : recordBatches) { - try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) { - vectorLoader.load(recordBatch); - } - validateContent(COUNT, root); - } + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(testOutFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + arrowReader.loadRecordBatch(rbBlock); + validateContent(COUNT, root); } } } @@ -96,16 +79,10 @@ public class ArrowFileTestFixtures { } static void write(FieldVector parent, File file) throws FileNotFoundException, IOException { - Schema schema = new Schema(parent.getField().getChildren()); - int valueCount = parent.getAccessor().getValueCount(); - List<FieldVector> fields = parent.getChildrenFromFields(); - VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields); - try ( - FileOutputStream fileOutputStream = new FileOutputStream(file); - ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); - ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); - ) { - arrowWriter.writeRecordBatch(recordBatch); + VectorSchemaRoot root = new VectorSchemaRoot(parent); + try (FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) { + arrowWriter.writeBatch(); } } 
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java index 48d6162..706f8e2 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -24,106 +24,268 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; -import java.util.ArrayList; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import com.google.common.collect.ImmutableList; + import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.schema.ArrowFieldNode; -import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.NullableIntVector; +import org.apache.arrow.vector.NullableTinyIntVector; +import org.apache.arrow.vector.NullableVarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; import org.apache.arrow.vector.stream.ArrowStreamReader; import org.apache.arrow.vector.stream.ArrowStreamWriter; +import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import 
org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; -import io.netty.buffer.ArrowBuf; - public class EchoServerTest { - public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) { - ArrowBuf buffer = alloc.buffer(bytes.length); - buffer.writeBytes(bytes); - return buffer; + + private static EchoServer server; + private static int serverPort; + private static Thread serverThread; + + @BeforeClass + public static void startEchoServer() throws IOException { + server = new EchoServer(0); + serverPort = server.port(); + serverThread = new Thread() { + @Override + public void run() { + try { + server.run(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + serverThread.start(); } - public static byte[] array(ArrowBuf buf) { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - return bytes; + @AfterClass + public static void stopEchoServer() throws IOException, InterruptedException { + server.close(); + serverThread.join(); } - private void testEchoServer(int serverPort, Schema schema, List<ArrowRecordBatch> batches) + private void testEchoServer(int serverPort, + Field field, + NullableTinyIntVector vector, + int batches) throws UnknownHostException, IOException { BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0); try (Socket socket = new Socket("localhost", serverPort); - ArrowStreamWriter writer = new ArrowStreamWriter(socket.getOutputStream(), schema); + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { - for (ArrowRecordBatch batch: batches) { - 
writer.writeRecordBatch(batch); + writer.start(); + for (int i = 0; i < batches; i++) { + vector.allocateNew(16); + for (int j = 0; j < 8; j++) { + vector.getMutator().set(j, j + i); + vector.getMutator().set(j + 8, 0, (byte) (j + i)); + } + vector.getMutator().setValueCount(16); + root.setRowCount(16); + writer.writeBatch(); } writer.end(); - reader.init(); - assertEquals(schema, reader.getSchema()); - for (int i = 0; i < batches.size(); i++) { - ArrowRecordBatch result = reader.nextRecordBatch(); - ArrowRecordBatch expected = batches.get(i); - assertTrue(result != null); - assertEquals(expected.getBuffers().size(), result.getBuffers().size()); - for (int j = 0; j < expected.getBuffers().size(); j++) { - assertTrue(expected.getBuffers().get(j).compareTo(result.getBuffers().get(j)) == 0); + assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema()); + + NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot().getFieldVectors().get(0); + for (int i = 0; i < batches; i++) { + reader.loadNextBatch(); + assertEquals(16, reader.getVectorSchemaRoot().getRowCount()); + assertEquals(16, readVector.getAccessor().getValueCount()); + for (int j = 0; j < 8; j++) { + assertEquals(j + i, readVector.getAccessor().get(j)); + assertTrue(readVector.getAccessor().isNull(j + 8)); } } - ArrowRecordBatch result = reader.nextRecordBatch(); - assertTrue(result == null); + reader.loadNextBatch(); + assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); assertEquals(reader.bytesRead(), writer.bytesWritten()); } } @Test public void basicTest() throws InterruptedException, IOException { - final EchoServer server = new EchoServer(0); - int serverPort = server.port(); - Thread serverThread = new Thread() { - @Override - public void run() { - try { - server.run(); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - serverThread.start(); - BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); - byte[] validity = new 
byte[] { (byte)255, 0}; - byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; - ArrowBuf validityb = buf(alloc, validity); - ArrowBuf valuesb = buf(alloc, values); - ArrowRecordBatch batch = new ArrowRecordBatch( - 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)); - Schema schema = new Schema(asList(new Field( - "testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList()))); + Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList()); + NullableTinyIntVector vector = new NullableTinyIntVector("testField", alloc, null); + Schema schema = new Schema(asList(field)); // Try an empty stream, just the header. - testEchoServer(serverPort, schema, new ArrayList<ArrowRecordBatch>()); + testEchoServer(serverPort, field, vector, 0); // Try with one batch. - List<ArrowRecordBatch> batches = new ArrayList<>(); - batches.add(batch); - testEchoServer(serverPort, schema, batches); + testEchoServer(serverPort, field, vector, 1); // Try with a few - for (int i = 0; i < 10; i++) { - batches.add(batch); + testEchoServer(serverPort, field, vector, 10); + } + + @Test + public void testFlatDictionary() throws IOException { + DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + NullableIntVector writeVector = new NullableIntVector("varchar", allocator, writeEncoding); + NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dict", allocator, null)) { + writeVector.allocateNewSafe(); + NullableIntVector.Mutator mutator = writeVector.getMutator(); + mutator.set(0, 0); + mutator.set(1, 1); + mutator.set(3, 2); + mutator.set(4, 1); + mutator.set(5, 2); + mutator.setValueCount(6); + + writeDictionaryVector.allocateNewSafe(); + NullableVarCharVector.Mutator dictionaryMutator = writeDictionaryVector.getMutator(); + dictionaryMutator.set(0, 
"foo".getBytes(StandardCharsets.UTF_8)); + dictionaryMutator.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + dictionaryMutator.set(2, "baz".getBytes(StandardCharsets.UTF_8)); + dictionaryMutator.setValueCount(3); + + List<Field> fields = ImmutableList.of(writeVector.getField()); + List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6); + + DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding)); + + try (Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + writer.start(); + writer.writeBatch(); + writer.end(); + + reader.loadNextBatch(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + Assert.assertEquals(6, readerRoot.getRowCount()); + + FieldVector readVector = readerRoot.getFieldVectors().get(0); + Assert.assertNotNull(readVector); + + DictionaryEncoding readEncoding = readVector.getField().getDictionary(); + Assert.assertNotNull(readEncoding); + Assert.assertEquals(1L, readEncoding.getId()); + + FieldVector.Accessor accessor = readVector.getAccessor(); + Assert.assertEquals(6, accessor.getValueCount()); + Assert.assertEquals(0, accessor.getObject(0)); + Assert.assertEquals(1, accessor.getObject(1)); + Assert.assertEquals(null, accessor.getObject(2)); + Assert.assertEquals(2, accessor.getObject(3)); + Assert.assertEquals(1, accessor.getObject(4)); + Assert.assertEquals(2, accessor.getObject(5)); + + Dictionary dictionary = reader.lookup(1L); + Assert.assertNotNull(dictionary); + NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor(); + Assert.assertEquals(3, dictionaryAccessor.getValueCount()); + Assert.assertEquals(new Text("foo"), 
dictionaryAccessor.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); + Assert.assertEquals(new Text("baz"), dictionaryAccessor.getObject(2)); + } } - testEchoServer(serverPort, schema, batches); + } - server.close(); - serverThread.join(); + @Test + public void testNestedDictionary() throws IOException { + DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dictionary", allocator, null); + ListVector writeVector = new ListVector("list", allocator, null, null)) { + + // data being written: + // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]] + + writeDictionaryVector.allocateNew(); + writeDictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8)); + writeDictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8)); + writeDictionaryVector.getMutator().setValueCount(2); + + writeVector.addOrGetVector(MinorType.INT, writeEncoding); + writeVector.allocateNew(); + UnionListWriter listWriter = new UnionListWriter(writeVector); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.setValueCount(3); + + List<Field> fields = ImmutableList.of(writeVector.getField()); + List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3); + + DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding)); + + try (Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream()); + ArrowStreamReader reader = new 
ArrowStreamReader(socket.getInputStream(), allocator)) { + writer.start(); + writer.writeBatch(); + writer.end(); + + reader.loadNextBatch(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + Assert.assertEquals(3, readerRoot.getRowCount()); + + ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0); + Assert.assertNotNull(readVector); + + Assert.assertNull(readVector.getField().getDictionary()); + DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0).getDictionary(); + Assert.assertNotNull(readEncoding); + Assert.assertEquals(2L, readEncoding.getId()); + + Field nestedField = readVector.getField().getChildren().get(0); + + DictionaryEncoding encoding = nestedField.getDictionary(); + Assert.assertNotNull(encoding); + Assert.assertEquals(2L, encoding.getId()); + Assert.assertEquals(new Int(32, true), encoding.getIndexType()); + + ListVector.Accessor accessor = readVector.getAccessor(); + Assert.assertEquals(3, accessor.getValueCount()); + Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0)); + Assert.assertEquals(Arrays.asList(0), accessor.getObject(1)); + Assert.assertEquals(Arrays.asList(1), accessor.getObject(2)); + + Dictionary readDictionary = reader.lookup(2L); + Assert.assertNotNull(readDictionary); + NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) readDictionary.getVector()).getAccessor(); + Assert.assertEquals(2, dictionaryAccessor.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1)); + } + } } } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java ---------------------------------------------------------------------- diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java index 
0ae32be..9d4ef5c 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java @@ -33,6 +33,11 @@ import java.io.IOException; import java.io.StringReader; import java.util.Map; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.tools.Integration.Command; @@ -49,11 +54,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; - public class TestIntegration { @Rule @@ -128,6 +128,34 @@ public class TestIntegration { } } + @Test + public void testJSONRoundTripWithStruct() throws Exception { + File testJSONFile = new File("../../integration/data/struct_example.json"); + File testOutFile = testFolder.newFile("testOutStruct.arrow"); + File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json"); + testOutFile.delete(); + testRoundTripJSONFile.delete(); + + Integration integration = new Integration(); + + // convert to arrow + String[] args1 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args1); + + // convert back to json + String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args2); + + BufferedReader orig = readNormalized(testJSONFile); + BufferedReader rt = 
readNormalized(testRoundTripJSONFile); + String i, o; + int j = 0; + while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) { + assertEquals("line: " + j, i, o); + ++j; + } + } + private ObjectMapper om = new ObjectMapper(); { DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/tmptestfilesio ---------------------------------------------------------------------- diff --git a/java/tools/tmptestfilesio b/java/tools/tmptestfilesio new file mode 100644 index 0000000..d1b6b6c Binary files /dev/null and b/java/tools/tmptestfilesio differ http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/MapWriters.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/MapWriters.java b/java/vector/src/main/codegen/templates/MapWriters.java index 4af6eee..428ce04 100644 --- a/java/vector/src/main/codegen/templates/MapWriters.java +++ b/java/vector/src/main/codegen/templates/MapWriters.java @@ -64,7 +64,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter { list(child.getName()); break; case UNION: - UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class), getNullableMapWriterFactory()); + UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class, null), getNullableMapWriterFactory()); fields.put(handleCase(child.getName()), writer); break; <#list vv.types as type><#list type.minor as minor> @@ -113,7 +113,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter { FieldWriter writer = fields.get(finalName); if(writer == null){ int vectorCount=container.size(); - NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class); + NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class, null); writer 
= new PromotableWriter(vector, container, getNullableMapWriterFactory()); if(vectorCount != container.size()) { writer.allocate(); @@ -157,7 +157,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter { FieldWriter writer = fields.get(finalName); int vectorCount = container.size(); if(writer == null) { - writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class), container, getNullableMapWriterFactory()); + writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class, null), container, getNullableMapWriterFactory()); if (container.size() > vectorCount) { writer.allocate(); } @@ -222,7 +222,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter { if(writer == null) { ValueVector vector; ValueVector currentVector = container.getChild(name); - ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>); + ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class, null<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>); writer = new PromotableWriter(v, container, getNullableMapWriterFactory()); vector = v; if (currentVector == null || currentVector != vector) { http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java index 6b25fb3..b3e10e3 100644 --- a/java/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java @@ -65,21 +65,21 @@ public final class ${className} extends BaseDataValueVector implements <#if type private final int precision; private final int scale; - public ${className}(String 
name, BufferAllocator allocator, int precision, int scale) { + public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary, int precision, int scale) { super(name, allocator); values = new ${valuesName}(valuesField, allocator, precision, scale); this.precision = precision; this.scale = scale; mutator = new Mutator(); accessor = new Accessor(); - field = new Field(name, true, new Decimal(precision, scale), null); + field = new Field(name, true, new Decimal(precision, scale), dictionary, null); innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList( bits, values )); } <#else> - public ${className}(String name, BufferAllocator allocator) { + public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary) { super(name, allocator); values = new ${valuesName}(valuesField, allocator); mutator = new Mutator(); @@ -88,38 +88,38 @@ public final class ${className} extends BaseDataValueVector implements <#if type minor.class == "SmallInt" || minor.class == "Int" || minor.class == "BigInt"> - field = new Field(name, true, new Int(${type.width} * 8, true), null); + field = new Field(name, true, new Int(${type.width} * 8, true), dictionary, null); <#elseif minor.class == "UInt1" || minor.class == "UInt2" || minor.class == "UInt4" || minor.class == "UInt8"> - field = new Field(name, true, new Int(${type.width} * 8, false), null); + field = new Field(name, true, new Int(${type.width} * 8, false), dictionary, null); <#elseif minor.class == "Date"> - field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), null); + field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), dictionary, null); <#elseif minor.class == "Time"> - field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), null); + field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), dictionary, null); <#elseif minor.class == 
"Float4"> - field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), null); + field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), dictionary, null); <#elseif minor.class == "Float8"> - field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), null); + field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), dictionary, null); <#elseif minor.class == "TimeStampSec"> - field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), null); + field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), dictionary, null); <#elseif minor.class == "TimeStampMilli"> - field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), null); + field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), dictionary, null); <#elseif minor.class == "TimeStampMicro"> - field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), null); + field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), dictionary, null); <#elseif minor.class == "TimeStampNano"> - field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), null); + field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), dictionary, null); <#elseif minor.class == "IntervalDay"> - field 
= new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), null); + field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), dictionary, null); <#elseif minor.class == "IntervalYear"> - field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), null); + field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), dictionary, null); <#elseif minor.class == "VarChar"> - field = new Field(name, true, new Utf8(), null); + field = new Field(name, true, new Utf8(), dictionary, null); <#elseif minor.class == "VarBinary"> - field = new Field(name, true, new Binary(), null); + field = new Field(name, true, new Binary(), dictionary, null); <#elseif minor.class == "Bit"> - field = new Field(name, true, new Bool(), null); + field = new Field(name, true, new Bool(), dictionary, null); </#if> innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList( bits, @@ -378,9 +378,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type public TransferImpl(String name, BufferAllocator allocator){ <#if minor.class == "Decimal"> - to = new ${className}(name, allocator, precision, scale); + to = new ${className}(name, allocator, field.getDictionary(), precision, scale); <#else> - to = new ${className}(name, allocator); + to = new ${className}(name, allocator, field.getDictionary()); </#if> } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 1a6908d..076ed93 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -118,11 +118,11 @@ public class UnionVector 
implements FieldVector { public List<BufferBacked> getFieldInnerVectors() { return this.innerVectors; } - + public NullableMapVector getMap() { if (mapVector == null) { int vectorCount = internalMap.size(); - mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class); + mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class, null); if (internalMap.size() > vectorCount) { mapVector.allocateNew(); if (callBack != null) { @@ -144,7 +144,7 @@ public class UnionVector implements FieldVector { public Nullable${name}Vector get${name}Vector() { if (${uncappedName}Vector == null) { int vectorCount = internalMap.size(); - ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class); + ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class, null); if (internalMap.size() > vectorCount) { ${uncappedName}Vector.allocateNew(); if (callBack != null) { @@ -162,7 +162,7 @@ public class UnionVector implements FieldVector { public ListVector getList() { if (listVector == null) { int vectorCount = internalMap.size(); - listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class); + listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class, null); if (internalMap.size() > vectorCount) { listVector.allocateNew(); if (callBack != null) { @@ -262,7 +262,7 @@ public class UnionVector implements FieldVector { public FieldVector addVector(FieldVector v) { String name = v.getMinorType().name().toLowerCase(); Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name)); - final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass()); + final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass(), v.getField().getDictionary()); v.makeTransferPair(newVector).transfer(); 
internalMap.putChild(name, newVector); if (callBack != null) { http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index d1e9abe..179f2ee 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -81,6 +81,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } else { super.load(fieldNode, data); } + this.valueCount = fieldNode.getLength(); } @Override @@ -451,7 +452,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe /** * set count bits to 1 in data starting at firstBitIndex - * @param data the buffer to set * @param firstBitIndex the index of the first bit to set * @param count the number of bits to set */ http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java index b28433c..0fdbc48 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java @@ -19,11 +19,10 @@ package org.apache.arrow.vector; import java.util.List; +import io.netty.buffer.ArrowBuf; import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.types.pojo.Field; -import io.netty.buffer.ArrowBuf; - /** * A vector corresponding to a Field in the schema * It has inner vectors backed by buffers (validity, offsets, data, ...) 
@@ -61,5 +60,4 @@ public interface FieldVector extends ValueVector { * @return the inner vectors for this field as defined by the TypeLayout */ List<BufferBacked> getFieldInnerVectors(); - } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 5c1176c..76de250 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -36,15 +36,14 @@ import io.netty.buffer.ArrowBuf; * Loads buffers into vectors */ public class VectorLoader { + private final VectorSchemaRoot root; /** * will create children in root based on schema - * @param schema the expected schema * @param root the root to add vectors to based on schema */ public VectorLoader(VectorSchemaRoot root) { - super(); this.root = root; } @@ -57,18 +56,16 @@ public class VectorLoader { Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator(); Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator(); List<Field> fields = root.getSchema().getFields(); - for (int i = 0; i < fields.size(); ++i) { - Field field = fields.get(i); + for (Field field: fields) { FieldVector fieldVector = root.getVector(field.getName()); loadBuffers(fieldVector, field, buffers, nodes); } root.setRowCount(recordBatch.getLength()); if (nodes.hasNext() || buffers.hasNext()) { - throw new IllegalArgumentException("not all nodes and buffers where consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers)); + throw new IllegalArgumentException("not all nodes and buffers were consumed. 
nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers)); } } - private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) { checkArgument(nodes.hasNext(), "no more field nodes for for field " + field + " and vector " + vector); @@ -82,7 +79,7 @@ public class VectorLoader { vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { throw new IllegalArgumentException("Could not load buffers for field " + - field + ". error message: " + e.getMessage(), e); + field + ". error message: " + e.getMessage(), e); } List<Field> children = field.getChildren(); if (children.size() > 0) { @@ -96,4 +93,4 @@ public class VectorLoader { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java index 1cbe187..7e626fb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java @@ -18,7 +18,6 @@ package org.apache.arrow.vector; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,6 +28,9 @@ import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +/** + * Holder for a set of vectors to be loaded/unloaded + */ public class VectorSchemaRoot implements AutoCloseable { private final Schema schema; @@ -37,9 +39,17 @@ public class VectorSchemaRoot implements AutoCloseable { private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>(); public 
VectorSchemaRoot(FieldVector parent) { - this.schema = new Schema(parent.getField().getChildren()); - this.rowCount = parent.getAccessor().getValueCount(); - this.fieldVectors = parent.getChildrenFromFields(); + this(parent.getField().getChildren(), parent.getChildrenFromFields(), parent.getAccessor().getValueCount()); + } + + public VectorSchemaRoot(List<Field> fields, List<FieldVector> fieldVectors, int rowCount) { + if (fields.size() != fieldVectors.size()) { + throw new IllegalArgumentException("Fields must match field vectors. Found " + + fieldVectors.size() + " vectors and " + fields.size() + " fields"); + } + this.schema = new Schema(fields); + this.rowCount = rowCount; + this.fieldVectors = fieldVectors; for (int i = 0; i < schema.getFields().size(); ++i) { Field field = schema.getFields().get(i); FieldVector vector = fieldVectors.get(i); @@ -47,21 +57,19 @@ public class VectorSchemaRoot implements AutoCloseable { } } - public VectorSchemaRoot(Schema schema, BufferAllocator allocator) { - super(); - this.schema = schema; + public static VectorSchemaRoot create(Schema schema, BufferAllocator allocator) { List<FieldVector> fieldVectors = new ArrayList<>(); for (Field field : schema.getFields()) { MinorType minorType = Types.getMinorTypeForArrowType(field.getType()); - FieldVector vector = minorType.getNewVector(field.getName(), allocator, null); + FieldVector vector = minorType.getNewVector(field.getName(), allocator, field.getDictionary(), null); vector.initializeChildrenFromFields(field.getChildren()); fieldVectors.add(vector); - fieldVectorsMap.put(field.getName(), vector); } - this.fieldVectors = Collections.unmodifiableList(fieldVectors); - if (this.fieldVectors.size() != schema.getFields().size()) { - throw new IllegalArgumentException("The root vector did not create the right number of children. 
found " + fieldVectors.size() + " expected " + schema.getFields().size()); + if (fieldVectors.size() != schema.getFields().size()) { + throw new IllegalArgumentException("The root vector did not create the right number of children. found " + + fieldVectors.size() + " expected " + schema.getFields().size()); } + return new VectorSchemaRoot(schema.getFields(), fieldVectors, 0); } public List<FieldVector> getFieldVectors() { http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index 92d8cb0..8e9ff6d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -20,42 +20,27 @@ package org.apache.arrow.vector; import java.util.ArrayList; import java.util.List; +import io.netty.buffer.ArrowBuf; import org.apache.arrow.vector.ValueVector.Accessor; import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.schema.ArrowRecordBatch; import org.apache.arrow.vector.schema.ArrowVectorType; -import org.apache.arrow.vector.types.pojo.Schema; - -import io.netty.buffer.ArrowBuf; public class VectorUnloader { - private final Schema schema; - private final int valueCount; - private final List<FieldVector> vectors; - - public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) { - super(); - this.schema = schema; - this.valueCount = valueCount; - this.vectors = vectors; - } + private final VectorSchemaRoot root; public VectorUnloader(VectorSchemaRoot root) { - this(root.getSchema(), root.getRowCount(), root.getFieldVectors()); - } - - public Schema getSchema() { - return schema; + this.root = root; } public ArrowRecordBatch getRecordBatch() { 
List<ArrowFieldNode> nodes = new ArrayList<>(); List<ArrowBuf> buffers = new ArrayList<>(); - for (FieldVector vector : vectors) { + for (FieldVector vector : root.getFieldVectors()) { appendNodes(vector, nodes, buffers); } - return new ArrowRecordBatch(valueCount, nodes, buffers); + return new ArrowRecordBatch(root.getRowCount(), nodes, buffers); } private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) { @@ -74,4 +59,4 @@ public class VectorUnloader { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java index 2f68886..86a5e82 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java @@ -22,6 +22,7 @@ import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.util.CallBack; /** @@ -85,7 +86,7 @@ public abstract class AbstractContainerVector implements ValueVector { public abstract int size(); // add a new vector with the input MajorType or return the existing vector if we already added one with the same type - public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale); + public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... 
precisionScale); // return the child vector with the input name public abstract <T extends FieldVector> T getChild(String name, Class<T> clazz); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java index f030d16..baeeb07 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java @@ -26,6 +26,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.MapWithOrdinal; @@ -110,7 +111,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { * @return resultant {@link org.apache.arrow.vector.ValueVector} */ @Override - public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) { + public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... 
precisionScale) { final ValueVector existing = getChild(name); boolean create = false; if (existing == null) { @@ -122,7 +123,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { create = true; } if (create) { - final T vector = clazz.cast(minorType.getNewVector(name, allocator, callBack, precisionScale)); + final T vector = clazz.cast(minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale)); putChild(name, vector); if (callBack!=null) { callBack.doWork(); @@ -162,12 +163,12 @@ public abstract class AbstractMapVector extends AbstractContainerVector { return typeify(v, clazz); } - protected ValueVector add(String name, MinorType minorType, int... precisionScale) { + protected ValueVector add(String name, MinorType minorType, DictionaryEncoding dictionary, int... precisionScale) { final ValueVector existing = getChild(name); if (existing != null) { throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ", existing.getClass().getSimpleName(), minorType)); } - FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale); + FieldVector vector = minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale); putChild(name, vector); if (callBack!=null) { callBack.doWork(); http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java index 7424df4..eeb8f58 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java @@ -28,6 +28,7 @@ import 
org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.arrow.vector.util.SchemaChangeRuntimeException; import com.google.common.base.Preconditions; @@ -150,10 +151,10 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements return vector == DEFAULT_DATA_VECTOR ? 0:1; } - public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType) { + public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) { boolean created = false; if (vector instanceof ZeroVector) { - vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, null); + vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, null); // returned vector must have the same field created = true; } http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java deleted file mode 100644 index 84760ea..0000000 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java +++ /dev/null @@ -1,229 +0,0 @@ -/******************************************************************************* - - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ******************************************************************************/ -package org.apache.arrow.vector.complex; - -import io.netty.buffer.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.OutOfMemoryException; -import org.apache.arrow.vector.NullableIntVector; -import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.complex.reader.FieldReader; -import org.apache.arrow.vector.types.Dictionary; -import org.apache.arrow.vector.types.Types.MinorType; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.util.TransferPair; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -public class DictionaryVector implements ValueVector { - - private ValueVector indices; - private Dictionary dictionary; - - public DictionaryVector(ValueVector indices, Dictionary dictionary) { - this.indices = indices; - this.dictionary = dictionary; - } - - /** - * Dictionary encodes a vector. The dictionary will be built using the values from the vector. 
- * - * @param vector vector to encode - * @return dictionary encoded vector - */ - public static DictionaryVector encode(ValueVector vector) { - validateType(vector.getMinorType()); - Map<Object, Integer> lookUps = new HashMap<>(); - Map<Integer, Integer> transfers = new HashMap<>(); - - ValueVector.Accessor accessor = vector.getAccessor(); - int count = accessor.getValueCount(); - - NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator()); - indices.allocateNew(count); - NullableIntVector.Mutator mutator = indices.getMutator(); - - int nextIndex = 0; - for (int i = 0; i < count; i++) { - Object value = accessor.getObject(i); - if (value != null) { // if it's null leave it null - Integer index = lookUps.get(value); - if (index == null) { - index = nextIndex++; - lookUps.put(value, index); - transfers.put(i, index); - } - mutator.set(i, index); - } - } - mutator.setValueCount(count); - - // copy the dictionary values into the dictionary vector - TransferPair dictionaryTransfer = vector.getTransferPair(vector.getAllocator()); - ValueVector dictionaryVector = dictionaryTransfer.getTo(); - dictionaryVector.allocateNewSafe(); - for (Map.Entry<Integer, Integer> entry: transfers.entrySet()) { - dictionaryTransfer.copyValueSafe(entry.getKey(), entry.getValue()); - } - dictionaryVector.getMutator().setValueCount(transfers.size()); - Dictionary dictionary = new Dictionary(dictionaryVector, false); - - return new DictionaryVector(indices, dictionary); - } - - /** - * Dictionary encodes a vector with a provided dictionary. The dictionary must contain all values in the vector. 
- * - * @param vector vector to encode - * @param dictionary dictionary used for encoding - * @return dictionary encoded vector - */ - public static DictionaryVector encode(ValueVector vector, Dictionary dictionary) { - validateType(vector.getMinorType()); - // load dictionary values into a hashmap for lookup - ValueVector.Accessor dictionaryAccessor = dictionary.getDictionary().getAccessor(); - Map<Object, Integer> lookUps = new HashMap<>(dictionaryAccessor.getValueCount()); - for (int i = 0; i < dictionaryAccessor.getValueCount(); i++) { - // for primitive array types we need a wrapper that implements equals and hashcode appropriately - lookUps.put(dictionaryAccessor.getObject(i), i); - } - - // vector to hold our indices (dictionary encoded values) - NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator()); - NullableIntVector.Mutator mutator = indices.getMutator(); - - ValueVector.Accessor accessor = vector.getAccessor(); - int count = accessor.getValueCount(); - - indices.allocateNew(count); - - for (int i = 0; i < count; i++) { - Object value = accessor.getObject(i); - if (value != null) { // if it's null leave it null - // note: this may fail if value was not included in the dictionary - mutator.set(i, lookUps.get(value)); - } - } - mutator.setValueCount(count); - - return new DictionaryVector(indices, dictionary); - } - - /** - * Decodes a dictionary encoded array using the provided dictionary. 
- * - * @param indices dictionary encoded values, must be int type - * @param dictionary dictionary used to decode the values - * @return vector with values restored from dictionary - */ - public static ValueVector decode(ValueVector indices, Dictionary dictionary) { - ValueVector.Accessor accessor = indices.getAccessor(); - int count = accessor.getValueCount(); - ValueVector dictionaryVector = dictionary.getDictionary(); - // copy the dictionary values into the decoded vector - TransferPair transfer = dictionaryVector.getTransferPair(indices.getAllocator()); - transfer.getTo().allocateNewSafe(); - for (int i = 0; i < count; i++) { - Object index = accessor.getObject(i); - if (index != null) { - transfer.copyValueSafe(((Number) index).intValue(), i); - } - } - - ValueVector decoded = transfer.getTo(); - decoded.getMutator().setValueCount(count); - return decoded; - } - - private static void validateType(MinorType type) { - // byte arrays don't work as keys in our dictionary map - we could wrap them with something to - // implement equals and hashcode if we want that functionality - if (type == MinorType.VARBINARY || type == MinorType.LIST || type == MinorType.MAP || type == MinorType.UNION) { - throw new IllegalArgumentException("Dictionary encoding for complex types not implemented"); - } - } - - public ValueVector getIndexVector() { return indices; } - - public ValueVector getDictionaryVector() { return dictionary.getDictionary(); } - - public Dictionary getDictionary() { return dictionary; } - - @Override - public MinorType getMinorType() { return indices.getMinorType(); } - - @Override - public Field getField() { return indices.getField(); } - - // note: dictionary vector is not closed, as it may be shared - @Override - public void close() { indices.close(); } - - @Override - public void allocateNew() throws OutOfMemoryException { indices.allocateNew(); } - - @Override - public boolean allocateNewSafe() { return indices.allocateNewSafe(); } - - @Override - 
public BufferAllocator getAllocator() { return indices.getAllocator(); } - - @Override - public void setInitialCapacity(int numRecords) { indices.setInitialCapacity(numRecords); } - - @Override - public int getValueCapacity() { return indices.getValueCapacity(); } - - @Override - public int getBufferSize() { return indices.getBufferSize(); } - - @Override - public int getBufferSizeFor(int valueCount) { return indices.getBufferSizeFor(valueCount); } - - @Override - public Iterator<ValueVector> iterator() { - return indices.iterator(); - } - - @Override - public void clear() { indices.clear(); } - - @Override - public TransferPair getTransferPair(BufferAllocator allocator) { return indices.getTransferPair(allocator); } - - @Override - public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return indices.getTransferPair(ref, allocator); } - - @Override - public TransferPair makeTransferPair(ValueVector target) { return indices.makeTransferPair(target); } - - @Override - public Accessor getAccessor() { return indices.getAccessor(); } - - @Override - public Mutator getMutator() { return indices.getMutator(); } - - @Override - public FieldReader getReader() { return indices.getReader(); } - - @Override - public ArrowBuf[] getBuffers(boolean clear) { return indices.getBuffers(clear); } -}
