Repository: arrow Updated Branches: refs/heads/master 86f56a607 -> e3c167bd1
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/format/IPC.md ---------------------------------------------------------------------- diff --git a/format/IPC.md b/format/IPC.md index 3f78126..a55dcdf 100644 --- a/format/IPC.md +++ b/format/IPC.md @@ -15,3 +15,109 @@ # Interprocess messaging / communication (IPC) ## File format + +We define a self-contained "file format" containing an Arrow schema along with +one or more record batches defining a dataset. See [format/File.fbs][1] for the +precise details of the file metadata. + +In general, the file looks like: + +``` +<magic number "ARROW1"> +<empty padding bytes [to 64 byte boundary]> +<DICTIONARY 0> +... +<DICTIONARY k - 1> +<RECORD BATCH 0> +... +<RECORD BATCH n - 1> +<METADATA org.apache.arrow.flatbuf.Footer> +<metadata_size: int32> +<magic number "ARROW1"> +``` + +See the File.fbs document for details about the Flatbuffers metadata. The +record batches have a particular structure, defined next. + +### Record batches + +The record batch metadata is written as a flatbuffer (see +[format/Message.fbs][2] -- the RecordBatch message type) prefixed by its size, +followed by each of the memory buffers in the batch written end to end (with +appropriate alignment and padding): + +``` +<int32: metadata flatbuffer size> +<metadata: org.apache.arrow.flatbuf.RecordBatch> +<padding bytes [to 64-byte boundary]> +<body: buffers end to end> +``` + +The `RecordBatch` metadata contains a depth-first (pre-order) flattened set of +field metadata and physical memory buffers (some comments from [Message.fbs][2] +have been shortened / removed): + +``` +table RecordBatch { + length: int; + nodes: [FieldNode]; + buffers: [Buffer]; +} + +struct FieldNode { + /// The number of value slots in the Arrow array at this level of a nested + /// tree + length: int; + + /// The number of observed nulls. 
Fields with null_count == 0 may choose not + /// to write their physical validity bitmap out as a materialized buffer, + /// instead setting the length of the bitmap buffer to 0. + null_count: int; +} + +struct Buffer { + /// The shared memory page id where this buffer is located. Currently this is + /// not used + page: int; + + /// The relative offset into the shared memory page where the bytes for this + /// buffer starts + offset: long; + + /// The absolute length (in bytes) of the memory buffer. The memory is found + /// from offset (inclusive) to offset + length (non-inclusive). + length: long; +} +``` + +In the context of a file, the `page` is not used, and the `Buffer` offsets use +as a frame of reference the start of the segment where they are written in the +file. So, while in a general IPC setting these offsets may be anyplace in one +or more shared memory regions, in the file format the offsets start from 0. + +The location of a record batch and the size of the metadata block as well as +the body of buffers is stored in the file footer: + +``` +struct Block { + offset: long; + metaDataLength: int; + bodyLength: long; +} +``` + +Some notes about this + +* The `Block` offset indicates the starting byte of the record batch. +* The metadata length includes the flatbuffer size, the record batch metadata + flatbuffer, and any padding bytes + + +### Dictionary batches + +Dictionary batches have not yet been implemented, while they are provided for +in the metadata. For the time being, the `DICTIONARY` segments shown above in +the file do not appear in any of the file implementations. 
+ +[1]: https://github.com/apache/arrow/blob/master/format/File.fbs +[2]: https://github.com/apache/arrow/blob/master/format/Message.fbs \ No newline at end of file http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/format/Message.fbs ---------------------------------------------------------------------- diff --git a/format/Message.fbs b/format/Message.fbs index 2ec9fd1..d07d066 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -18,7 +18,8 @@ namespace org.apache.arrow.flatbuf; enum MetadataVersion:short { - V1_SNAPSHOT + V1, + V2 } /// ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/integration/data/simple.json ---------------------------------------------------------------------- diff --git a/integration/data/simple.json b/integration/data/simple.json new file mode 100644 index 0000000..a91b405 --- /dev/null +++ b/integration/data/simple.json @@ -0,0 +1,66 @@ +{ + "schema": { + "fields": [ + { + "name": "foo", + "type": {"name": "int", "isSigned": true, "bitWidth": 32}, + "nullable": true, "children": [], + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } + }, + { + "name": "bar", + "type": {"name": "floatingpoint", "precision": "DOUBLE"}, + "nullable": true, "children": [], + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } + }, + { + "name": "baz", + "type": {"name": "utf8"}, + "nullable": true, "children": [], + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "OFFSET", "typeBitWidth": 32}, + {"type": "DATA", "typeBitWidth": 64} + ] + } + } + ] + }, + "batches": [ + { + "count": 5, + "columns": [ + { + "name": "foo", + "count": 5, + "VALIDITY": [1, 0, 1, 1, 1], + "DATA": [1, 2, 3, 4, 5] + }, + { + "name": "bar", + "count": 5, + "VALIDITY": [1, 0, 0, 1, 1], + "DATA": [1.0, 2.0, 3.0, 
4.0, 5.0] + }, + { + "name": "baz", + "count": 5, + "VALIDITY": [1, 0, 0, 1, 1], + "OFFSET": [0, 2, 2, 2, 5, 9], + "DATA": ["aa", "", "", "bbb", "cccc"] + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/integration/integration_test.py ---------------------------------------------------------------------- diff --git a/integration/integration_test.py b/integration/integration_test.py new file mode 100644 index 0000000..6ea634d --- /dev/null +++ b/integration/integration_test.py @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import argparse +import glob +import itertools +import os +import six +import subprocess +import tempfile +import uuid + + +ARROW_HOME = os.path.abspath(__file__).rsplit("/", 2)[0] + + +def guid(): + return uuid.uuid4().hex + + +def run_cmd(cmd): + if isinstance(cmd, six.string_types): + cmd = cmd.split(' ') + + try: + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + # this avoids hiding the stdout / stderr of failed processes + print('Command failed: %s' % ' '.join(cmd)) + print('With output:') + print('--------------') + print(e.output) + print('--------------') + raise e + + if isinstance(output, six.binary_type): + output = output.decode('utf-8') + return output + + +class IntegrationRunner(object): + + def __init__(self, json_files, testers, debug=False): + self.json_files = json_files + self.testers = testers + self.temp_dir = tempfile.mkdtemp() + self.debug = debug + + def run(self): + for producer, consumer in itertools.product(self.testers, + self.testers): + if producer is consumer: + continue + + print('-- {0} producing, {1} consuming'.format(producer.name, + consumer.name)) + + for json_path in self.json_files: + print('Testing with {0}'.format(json_path)) + + arrow_path = os.path.join(self.temp_dir, guid()) + + producer.json_to_arrow(json_path, arrow_path) + consumer.validate(json_path, arrow_path) + + +class Tester(object): + + def __init__(self, debug=False): + self.debug = debug + + def json_to_arrow(self, json_path, arrow_path): + raise NotImplementedError + + def validate(self, json_path, arrow_path): + raise NotImplementedError + + +class JavaTester(Tester): + + ARROW_TOOLS_JAR = os.path.join(ARROW_HOME, + 'java/tools/target/arrow-tools-0.1.1-' + 'SNAPSHOT-jar-with-dependencies.jar') + + name = 'Java' + + def _run(self, arrow_path=None, json_path=None, command='VALIDATE'): + cmd = ['java', '-cp', self.ARROW_TOOLS_JAR, + 'org.apache.arrow.tools.Integration'] + + if arrow_path is not 
None: + cmd.extend(['-a', arrow_path]) + + if json_path is not None: + cmd.extend(['-j', json_path]) + + cmd.extend(['-c', command]) + + if self.debug: + print(' '.join(cmd)) + + return run_cmd(cmd) + + def validate(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'VALIDATE') + + def json_to_arrow(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'JSON_TO_ARROW') + + +class CPPTester(Tester): + + CPP_INTEGRATION_EXE = os.environ.get( + 'ARROW_CPP_TESTER', + os.path.join(ARROW_HOME, + 'cpp/test-build/debug/json-integration-test')) + + name = 'C++' + + def _run(self, arrow_path=None, json_path=None, command='VALIDATE'): + cmd = [self.CPP_INTEGRATION_EXE, '--integration'] + + if arrow_path is not None: + cmd.append('--arrow=' + arrow_path) + + if json_path is not None: + cmd.append('--json=' + json_path) + + cmd.append('--mode=' + command) + + if self.debug: + print(' '.join(cmd)) + + return run_cmd(cmd) + + def validate(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'VALIDATE') + + def json_to_arrow(self, json_path, arrow_path): + return self._run(arrow_path, json_path, 'JSON_TO_ARROW') + + +def get_json_files(): + glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json') + return glob.glob(glob_pattern) + + +def run_all_tests(debug=False): + testers = [JavaTester(debug=debug), CPPTester(debug=debug)] + json_files = get_json_files() + + runner = IntegrationRunner(json_files, testers, debug=debug) + runner.run() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Arrow integration test CLI') + parser.add_argument('--debug', dest='debug', action='store_true', + default=False, + help='Run executables in debug mode as relevant') + + args = parser.parse_args() + run_all_tests(debug=args.debug) http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/pom.xml ---------------------------------------------------------------------- diff --git a/java/pom.xml 
b/java/pom.xml index 7221a14..a147d66 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -24,7 +24,7 @@ <packaging>pom</packaging> <name>Apache Arrow Java Root POM</name> - <description>Apache arrow is an open source, low latency SQL query engine for Hadoop and NoSQL.</description> + <description>Apache Arrow is open source, in-memory columnar data structures and low-overhead messaging</description> <url>http://arrow.apache.org/</url> <properties> @@ -442,8 +442,8 @@ <scope>test</scope> </dependency> <dependency> - <!-- Mockito needs to be on the class path after JUnit (or Hamcrest) as - long as Mockito _contains_ older Hamcrest classes. See arrow-2130. --> + <!-- Mockito needs to be on the class path after JUnit (or Hamcrest) as + long as Mockito _contains_ older Hamcrest classes. See arrow-2130. --> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <version>1.9.5</version> http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/tools/pom.xml ---------------------------------------------------------------------- diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 84b0b5e..ef96328 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -45,6 +45,12 @@ <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + <scope>run</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/tools/src/main/java/org/apache/arrow/tools/Integration.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index 29f0ee2..fa4bedc 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -220,6 +220,7 @@ 
public class Integration { private static void fatalError(String message, Throwable e) { System.err.println(message); + System.err.println(e.getMessage()); LOGGER.error(message, e); System.exit(1); } http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 4afd823..c5d642e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -81,7 +81,9 @@ public class VectorLoader { try { vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { - throw new IllegalArgumentException("Could not load buffers for field " + field, e); + e.printStackTrace(); + throw new IllegalArgumentException("Could not load buffers for field " + + field + " error message" + e.getMessage(), e); } List<Field> children = field.getChildren(); if (children.size() > 0) { http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index bbcd3e9..cd520da 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -123,7 +123,11 @@ public class ArrowReader implements AutoCloseable { if (n != l) { throw new IllegalStateException(n + " != " + l); } - RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer()); + + // Record batch flatbuffer is prefixed by 
its size as int32le + final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4); + RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer()); + int nodesLength = recordBatchFB.nodesLength(); final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength()); List<ArrowFieldNode> nodes = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java index 9881a22..1cd87eb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java @@ -99,9 +99,10 @@ public class ArrowWriter implements AutoCloseable { public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException { checkStarted(); align(); - // write metadata header + + // write metadata header with int32 size prefix long offset = currentPosition; - write(recordBatch); + write(recordBatch, true); align(); // write body long bodyOffset = currentPosition; @@ -117,6 +118,7 @@ public class ArrowWriter implements AutoCloseable { if (startPosition != currentPosition) { writeZeros((int)(startPosition - currentPosition)); } + write(buffer); if (currentPosition != startPosition + layout.getSize()) { throw new IllegalStateException("wrong buffer size: " + currentPosition + " != " + startPosition + layout.getSize()); @@ -133,7 +135,9 @@ public class ArrowWriter implements AutoCloseable { } private void write(ArrowBuf buffer) throws IOException { - write(buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes())); + ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), 
buffer.readableBytes()); + LOGGER.debug("Writing buffer with size: " + nioBuffer.remaining()); + write(nioBuffer); } private void checkStarted() throws IOException { @@ -166,14 +170,21 @@ public class ArrowWriter implements AutoCloseable { private void writeFooter() throws IOException { // TODO: dictionaries - write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches)); + write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches), false); } - private long write(FBSerializable writer) throws IOException { + private long write(FBSerializable writer, boolean withSizePrefix) throws IOException { FlatBufferBuilder builder = new FlatBufferBuilder(); int root = writer.writeTo(builder); builder.finish(root); - return write(builder.dataBuffer()); + + ByteBuffer buffer = builder.dataBuffer(); + + if (withSizePrefix) { + writeIntLittleEndian(buffer.remaining()); + } + + return write(buffer); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java index f07b517..f205982 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java @@ -127,8 +127,13 @@ public class JsonFileReader implements AutoCloseable { ValueVector valueVector = (ValueVector)innerVector; valueVector.allocateNew(); Mutator mutator = valueVector.getMutator(); - mutator.setValueCount(count); - for (int i = 0; i < count; i++) { + + int innerVectorCount = count; + if (vectorType.getName() == "OFFSET") { + innerVectorCount++; + } + mutator.setValueCount(innerVectorCount); + for (int i = 0; i < innerVectorCount; i++) { 
parser.nextToken(); setValueFromParser(valueVector, i); } http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java index 812b3da..6ff3577 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java @@ -114,7 +114,7 @@ public class JsonFileWriter implements AutoCloseable { BufferBacked innerVector = fieldInnerVectors.get(v); generator.writeArrayFieldStart(vectorType.getName()); ValueVector valueVector = (ValueVector)innerVector; - for (int i = 0; i < valueCount; i++) { + for (int i = 0; i < valueVector.getAccessor().getValueCount(); i++) { writeValueToGenerator(valueVector, i); } generator.writeEndArray(); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/python/.gitignore ---------------------------------------------------------------------- diff --git a/python/.gitignore b/python/.gitignore index 07f2835..c37efc4 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -12,16 +12,6 @@ Testing/ # Editor temporary/working/backup files *flymake* -# Compiled source -*.a -*.dll -*.o -*.py[ocd] -*.so -*.dylib -.build_cache_dir -MANIFEST - # Generated sources *.c *.cpp
