ARROW-363: [Java/C++] integration testing harness, initial integration tests
This also includes format reconciliation as discussed in ARROW-384. Author: Wes McKinney <[email protected]> Closes #211 from wesm/ARROW-363 and squashes the following commits: 6982c3c [Wes McKinney] Permit end of buffer IPC reads if length is 0 4d46c8b [Wes McKinney] Fix logical error with offsets array in JsonFileWriter. Add broken string test case to simple.json 36ab5d6 [Wes McKinney] Increment MetadataVersion in flatbuffer 844257e [Wes McKinney] cpplint a2711f2 [Wes McKinney] Address other format incompatibilities, write vectorLayout to Arrow metadata 13608ef [Wes McKinney] Relax 64 byte padding. Do not write RecordBatch embedded in Message for now 6a66fc8 [Wes McKinney] Write record batch size prefix in Java 72ea42c [Wes McKinney] Note that padding is 64-bytes at start of file (for now) c2ffde4 [Wes McKinney] More notes about the file format aef4382 [Wes McKinney] cpplint 85128f7 [Wes McKinney] Refactor IPC/File record batch read/write structure to reflect discussion in ARROW-384 dbd6ed6 [Wes McKinney] Do not embed metadata length in WriteDataHeader c529d63 [Wes McKinney] Fix JSON integration test example to make it further d806aa6 [Wes McKinney] Exclude JSON files from Apache RAT checks a7e2d4b [Wes McKinney] Draft testing harness Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e3c167bd Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e3c167bd Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e3c167bd Branch: refs/heads/master Commit: e3c167bd101734f92c3a2be2eb7f56f1fba91e67 Parents: 86f56a6 Author: Wes McKinney <[email protected]> Authored: Mon Nov 28 21:29:19 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Mon Nov 28 21:29:19 2016 -0500 ---------------------------------------------------------------------- .gitignore | 26 ++ cpp/CMakeLists.txt | 1 - cpp/src/arrow/io/io-file-test.cc | 2 +- cpp/src/arrow/io/memory.cc | 25 +- cpp/src/arrow/io/memory.h | 8 +- 
cpp/src/arrow/ipc/adapter.cc | 251 ++++++++++--------- cpp/src/arrow/ipc/adapter.h | 65 ++--- cpp/src/arrow/ipc/file.cc | 31 ++- cpp/src/arrow/ipc/ipc-adapter-test.cc | 85 ++++--- cpp/src/arrow/ipc/ipc-file-test.cc | 2 +- cpp/src/arrow/ipc/ipc-json-test.cc | 20 +- cpp/src/arrow/ipc/ipc-metadata-test.cc | 12 +- cpp/src/arrow/ipc/json-integration-test.cc | 30 ++- cpp/src/arrow/ipc/json-internal.cc | 110 +++----- cpp/src/arrow/ipc/metadata-internal.cc | 100 +++++--- cpp/src/arrow/ipc/metadata-internal.h | 6 +- cpp/src/arrow/ipc/metadata.cc | 115 +++++---- cpp/src/arrow/ipc/metadata.h | 50 ++-- cpp/src/arrow/ipc/test-common.h | 15 +- cpp/src/arrow/ipc/util.h | 6 +- cpp/src/arrow/test-util.h | 8 +- cpp/src/arrow/type.cc | 46 +++- cpp/src/arrow/type.h | 73 ++++-- cpp/src/arrow/types/primitive.cc | 2 +- cpp/src/arrow/util/bit-util.h | 4 + dev/release/run-rat.sh | 3 +- format/IPC.md | 106 ++++++++ format/Message.fbs | 3 +- integration/data/simple.json | 66 +++++ integration/integration_test.py | 177 +++++++++++++ java/pom.xml | 6 +- java/tools/pom.xml | 6 + .../org/apache/arrow/tools/Integration.java | 1 + .../org/apache/arrow/vector/VectorLoader.java | 4 +- .../apache/arrow/vector/file/ArrowReader.java | 6 +- .../apache/arrow/vector/file/ArrowWriter.java | 23 +- .../arrow/vector/file/json/JsonFileReader.java | 9 +- .../arrow/vector/file/json/JsonFileWriter.java | 2 +- python/.gitignore | 10 - 39 files changed, 1024 insertions(+), 491 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a00cbba --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Compiled source +*.a +*.dll +*.o +*.py[ocd] +*.so +*.dylib +.build_cache_dir +MANIFEST http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 0edb8ce..1a97008 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -528,7 +528,6 @@ if(ARROW_BUILD_TESTS) ExternalProject_Add(gflags_ep GIT_REPOSITORY https://github.com/gflags/gflags.git GIT_TAG cce68f0c9c5d054017425e6e6fd54f696d36e8ee - # URL "https://github.com/gflags/gflags/archive/v${GFLAGS_VERSION}.tar.gz" BUILD_IN_SOURCE 1 CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DCMAKE_INSTALL_PREFIX=${GFLAGS_PREFIX} http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/io-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc index 54c21d2..fad49ce 100644 --- a/cpp/src/arrow/io/io-file-test.cc +++ b/cpp/src/arrow/io/io-file-test.cc @@ -19,7 +19,7 @@ #include <cstdio> #include <cstring> #ifndef _MSC_VER -# include <fcntl.h> +#include <fcntl.h> #endif #include <fstream> #include <memory> 
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 71b0f1e..af495e2 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -258,8 +258,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { // ---------------------------------------------------------------------- // In-memory buffer reader -BufferReader::BufferReader(const uint8_t* buffer, int buffer_size) - : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} +BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer) + : buffer_(buffer), data_(buffer->data()), size_(buffer->size()), position_(0) {} + +BufferReader::BufferReader(const uint8_t* data, int64_t size) + : buffer_(nullptr), data_(data), size_(size), position_(0) {} BufferReader::~BufferReader() {} @@ -278,26 +281,32 @@ bool BufferReader::supports_zero_copy() const { } Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { - memcpy(buffer, buffer_ + position_, nbytes); - *bytes_read = std::min(nbytes, buffer_size_ - position_); + memcpy(buffer, data_ + position_, nbytes); + *bytes_read = std::min(nbytes, size_ - position_); position_ += *bytes_read; return Status::OK(); } Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { - int64_t size = std::min(nbytes, buffer_size_ - position_); - *out = std::make_shared<Buffer>(buffer_ + position_, size); + int64_t size = std::min(nbytes, size_ - position_); + + if (buffer_ != nullptr) { + *out = SliceBuffer(buffer_, position_, size); + } else { + *out = std::make_shared<Buffer>(data_ + position_, size); + } + position_ += nbytes; return Status::OK(); } Status BufferReader::GetSize(int64_t* size) { - *size = buffer_size_; + *size = size_; return Status::OK(); } Status BufferReader::Seek(int64_t position) { - if (position < 0 || position >= 
buffer_size_) { + if (position < 0 || position >= size_) { return Status::IOError("position out of bounds"); } http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index df2fe8d..b72f93b 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -99,7 +99,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { class ARROW_EXPORT BufferReader : public ReadableFileInterface { public: - BufferReader(const uint8_t* buffer, int buffer_size); + explicit BufferReader(const std::shared_ptr<Buffer>& buffer); + BufferReader(const uint8_t* data, int64_t size); ~BufferReader(); Status Close() override; @@ -116,8 +117,9 @@ class ARROW_EXPORT BufferReader : public ReadableFileInterface { bool supports_zero_copy() const override; private: - const uint8_t* buffer_; - int buffer_size_; + std::shared_ptr<Buffer> buffer_; + const uint8_t* data_; + int64_t size_; int64_t position_; }; http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index da718c0..edf716f 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -48,15 +48,6 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -namespace { -Status CheckMultipleOf64(int64_t size) { - if (BitUtil::IsMultipleOf64(size)) { return Status::OK(); } - return Status::Invalid( - "Attempted to write a buffer that " - "wasn't a multiple of 64 bytes"); -} -} - static bool IsPrimitive(const DataType* type) { DCHECK(type != nullptr); switch (type->type) { @@ -124,30 +115,30 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes class RecordBatchWriter { public: RecordBatchWriter(const 
std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows, - int max_recursion_depth) + int64_t buffer_start_offset, int max_recursion_depth) : columns_(&columns), num_rows_(num_rows), + buffer_start_offset_(buffer_start_offset), max_recursion_depth_(max_recursion_depth) {} - Status AssemblePayload() { + Status AssemblePayload(int64_t* body_length) { + if (field_nodes_.size() > 0) { + field_nodes_.clear(); + buffer_meta_.clear(); + buffers_.clear(); + } + // Perform depth-first traversal of the row-batch for (size_t i = 0; i < columns_->size(); ++i) { const Array* arr = (*columns_)[i].get(); RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_)); } - return Status::OK(); - } - Status Write( - io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) { - // Get the starting position - int64_t start_position; - RETURN_NOT_OK(dst->Tell(&start_position)); - - // Keep track of the current position so we can determine the size of the - // message body - int64_t position = start_position; + // The position for the start of a buffer relative to the passed frame of + // reference. May be 0 or some other position in an address space + int64_t offset = buffer_start_offset_; + // Construct the buffer metadata for the record batch header for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); int64_t size = 0; @@ -161,65 +152,103 @@ class RecordBatchWriter { // TODO(wesm): We currently have no notion of shared memory page id's, // but we've included it in the metadata IDL for when we have it in the - // future. Use page=0 for now + // future. Use page = -1 for now // // Note that page ids are a bespoke notion for Arrow and not a feature we // are using from any OS-level shared memory. 
The thought is that systems // may (in the future) associate integer page id's with physical memory // pages (according to whatever is the desired shared memory mechanism) - buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding)); - - if (size > 0) { - RETURN_NOT_OK(dst->Write(buffer->data(), size)); - position += size; - } - - if (padding > 0) { - RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); - position += padding; - } + buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding)); + offset += size + padding; } - *body_end_offset = position; + *body_length = offset - buffer_start_offset_; + DCHECK(BitUtil::IsMultipleOf64(*body_length)); + + return Status::OK(); + } + Status WriteMetadata( + int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) { // Now that we have computed the locations of all of the buffers in shared // memory, the data header can be converted to a flatbuffer and written out // // Note: The memory written here is prefixed by the size of the flatbuffer - // itself as an int32_t. On reading from a input, you will have to - // determine the data header size then request a buffer such that you can - // construct the flatbuffer data accessor object (see arrow::ipc::Message) - std::shared_ptr<Buffer> data_header; - RETURN_NOT_OK(WriteDataHeader( - num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header)); + // itself as an int32_t. 
+ std::shared_ptr<Buffer> metadata_fb; + RETURN_NOT_OK(WriteRecordBatchMetadata( + num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb)); + + // Need to write 4 bytes (metadata size), the metadata, plus padding to + // fall on a 64-byte offset + int64_t padded_metadata_length = + BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4); + + // The returned metadata size includes the length prefix, the flatbuffer, + // plus padding + *metadata_length = padded_metadata_length; - // Write the data header at the end - RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size())); + // Write the flatbuffer size prefix + int32_t flatbuffer_size = metadata_fb->size(); + RETURN_NOT_OK( + dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); - position += data_header->size(); - *header_end_offset = position; + // Write the flatbuffer + RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size())); - return Align(dst, &position); + // Write any padding + int64_t padding = padded_metadata_length - metadata_fb->size() - 4; + if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } + + return Status::OK(); } - Status Align(io::OutputStream* dst, int64_t* position) { - // Write all buffers here on word boundaries - // TODO(wesm): Is there benefit to 64-byte padding in IPC? 
- int64_t remainder = PaddedLength(*position) - *position; - if (remainder > 0) { - RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder)); - *position += remainder; + Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { + RETURN_NOT_OK(AssemblePayload(body_length)); + +#ifndef NDEBUG + int64_t start_position, current_position; + RETURN_NOT_OK(dst->Tell(&start_position)); +#endif + + RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length)); + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(&current_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + + // Now write the buffers + for (size_t i = 0; i < buffers_.size(); ++i) { + const Buffer* buffer = buffers_[i].get(); + int64_t size = 0; + int64_t padding = 0; + + // The buffer might be null if we are handling zero row lengths. + if (buffer) { + size = buffer->size(); + padding = BitUtil::RoundUpToMultipleOf64(size) - size; + } + + if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); } + + if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } } + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(&current_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + return Status::OK(); } - // This must be called after invoking AssemblePayload Status GetTotalSize(int64_t* size) { // emulates the behavior of Write without actually writing - int64_t body_offset; - int64_t data_header_offset; + int32_t metadata_length; + int64_t body_length; MockOutputStream dst; - RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset)); + RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length)); *size = dst.GetExtentBytesWritten(); return Status::OK(); } @@ -228,6 +257,7 @@ class RecordBatchWriter { // Do not copy this vector. 
Ownership must be retained elsewhere const std::vector<std::shared_ptr<Array>>* columns_; int32_t num_rows_; + int64_t buffer_start_offset_; std::vector<flatbuf::FieldNode> field_nodes_; std::vector<flatbuf::Buffer> buffer_meta_; @@ -236,18 +266,17 @@ class RecordBatchWriter { }; Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns, - int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset, - int64_t* header_end_offset, int max_recursion_depth) { + int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); - RecordBatchWriter serializer(columns, num_rows, max_recursion_depth); - RETURN_NOT_OK(serializer.AssemblePayload()); - return serializer.Write(dst, body_end_offset, header_end_offset); + RecordBatchWriter serializer( + columns, num_rows, buffer_start_offset, max_recursion_depth); + return serializer.Write(dst, metadata_length, body_length); } Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { RecordBatchWriter serializer( - batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth); - RETURN_NOT_OK(serializer.AssemblePayload()); + batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth); RETURN_NOT_OK(serializer.GetTotalSize(size)); return Status::OK(); } @@ -255,30 +284,33 @@ Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { // ---------------------------------------------------------------------- // Record batch read path -class RecordBatchReader::RecordBatchReaderImpl { +class RecordBatchReader { public: - RecordBatchReaderImpl(io::ReadableFileInterface* file, - const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth) - : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { + RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata, + const std::shared_ptr<Schema>& schema, int max_recursion_depth, + 
io::ReadableFileInterface* file) + : metadata_(metadata), + schema_(schema), + max_recursion_depth_(max_recursion_depth), + file_(file) { num_buffers_ = metadata->num_buffers(); num_flattened_fields_ = metadata->num_fields(); } - Status AssembleBatch( - const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) { - std::vector<std::shared_ptr<Array>> arrays(schema->num_fields()); + Status Read(std::shared_ptr<RecordBatch>* out) { + std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields()); // The field_index and buffer_index are incremented in NextArray based on // how much of the batch is "consumed" (through nested data reconstruction, // for example) field_index_ = 0; buffer_index_ = 0; - for (int i = 0; i < schema->num_fields(); ++i) { - const Field* field = schema->field(i).get(); + for (int i = 0; i < schema_->num_fields(); ++i) { + const Field* field = schema_->field(i).get(); RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i])); } - *out = std::make_shared<RecordBatch>(schema, metadata_->length(), arrays); + *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays); return Status::OK(); } @@ -370,67 +402,56 @@ class RecordBatchReader::RecordBatchReaderImpl { Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) { BufferMetadata metadata = metadata_->buffer(buffer_index); - RETURN_NOT_OK(CheckMultipleOf64(metadata.length)); - return file_->ReadAt(metadata.offset, metadata.length, out); + + if (metadata.length == 0) { + *out = std::make_shared<Buffer>(nullptr, 0); + return Status::OK(); + } else { + return file_->ReadAt(metadata.offset, metadata.length, out); + } } private: + std::shared_ptr<RecordBatchMetadata> metadata_; + std::shared_ptr<Schema> schema_; + int max_recursion_depth_; io::ReadableFileInterface* file_; - std::shared_ptr<RecordBatchMessage> metadata_; int field_index_; int buffer_index_; - int max_recursion_depth_; int num_buffers_; int num_flattened_fields_; }; -Status 
RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset, - std::shared_ptr<RecordBatchReader>* out) { - return Open(file, offset, kMaxIpcRecursionDepth, out); -} - -Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset, - int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) { +Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, + io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) { std::shared_ptr<Buffer> buffer; - RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer)); - - int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data()); + RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); - if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) { - return Status::Invalid("metadata size invalid"); - } - - // Read the metadata - RETURN_NOT_OK( - file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer)); - - // TODO(wesm): buffer slicing here would be better in case ReadAt returns - // allocated memory - - std::shared_ptr<Message> message; - RETURN_NOT_OK(Message::Open(buffer, &message)); + int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data()); - if (message->type() != Message::RECORD_BATCH) { - return Status::Invalid("Metadata message is not a record batch"); + if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) { + std::stringstream ss; + ss << "flatbuffer size " << metadata_length << " invalid. 
File offset: " << offset + << ", metadata length: " << metadata_length; + return Status::Invalid(ss.str()); } - std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch(); - - std::shared_ptr<RecordBatchReader> result(new RecordBatchReader()); - result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth)); - *out = result; - + *metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t)); return Status::OK(); } -// Here the explicit destructor is required for compilers to be aware of -// the complete information of RecordBatchReader::RecordBatchReaderImpl class -RecordBatchReader::~RecordBatchReader() {} +Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, + const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file, + std::shared_ptr<RecordBatch>* out) { + return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out); +} -Status RecordBatchReader::GetRecordBatch( - const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) { - return impl_->AssembleBatch(schema, out); +Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, + const std::shared_ptr<Schema>& schema, int max_recursion_depth, + io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) { + RecordBatchReader reader(metadata, schema, max_recursion_depth, file); + return reader.Read(out); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index b02de28..963b9ee 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -43,7 +43,7 @@ class OutputStream; namespace ipc { -class RecordBatchMessage; +class RecordBatchMetadata; // ---------------------------------------------------------------------- // Write path @@ -51,22 +51,30 @@ class 
RecordBatchMessage; // TODO(emkornfield) investigate this more constexpr int kMaxIpcRecursionDepth = 64; -// Write the RecordBatch (collection of equal-length Arrow arrays) to the output -// stream +// Write the RecordBatch (collection of equal-length Arrow arrays) to the +// output stream in a contiguous block. The record batch metadata is written as +// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) +// prefixed by its size, followed by each of the memory buffers in the batch +// written end to end (with appropriate alignment and padding): // -// First, each of the memory buffers are written out end-to-end -// -// Then, this function writes the batch metadata as a flatbuffer (see -// format/Message.fbs -- the RecordBatch message type) like so: -// -// <int32: metadata size> <uint8*: metadata> +// <int32: metadata size> <uint8*: metadata> <buffers> // // Finally, the absolute offsets (relative to the start of the output stream) // to the end of the body and end of the metadata / data header (suffixed by // the header size) is returned in out-variables +// +// @param(in) buffer_start_offset: the start offset to use in the buffer metadata, +// default should be 0 +// +// @param(out) metadata_length: the size of the length-prefixed flatbuffer +// including padding to a 64-byte boundary +// +// @param(out) body_length: the size of the contiguous buffer block plus +// padding bytes ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns, - int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset, - int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth); + int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + int max_recursion_depth = kMaxIpcRecursionDepth); // int64_t GetRecordBatchMetadata(const RecordBatch* batch); @@ -78,27 +86,20 @@ ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size); 
// ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads -class ARROW_EXPORT RecordBatchReader { - public: - // The offset is the absolute position to the *end* of the record batch data - // header - static Status Open(io::ReadableFileInterface* file, int64_t offset, - std::shared_ptr<RecordBatchReader>* out); - - static Status Open(io::ReadableFileInterface* file, int64_t offset, - int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out); - - virtual ~RecordBatchReader(); - - // Reassemble the record batch. A Schema is required to be able to construct - // the right array containers - Status GetRecordBatch( - const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out); - - private: - class RecordBatchReaderImpl; - std::unique_ptr<RecordBatchReaderImpl> impl_; -}; +// Read the record batch flatbuffer metadata starting at the indicated file offset +// +// The flatbuffer is expected to be length-prefixed, so the metadata_length +// includes at least the length prefix and the flatbuffer +Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, + io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata); + +Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, + const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file, + std::shared_ptr<RecordBatch>* out); + +Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, + const std::shared_ptr<Schema>& schema, int max_recursion_depth, + io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out); } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index 
c68244d..06001cc 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -23,6 +23,7 @@ #include <vector> #include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/ipc/adapter.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" @@ -87,19 +88,19 @@ Status FileWriter::WriteRecordBatch( int64_t offset = position_; - int64_t body_end_offset; - int64_t header_end_offset; + // There may be padding ever the end of the metadata, so we cannot rely on + // position_ + int32_t metadata_length; + int64_t body_length; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( - columns, num_rows, sink_, &body_end_offset, &header_end_offset)); + columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length)); RETURN_NOT_OK(UpdatePosition()); DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes"; - // There may be padding ever the end of the metadata, so we cannot rely on - // position_ - int32_t metadata_length = header_end_offset - body_end_offset; - int32_t body_length = body_end_offset - offset; - // Append metadata, to be written in the footer later record_batches_.emplace_back(offset, metadata_length, body_length); @@ -198,12 +199,18 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); FileBlock block = footer_->record_batch(i); - int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length; - std::shared_ptr<RecordBatchReader> reader; - RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader)); + std::shared_ptr<RecordBatchMetadata> metadata; + RETURN_NOT_OK(ReadRecordBatchMetadata( + block.offset, block.metadata_length, file_.get(), &metadata)); + + // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see + // ARROW-384). 
+ std::shared_ptr<Buffer> buffer_block; + RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); + io::BufferReader reader(buffer_block); - return reader->GetRecordBatch(schema_, batch); + return ReadRecordBatch(metadata, schema_, &reader, batch); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index f5611d4..1accfde 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -54,17 +54,24 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, std::string path = "test-write-row-batch"; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); - int64_t body_end_offset; - int64_t header_end_offset; + int32_t metadata_length; + int64_t body_length; - RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(), - &body_end_offset, &header_end_offset)); + const int64_t buffer_offset = 0; - std::shared_ptr<RecordBatchReader> reader; - RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader)); + RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset, + mmap_.get(), &metadata_length, &body_length)); - RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result)); - return Status::OK(); + std::shared_ptr<RecordBatchMetadata> metadata; + RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); + + // The buffer offsets start at 0, so we must construct a + // ReadableFileInterface according to that frame of reference + std::shared_ptr<Buffer> buffer_payload; + RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload)); + io::BufferReader buffer_reader(buffer_payload); + + return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result); } 
protected: @@ -96,11 +103,11 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch, void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) { ipc::MockOutputStream mock; - int64_t mock_header_offset = -1; - int64_t mock_body_offset = -1; + int32_t mock_metadata_length = -1; + int64_t mock_body_length = -1; int64_t size = -1; - ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock, - &mock_body_offset, &mock_header_offset)); + ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock, + &mock_metadata_length, &mock_body_length)); ASSERT_OK(GetRecordBatchSize(batch.get(), &size)); ASSERT_EQ(mock.GetExtentBytesWritten(), size); } @@ -129,39 +136,36 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { void SetUp() { pool_ = default_memory_pool(); } void TearDown() { io::MemoryMapFixture::TearDown(); } - Status WriteToMmap(int recursion_level, bool override_level, - int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) { + Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length, + int64_t* body_length, std::shared_ptr<Schema>* schema) { const int batch_length = 5; - TypePtr type = kInt32; + TypePtr type = int32(); ArrayPtr array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array)); for (int i = 0; i < recursion_level; ++i) { - type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type)); + type = list(type); RETURN_NOT_OK( MakeRandomListArray(array, batch_length, include_nulls, pool_, &array)); } - auto f0 = std::make_shared<Field>("f0", type); - std::shared_ptr<Schema> schema(new Schema({f0})); - if (schema_out != nullptr) { *schema_out = schema; } + auto f0 = field("f0", type); + + *schema = std::shared_ptr<Schema>(new Schema({f0})); + std::vector<ArrayPtr> arrays = {array}; - auto batch = std::make_shared<RecordBatch>(schema, batch_length, arrays); + auto batch = 
std::make_shared<RecordBatch>(*schema, batch_length, arrays); std::string path = "test-write-past-max-recursion"; const int memory_map_size = 1 << 16; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); - int64_t body_offset; - int64_t header_offset; - - int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out; if (override_level) { - return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(), - &body_offset, header_out_param, recursion_level + 1); + return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), + metadata_length, body_length, recursion_level + 1); } else { - return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(), - &body_offset, header_out_param); + return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), + metadata_length, body_length); } } @@ -171,18 +175,29 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { }; TEST_F(RecursionLimits, WriteLimit) { - ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false)); + int32_t metadata_length = -1; + int64_t body_length = -1; + std::shared_ptr<Schema> schema; + ASSERT_RAISES( + Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &schema)); } TEST_F(RecursionLimits, ReadLimit) { - int64_t header_offset = -1; + int32_t metadata_length = -1; + int64_t body_length = -1; std::shared_ptr<Schema> schema; - ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema)); + ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema)); - std::shared_ptr<RecordBatchReader> reader; - ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader)); - std::shared_ptr<RecordBatch> batch_result; - ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result)); + std::shared_ptr<RecordBatchMetadata> metadata; + ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); + + std::shared_ptr<Buffer> payload; + 
ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload)); + + io::BufferReader reader(payload); + + std::shared_ptr<RecordBatch> batch; + ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch)); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index cd424bf..a1feac4 100644 --- a/cpp/src/arrow/ipc/ipc-file-test.cc +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -68,7 +68,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { RETURN_NOT_OK(sink_->Tell(&footer_offset)); // Open the file - auto reader = std::make_shared<io::BufferReader>(buffer_->data(), buffer_->size()); + auto reader = std::make_shared<io::BufferReader>(buffer_); RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_)); EXPECT_EQ(num_batches, file_reader_->num_record_batches()); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-json-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index a51371c..e5c3a08 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -284,19 +284,23 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) { "name": "foo", "type": {"name": "int", "isSigned": true, "bitWidth": 32}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 32} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } }, { "name": "bar", "type": {"name": "floatingpoint", "precision": "DOUBLE"}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", 
"typeBitWidth": 64} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } } ] }, http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-metadata-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 1dc3969..d29583f 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -43,7 +43,7 @@ static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) { } } -class TestSchemaMessage : public ::testing::Test { +class TestSchemaMetadata : public ::testing::Test { public: void SetUp() {} @@ -52,11 +52,11 @@ class TestSchemaMessage : public ::testing::Test { ASSERT_OK(WriteSchema(schema, &buffer)); std::shared_ptr<Message> message; - ASSERT_OK(Message::Open(buffer, &message)); + ASSERT_OK(Message::Open(buffer, 0, &message)); ASSERT_EQ(Message::SCHEMA, message->type()); - std::shared_ptr<SchemaMessage> schema_msg = message->GetSchema(); + auto schema_msg = std::make_shared<SchemaMetadata>(message); ASSERT_EQ(schema->num_fields(), schema_msg->num_fields()); std::shared_ptr<Schema> schema2; @@ -68,7 +68,7 @@ class TestSchemaMessage : public ::testing::Test { const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>(); -TEST_F(TestSchemaMessage, PrimitiveFields) { +TEST_F(TestSchemaMetadata, PrimitiveFields) { auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>()); auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>()); auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>()); @@ -85,7 +85,7 @@ TEST_F(TestSchemaMessage, PrimitiveFields) { CheckRoundtrip(&schema); } -TEST_F(TestSchemaMessage, NestedFields) { +TEST_F(TestSchemaMetadata, NestedFields) { auto type = std::make_shared<ListType>(std::make_shared<Int32Type>()); auto f0 = 
std::make_shared<Field>("f0", type); @@ -111,7 +111,7 @@ class TestFileFooter : public ::testing::Test { std::unique_ptr<FileFooter> footer; ASSERT_OK(FileFooter::Open(buffer, &footer)); - ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version()); + ASSERT_EQ(MetadataVersion::V2, footer->version()); // Check schema std::shared_ptr<Schema> schema2; http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 5eff899..7a313f7 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -255,19 +255,23 @@ static const char* JSON_EXAMPLE = R"example( "name": "foo", "type": {"name": "int", "isSigned": true, "bitWidth": 32}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 32} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } }, { "name": "bar", "type": {"name": "floatingpoint", "precision": "DOUBLE"}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 64} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 64} + ] + } } ] }, @@ -301,10 +305,12 @@ static const char* JSON_EXAMPLE2 = R"example( "name": "foo", "type": {"name": "int", "isSigned": true, "bitWidth": 32}, "nullable": true, "children": [], - "typeLayout": [ - {"type": "VALIDITY", "typeBitWidth": 1}, - {"type": "DATA", "typeBitWidth": 32} - ] + "typeLayout": { + "vectors": [ + {"type": "VALIDITY", "typeBitWidth": 1}, + {"type": "DATA", "typeBitWidth": 32} + ] + } } ] }, 
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index 31fe35b..e56bcb3 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -45,8 +45,6 @@ namespace ipc { using RjArray = rj::Value::ConstArray; using RjObject = rj::Value::ConstObject; -enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY }; - static std::string GetBufferTypeName(BufferType type) { switch (type) { case BufferType::DATA: @@ -93,27 +91,6 @@ static std::string GetTimeUnitName(TimeUnit unit) { return "UNKNOWN"; } -class BufferLayout { - public: - BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {} - - BufferType type() const { return type_; } - int bit_width() const { return bit_width_; } - - private: - BufferType type_; - int bit_width_; -}; - -static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1); -static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32); -static const BufferLayout kTypeBuffer(BufferType::TYPE, 32); -static const BufferLayout kBooleanBuffer(BufferType::DATA, 1); -static const BufferLayout kValues64(BufferType::DATA, 64); -static const BufferLayout kValues32(BufferType::DATA, 32); -static const BufferLayout kValues16(BufferType::DATA, 16); -static const BufferLayout kValues8(BufferType::DATA, 8); - class JsonSchemaWriter : public TypeVisitor { public: explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer) @@ -154,9 +131,9 @@ class JsonSchemaWriter : public TypeVisitor { } template <typename T> - typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value || - std::is_base_of<BooleanType, T>::value || - std::is_base_of<NullType, T>::value, + typename std::enable_if< + std::is_base_of<NoExtraMeta, T>::value || std::is_base_of<BooleanType, T>::value || + std::is_base_of<DateType, 
T>::value || std::is_base_of<NullType, T>::value, void>::type WriteTypeMetadata(const T& type) {} @@ -243,11 +220,10 @@ class JsonSchemaWriter : public TypeVisitor { } template <typename T> - Status WritePrimitive(const std::string& typeclass, const T& type, - const std::vector<BufferLayout>& buffer_layout) { + Status WritePrimitive(const std::string& typeclass, const T& type) { WriteName(typeclass, type); SetNoChildren(); - WriteBufferLayout(buffer_layout); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } @@ -255,15 +231,17 @@ class JsonSchemaWriter : public TypeVisitor { Status WriteVarBytes(const std::string& typeclass, const T& type) { WriteName(typeclass, type); SetNoChildren(); - WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8}); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } - void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) { + void WriteBufferLayout(const std::vector<BufferDescr>& buffer_layout) { writer_->Key("typeLayout"); + writer_->StartObject(); + writer_->Key("vectors"); writer_->StartArray(); - for (const BufferLayout& buffer : buffer_layout) { + for (const BufferDescr& buffer : buffer_layout) { writer_->StartObject(); writer_->Key("type"); writer_->String(GetBufferTypeName(buffer.type())); @@ -274,6 +252,7 @@ class JsonSchemaWriter : public TypeVisitor { writer_->EndObject(); } writer_->EndArray(); + writer_->EndObject(); } Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) { @@ -286,74 +265,52 @@ class JsonSchemaWriter : public TypeVisitor { return Status::OK(); } - Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); } + Status Visit(const NullType& type) override { return WritePrimitive("null", type); } - Status Visit(const BooleanType& type) override { - return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer}); - } + Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); 
} - Status Visit(const Int8Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues8}); - } + Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); } - Status Visit(const Int16Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues16}); - } + Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); } - Status Visit(const Int32Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues32}); - } + Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); } - Status Visit(const Int64Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues64}); - } + Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt8Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues8}); - } + Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt16Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues16}); - } + Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt32Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues32}); - } + Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); } - Status Visit(const UInt64Type& type) override { - return WritePrimitive("int", type, {kValidityBuffer, kValues64}); - } + Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); } Status Visit(const HalfFloatType& type) override { - return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16}); + return WritePrimitive("floatingpoint", type); } Status Visit(const FloatType& type) override { - return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32}); + return 
WritePrimitive("floatingpoint", type); } Status Visit(const DoubleType& type) override { - return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64}); + return WritePrimitive("floatingpoint", type); } Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); } Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); } - Status Visit(const DateType& type) override { - return WritePrimitive("date", type, {kValidityBuffer, kValues64}); - } + Status Visit(const DateType& type) override { return WritePrimitive("date", type); } - Status Visit(const TimeType& type) override { - return WritePrimitive("time", type, {kValidityBuffer, kValues64}); - } + Status Visit(const TimeType& type) override { return WritePrimitive("time", type); } Status Visit(const TimestampType& type) override { - return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64}); + return WritePrimitive("timestamp", type); } Status Visit(const IntervalType& type) override { - return WritePrimitive("interval", type, {kValidityBuffer, kValues64}); + return WritePrimitive("interval", type); } Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); } @@ -361,26 +318,21 @@ class JsonSchemaWriter : public TypeVisitor { Status Visit(const ListType& type) override { WriteName("list", type); RETURN_NOT_OK(WriteChildren(type.children())); - WriteBufferLayout({kValidityBuffer, kOffsetBuffer}); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } Status Visit(const StructType& type) override { WriteName("struct", type); WriteChildren(type.children()); - WriteBufferLayout({kValidityBuffer, kTypeBuffer}); + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } Status Visit(const UnionType& type) override { WriteName("union", type); WriteChildren(type.children()); - - if (type.mode == UnionMode::SPARSE) { - WriteBufferLayout({kValidityBuffer, kTypeBuffer}); - } else { - 
WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer}); - } + WriteBufferLayout(type.GetBufferLayout()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 7102012..b995228 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -37,20 +37,6 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -const std::shared_ptr<DataType> BOOL = std::make_shared<BooleanType>(); -const std::shared_ptr<DataType> INT8 = std::make_shared<Int8Type>(); -const std::shared_ptr<DataType> INT16 = std::make_shared<Int16Type>(); -const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>(); -const std::shared_ptr<DataType> INT64 = std::make_shared<Int64Type>(); -const std::shared_ptr<DataType> UINT8 = std::make_shared<UInt8Type>(); -const std::shared_ptr<DataType> UINT16 = std::make_shared<UInt16Type>(); -const std::shared_ptr<DataType> UINT32 = std::make_shared<UInt32Type>(); -const std::shared_ptr<DataType> UINT64 = std::make_shared<UInt64Type>(); -const std::shared_ptr<DataType> FLOAT = std::make_shared<FloatType>(); -const std::shared_ptr<DataType> DOUBLE = std::make_shared<DoubleType>(); -const std::shared_ptr<DataType> STRING = std::make_shared<StringType>(); -const std::shared_ptr<DataType> BINARY = std::make_shared<BinaryType>(); - static Status IntFromFlatbuffer( const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) { if (int_data->bitWidth() > 64) { @@ -62,16 +48,16 @@ static Status IntFromFlatbuffer( switch (int_data->bitWidth()) { case 8: - *out = int_data->is_signed() ? INT8 : UINT8; + *out = int_data->is_signed() ? int8() : uint8(); break; case 16: - *out = int_data->is_signed() ? INT16 : UINT16; + *out = int_data->is_signed() ? 
int16() : uint16(); break; case 32: - *out = int_data->is_signed() ? INT32 : UINT32; + *out = int_data->is_signed() ? int32() : uint32(); break; case 64: - *out = int_data->is_signed() ? INT64 : UINT64; + *out = int_data->is_signed() ? int64() : uint64(); break; default: return Status::NotImplemented("Integers not in cstdint are not implemented"); @@ -81,10 +67,12 @@ static Status IntFromFlatbuffer( static Status FloatFromFlatuffer( const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) { - if (float_data->precision() == flatbuf::Precision_SINGLE) { - *out = FLOAT; + if (float_data->precision() == flatbuf::Precision_HALF) { + *out = float16(); + } else if (float_data->precision() == flatbuf::Precision_SINGLE) { + *out = float32(); } else { - *out = DOUBLE; + *out = float64(); } return Status::OK(); } @@ -100,13 +88,13 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, return FloatFromFlatuffer( static_cast<const flatbuf::FloatingPoint*>(type_data), out); case flatbuf::Type_Binary: - *out = BINARY; + *out = binary(); return Status::OK(); case flatbuf::Type_Utf8: - *out = STRING; + *out = utf8(); return Status::OK(); case flatbuf::Type_Bool: - *out = BOOL; + *out = boolean(); return Status::OK(); case flatbuf::Type_Decimal: case flatbuf::Type_Timestamp: @@ -164,7 +152,32 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type break; static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* children, flatbuf::Type* out_type, Offset* offset) { + std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout, + flatbuf::Type* out_type, Offset* offset) { + std::vector<BufferDescr> buffer_layout = type->GetBufferLayout(); + for (const BufferDescr& descr : buffer_layout) { + flatbuf::VectorType vector_type; + switch (descr.type()) { + case BufferType::OFFSET: + vector_type = flatbuf::VectorType_OFFSET; + break; + case BufferType::DATA: + 
vector_type = flatbuf::VectorType_DATA; + break; + case BufferType::VALIDITY: + vector_type = flatbuf::VectorType_VALIDITY; + break; + case BufferType::TYPE: + vector_type = flatbuf::VectorType_TYPE; + break; + default: + vector_type = flatbuf::VectorType_DATA; + break; + } + auto offset = flatbuf::CreateVectorLayout(fbb, descr.bit_width(), vector_type); + layout->push_back(offset); + } + switch (type->type) { case Type::BOOL: *out_type = flatbuf::Type_Bool; @@ -223,14 +236,18 @@ static Status FieldToFlatbuffer( flatbuf::Type type_enum; Offset type_data; + Offset type_layout; std::vector<FieldOffset> children; + std::vector<VectorLayoutOffset> layout; - RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data)); + RETURN_NOT_OK( + TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data)); auto fb_children = fbb.CreateVector(children); + auto fb_layout = fbb.CreateVector(layout); // TODO: produce the list of VectorTypes *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data, - field->dictionary, fb_children); + field->dictionary, fb_children, fb_layout); return Status::OK(); } @@ -300,13 +317,26 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length, return Status::OK(); } -Status WriteDataHeader(int32_t length, int64_t body_length, +Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, const std::vector<flatbuf::FieldNode>& nodes, const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) { - MessageBuilder message; - RETURN_NOT_OK(message.SetRecordBatch(length, body_length, nodes, buffers)); - RETURN_NOT_OK(message.Finish()); - return message.GetBuffer(out); + flatbuffers::FlatBufferBuilder fbb; + + auto batch = flatbuf::CreateRecordBatch( + fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers)); + + fbb.Finish(batch); + + int32_t size = fbb.GetSize(); + + auto result = std::make_shared<PoolBuffer>(); + 
RETURN_NOT_OK(result->Resize(size)); + + uint8_t* dst = result->mutable_data(); + memcpy(dst, fbb.GetBufferPointer(), size); + + *out = result; + return Status::OK(); } Status MessageBuilder::Finish() { @@ -317,17 +347,13 @@ Status MessageBuilder::Finish() { } Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) { - // The message buffer is suffixed by the size of the complete flatbuffer as - // int32_t - // <uint8_t*: flatbuffer data><int32_t: flatbuffer size> int32_t size = fbb_.GetSize(); auto result = std::make_shared<PoolBuffer>(); - RETURN_NOT_OK(result->Resize(size + sizeof(int32_t))); + RETURN_NOT_OK(result->Resize(size)); uint8_t* dst = result->mutable_data(); memcpy(dst, fbb_.GetBufferPointer(), size); - memcpy(dst + size, reinterpret_cast<int32_t*>(&size), sizeof(int32_t)); *out = result; return Status::OK(); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index c404cfd..4826ebe 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -41,10 +41,10 @@ namespace ipc { using FBB = flatbuffers::FlatBufferBuilder; using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>; +using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>; using Offset = flatbuffers::Offset<void>; -static constexpr flatbuf::MetadataVersion kMetadataVersion = - flatbuf::MetadataVersion_V1_SNAPSHOT; +static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2; Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out); @@ -70,7 +70,7 @@ class MessageBuilder { flatbuffers::FlatBufferBuilder fbb_; }; -Status WriteDataHeader(int32_t length, int64_t body_length, +Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, const 
std::vector<flatbuf::FieldNode>& nodes, const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 66df8a6..44d3939 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -50,9 +50,15 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) { class Message::MessageImpl { public: - explicit MessageImpl( - const std::shared_ptr<Buffer>& buffer, const flatbuf::Message* message) - : buffer_(buffer), message_(message) {} + explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset) + : buffer_(buffer), offset_(offset), message_(nullptr) {} + + Status Open() { + message_ = flatbuf::GetMessage(buffer_->data() + offset_); + + // TODO(wesm): verify the message + return Status::OK(); + } Message::Type type() const { switch (message_->header_type()) { @@ -72,25 +78,23 @@ class Message::MessageImpl { int64_t body_length() const { return message_->bodyLength(); } private: - // Owns the memory this message accesses + // Retain reference to memory std::shared_ptr<Buffer> buffer_; + int64_t offset_; const flatbuf::Message* message_; }; -Message::Message() {} - -Status Message::Open( - const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out) { - std::shared_ptr<Message> result(new Message()); - - const flatbuf::Message* message = flatbuf::GetMessage(buffer->data()); +Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) { + impl_.reset(new MessageImpl(buffer, offset)); +} - // TODO(wesm): verify message - result->impl_.reset(new MessageImpl(buffer, message)); - *out = result; +Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset, + std::shared_ptr<Message>* out) { + // ctor is private - return Status::OK(); + *out = 
std::shared_ptr<Message>(new Message(buffer, offset)); + return (*out)->impl_->Open(); } Message::Type Message::type() const { @@ -101,20 +105,12 @@ int64_t Message::body_length() const { return impl_->body_length(); } -std::shared_ptr<Message> Message::get_shared_ptr() { - return this->shared_from_this(); -} - -std::shared_ptr<SchemaMessage> Message::GetSchema() { - return std::make_shared<SchemaMessage>(this->shared_from_this(), impl_->header()); -} - // ---------------------------------------------------------------------- -// SchemaMessage +// SchemaMetadata -class SchemaMessage::SchemaMessageImpl { +class SchemaMetadata::SchemaMetadataImpl { public: - explicit SchemaMessageImpl(const void* schema) + explicit SchemaMetadataImpl(const void* schema) : schema_(static_cast<const flatbuf::Schema*>(schema)) {} const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); } @@ -125,22 +121,29 @@ class SchemaMessage::SchemaMessageImpl { const flatbuf::Schema* schema_; }; -SchemaMessage::SchemaMessage( - const std::shared_ptr<Message>& message, const void* schema) { +SchemaMetadata::SchemaMetadata( + const std::shared_ptr<Message>& message, const void* flatbuf) { + message_ = message; + impl_.reset(new SchemaMetadataImpl(flatbuf)); +} + +SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) { message_ = message; - impl_.reset(new SchemaMessageImpl(schema)); + impl_.reset(new SchemaMetadataImpl(message->impl_->header())); } -int SchemaMessage::num_fields() const { +SchemaMetadata::~SchemaMetadata() {} + +int SchemaMetadata::num_fields() const { return impl_->num_fields(); } -Status SchemaMessage::GetField(int i, std::shared_ptr<Field>* out) const { +Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const { const flatbuf::Field* field = impl_->field(i); return FieldFromFlatbuffer(field, out); } -Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const { +Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* 
out) const { std::vector<std::shared_ptr<Field>> fields(num_fields()); for (int i = 0; i < this->num_fields(); ++i) { RETURN_NOT_OK(GetField(i, &fields[i])); @@ -150,11 +153,11 @@ Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const { } // ---------------------------------------------------------------------- -// RecordBatchMessage +// RecordBatchMetadata -class RecordBatchMessage::RecordBatchMessageImpl { +class RecordBatchMetadata::RecordBatchMetadataImpl { public: - explicit RecordBatchMessageImpl(const void* batch) + explicit RecordBatchMetadataImpl(const void* batch) : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) { nodes_ = batch_->nodes(); buffers_ = batch_->buffers(); @@ -176,19 +179,29 @@ class RecordBatchMessage::RecordBatchMessageImpl { const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_; }; -std::shared_ptr<RecordBatchMessage> Message::GetRecordBatch() { - return std::make_shared<RecordBatchMessage>(this->shared_from_this(), impl_->header()); +RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) { + message_ = message; + impl_.reset(new RecordBatchMetadataImpl(message->impl_->header())); } -RecordBatchMessage::RecordBatchMessage( - const std::shared_ptr<Message>& message, const void* batch) { - message_ = message; - impl_.reset(new RecordBatchMessageImpl(batch)); +RecordBatchMetadata::RecordBatchMetadata( + const std::shared_ptr<Buffer>& buffer, int64_t offset) { + message_ = nullptr; + buffer_ = buffer; + + const flatbuf::RecordBatch* metadata = + flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset); + + // TODO(wesm): validate table + + impl_.reset(new RecordBatchMetadataImpl(metadata)); } +RecordBatchMetadata::~RecordBatchMetadata() {} + // TODO(wesm): Copying the flatbuffer data isn't great, but this will do for // now -FieldMetadata RecordBatchMessage::field(int i) const { +FieldMetadata RecordBatchMetadata::field(int i) const { const flatbuf::FieldNode* node = 
impl_->field(i); FieldMetadata result; @@ -197,7 +210,7 @@ FieldMetadata RecordBatchMessage::field(int i) const { return result; } -BufferMetadata RecordBatchMessage::buffer(int i) const { +BufferMetadata RecordBatchMetadata::buffer(int i) const { const flatbuf::Buffer* buffer = impl_->buffer(i); BufferMetadata result; @@ -207,15 +220,15 @@ BufferMetadata RecordBatchMessage::buffer(int i) const { return result; } -int32_t RecordBatchMessage::length() const { +int32_t RecordBatchMetadata::length() const { return impl_->length(); } -int RecordBatchMessage::num_buffers() const { +int RecordBatchMetadata::num_buffers() const { return impl_->num_buffers(); } -int RecordBatchMessage::num_fields() const { +int RecordBatchMetadata::num_fields() const { return impl_->num_fields(); } @@ -268,11 +281,13 @@ class FileFooter::FileFooterImpl { MetadataVersion::type version() const { switch (footer_->version()) { - case flatbuf::MetadataVersion_V1_SNAPSHOT: - return MetadataVersion::V1_SNAPSHOT; + case flatbuf::MetadataVersion_V1: + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + return MetadataVersion::V2; // Add cases as other versions become available default: - return MetadataVersion::V1_SNAPSHOT; + return MetadataVersion::V2; } } @@ -285,7 +300,7 @@ class FileFooter::FileFooterImpl { } Status GetSchema(std::shared_ptr<Schema>* out) const { - auto schema_msg = std::make_shared<SchemaMessage>(nullptr, footer_->schema()); + auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema()); return schema_msg->GetSchema(out); } http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 2f0e853..1c4ef64 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -42,7 +42,7 @@ class OutputStream; namespace ipc { struct MetadataVersion { - enum type { 
V1_SNAPSHOT }; + enum type { V1, V2 }; }; //---------------------------------------------------------------------- @@ -58,10 +58,14 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out); class Message; // Container for serialized Schema metadata contained in an IPC message -class ARROW_EXPORT SchemaMessage { +class ARROW_EXPORT SchemaMetadata { public: + explicit SchemaMetadata(const std::shared_ptr<Message>& message); + // Accepts an opaque flatbuffer pointer - SchemaMessage(const std::shared_ptr<Message>& message, const void* schema); + SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema); + + ~SchemaMetadata(); int num_fields() const; @@ -76,8 +80,8 @@ class ARROW_EXPORT SchemaMessage { // Parent, owns the flatbuffer data std::shared_ptr<Message> message_; - class SchemaMessageImpl; - std::unique_ptr<SchemaMessageImpl> impl_; + class SchemaMetadataImpl; + std::unique_ptr<SchemaMetadataImpl> impl_; }; // Field metadata @@ -93,10 +97,13 @@ struct BufferMetadata { }; // Container for serialized record batch metadata contained in an IPC message -class ARROW_EXPORT RecordBatchMessage { +class ARROW_EXPORT RecordBatchMetadata { public: - // Accepts an opaque flatbuffer pointer - RecordBatchMessage(const std::shared_ptr<Message>& message, const void* batch_meta); + explicit RecordBatchMetadata(const std::shared_ptr<Message>& message); + + RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset); + + ~RecordBatchMetadata(); FieldMetadata field(int i) const; BufferMetadata buffer(int i) const; @@ -108,37 +115,34 @@ class ARROW_EXPORT RecordBatchMessage { private: // Parent, owns the flatbuffer data std::shared_ptr<Message> message_; + std::shared_ptr<Buffer> buffer_; - class RecordBatchMessageImpl; - std::unique_ptr<RecordBatchMessageImpl> impl_; + class RecordBatchMetadataImpl; + std::unique_ptr<RecordBatchMetadataImpl> impl_; }; -class ARROW_EXPORT DictionaryBatchMessage { +class ARROW_EXPORT 
DictionaryBatchMetadata { public: int64_t id() const; - std::unique_ptr<RecordBatchMessage> data() const; + std::unique_ptr<RecordBatchMetadata> data() const; }; -class ARROW_EXPORT Message : public std::enable_shared_from_this<Message> { +class ARROW_EXPORT Message { public: enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH }; - static Status Open( - const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out); - - std::shared_ptr<Message> get_shared_ptr(); + static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset, + std::shared_ptr<Message>* out); int64_t body_length() const; Type type() const; - // These methods only to be invoked if you have checked the message type - std::shared_ptr<SchemaMessage> GetSchema(); - std::shared_ptr<RecordBatchMessage> GetRecordBatch(); - std::shared_ptr<DictionaryBatchMessage> GetDictionaryBatch(); - private: - Message(); + Message(const std::shared_ptr<Buffer>& buffer, int64_t offset); + + friend class RecordBatchMetadata; + friend class SchemaMetadata; // Hide serialization details from user API class MessageImpl; http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 9abc20d..65b3782 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -39,8 +39,7 @@ namespace arrow { namespace ipc { -const auto kInt32 = std::make_shared<Int32Type>(); -const auto kListInt32 = list(kInt32); +const auto kListInt32 = list(int32()); const auto kListListInt32 = list(kListInt32); Status MakeRandomInt32Array( @@ -99,8 +98,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) { const int length = 1000; // Make the schema - auto f0 = std::make_shared<Field>("f0", kInt32); - auto f1 = std::make_shared<Field>("f1", kInt32); + auto f0 = std::make_shared<Field>("f0", int32()); + auto f1 = 
std::make_shared<Field>("f1", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1})); // Example data @@ -161,7 +160,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) { // Make the schema auto f0 = std::make_shared<Field>("f0", kListInt32); auto f1 = std::make_shared<Field>("f1", kListListInt32); - auto f2 = std::make_shared<Field>("f2", kInt32); + auto f2 = std::make_shared<Field>("f2", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); // Example data @@ -184,7 +183,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) { // Make the schema auto f0 = std::make_shared<Field>("f0", kListInt32); auto f1 = std::make_shared<Field>("f1", kListListInt32); - auto f2 = std::make_shared<Field>("f2", kInt32); + auto f2 = std::make_shared<Field>("f2", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); // Example data @@ -205,7 +204,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) { // Make the schema auto f0 = std::make_shared<Field>("f0", kListInt32); auto f1 = std::make_shared<Field>("f1", kListListInt32); - auto f2 = std::make_shared<Field>("f2", kInt32); + auto f2 = std::make_shared<Field>("f2", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); // Example data @@ -226,7 +225,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) { Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) { const int batch_length = 5; - TypePtr type = kInt32; + TypePtr type = int32(); MemoryPool* pool = default_memory_pool(); ArrayPtr array; http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h index 9000d1b..242d662 100644 --- a/cpp/src/arrow/ipc/util.h +++ b/cpp/src/arrow/ipc/util.h @@ -28,12 +28,10 @@ namespace arrow { namespace ipc { // Align on 8-byte boundaries -static constexpr int 
kArrowAlignment = 8; - // Buffers are padded to 64-byte boundaries (for SIMD) -static constexpr int kArrowBufferAlignment = 64; +static constexpr int kArrowAlignment = 64; -static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0}; +static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0}; static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) { return ((nbytes + alignment - 1) / alignment) * alignment; http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 93dd5b6..63c2166 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -61,10 +61,10 @@ // Alias MSVC popcount to GCC name #ifdef _MSC_VER -# include <intrin.h> -# define __builtin_popcount __popcnt -# include <nmmintrin.h> -# define __builtin_popcountll _mm_popcnt_u64 +#include <intrin.h> +#define __builtin_popcount __popcnt +#include <nmmintrin.h> +#define __builtin_popcountll _mm_popcnt_u64 #endif namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 589bdad..80f295c 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -105,10 +105,6 @@ std::string UnionType::ToString() const { return s.str(); } -int NullType::bit_width() const { - return 0; -} - std::string NullType::ToString() const { return name(); } @@ -187,4 +183,46 @@ std::shared_ptr<Field> field( return std::make_shared<Field>(name, type, nullable, dictionary); } +static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1); +static const BufferDescr kOffsetBuffer(BufferType::OFFSET, 32); +static const BufferDescr kTypeBuffer(BufferType::TYPE, 32); +static const BufferDescr kBooleanBuffer(BufferType::DATA, 1); +static 
const BufferDescr kValues64(BufferType::DATA, 64); +static const BufferDescr kValues32(BufferType::DATA, 32); +static const BufferDescr kValues16(BufferType::DATA, 16); +static const BufferDescr kValues8(BufferType::DATA, 8); + +std::vector<BufferDescr> FixedWidthType::GetBufferLayout() const { + return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())}; +} + +std::vector<BufferDescr> NullType::GetBufferLayout() const { + return {}; +} + +std::vector<BufferDescr> BinaryType::GetBufferLayout() const { + return {kValidityBuffer, kOffsetBuffer, kValues8}; +} + +std::vector<BufferDescr> ListType::GetBufferLayout() const { + return {kValidityBuffer, kOffsetBuffer}; +} + +std::vector<BufferDescr> StructType::GetBufferLayout() const { + return {kValidityBuffer, kTypeBuffer}; +} + +std::vector<BufferDescr> UnionType::GetBufferLayout() const { + if (mode == UnionMode::SPARSE) { + return {kValidityBuffer, kTypeBuffer}; + } else { + return {kValidityBuffer, kTypeBuffer, kOffsetBuffer}; + } +} + +std::vector<BufferDescr> DecimalType::GetBufferLayout() const { + // TODO(wesm) + return {}; +} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 876d7ea..3077738 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -101,6 +101,20 @@ struct Type { }; }; +enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY }; + +class BufferDescr { + public: + BufferDescr(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {} + + BufferType type() const { return type_; } + int bit_width() const { return bit_width_; } + + private: + BufferType type_; + int bit_width_; +}; + struct ARROW_EXPORT DataType { Type::type type; @@ -129,12 +143,18 @@ struct ARROW_EXPORT DataType { virtual Status Accept(TypeVisitor* visitor) const = 0; virtual std::string ToString() const = 0; + + 
virtual std::vector<BufferDescr> GetBufferLayout() const = 0; }; typedef std::shared_ptr<DataType> TypePtr; -struct ARROW_EXPORT FixedWidthMeta { +struct ARROW_EXPORT FixedWidthType : public DataType { + using DataType::DataType; + virtual int bit_width() const = 0; + + std::vector<BufferDescr> GetBufferLayout() const override; }; struct ARROW_EXPORT IntegerMeta { @@ -184,12 +204,12 @@ struct ARROW_EXPORT Field { }; typedef std::shared_ptr<Field> FieldPtr; -struct ARROW_EXPORT PrimitiveCType : public DataType { - using DataType::DataType; +struct ARROW_EXPORT PrimitiveCType : public FixedWidthType { + using FixedWidthType::FixedWidthType; }; template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE> -struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta { +struct ARROW_EXPORT CTypeImpl : public PrimitiveCType { using c_type = C_TYPE; static constexpr Type::type type_id = TYPE_ID; @@ -204,16 +224,17 @@ struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta { std::string ToString() const override { return std::string(DERIVED::name()); } }; -struct ARROW_EXPORT NullType : public DataType, public FixedWidthMeta { +struct ARROW_EXPORT NullType : public DataType { static constexpr Type::type type_id = Type::NA; NullType() : DataType(Type::NA) {} - int bit_width() const override; Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; static std::string name() { return "null"; } + + std::vector<BufferDescr> GetBufferLayout() const override; }; template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE> @@ -221,10 +242,10 @@ struct IntegerTypeImpl : public CTypeImpl<DERIVED, TYPE_ID, C_TYPE>, public Inte bool is_signed() const override { return std::is_signed<C_TYPE>::value; } }; -struct ARROW_EXPORT BooleanType : public DataType, FixedWidthMeta { +struct ARROW_EXPORT BooleanType : public FixedWidthType { static constexpr Type::type type_id = Type::BOOL; - BooleanType() : 
DataType(Type::BOOL) {} + BooleanType() : FixedWidthType(Type::BOOL) {} Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; @@ -306,6 +327,8 @@ struct ARROW_EXPORT ListType : public DataType, public NoExtraMeta { std::string ToString() const override; static std::string name() { return "list"; } + + std::vector<BufferDescr> GetBufferLayout() const override; }; // BinaryType type is reprsents lists of 1-byte values. @@ -318,6 +341,8 @@ struct ARROW_EXPORT BinaryType : public DataType, public NoExtraMeta { std::string ToString() const override; static std::string name() { return "binary"; } + std::vector<BufferDescr> GetBufferLayout() const override; + protected: // Allow subclasses to change the logical type. explicit BinaryType(Type::type logical_type) : DataType(logical_type) {} @@ -345,6 +370,8 @@ struct ARROW_EXPORT StructType : public DataType, public NoExtraMeta { Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; static std::string name() { return "struct"; } + + std::vector<BufferDescr> GetBufferLayout() const override; }; struct ARROW_EXPORT DecimalType : public DataType { @@ -358,6 +385,8 @@ struct ARROW_EXPORT DecimalType : public DataType { Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; static std::string name() { return "decimal"; } + + std::vector<BufferDescr> GetBufferLayout() const override; }; enum class UnionMode : char { SPARSE, DENSE }; @@ -375,14 +404,20 @@ struct ARROW_EXPORT UnionType : public DataType { static std::string name() { return "union"; } Status Accept(TypeVisitor* visitor) const override; + std::vector<BufferDescr> GetBufferLayout() const override; + UnionMode mode; std::vector<uint8_t> type_ids; }; -struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta { +struct ARROW_EXPORT DateType : public FixedWidthType { static constexpr Type::type type_id = Type::DATE; - DateType() : DataType(Type::DATE) {} 
+ using c_type = int32_t; + + DateType() : FixedWidthType(Type::DATE) {} + + int bit_width() const override { return sizeof(c_type) * 8; } Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override { return name(); } @@ -391,13 +426,17 @@ struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta { enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 }; -struct ARROW_EXPORT TimeType : public DataType { +struct ARROW_EXPORT TimeType : public FixedWidthType { static constexpr Type::type type_id = Type::TIME; using Unit = TimeUnit; + using c_type = int64_t; TimeUnit unit; - explicit TimeType(TimeUnit unit = TimeUnit::MILLI) : DataType(Type::TIME), unit(unit) {} + int bit_width() const override { return sizeof(c_type) * 8; } + + explicit TimeType(TimeUnit unit = TimeUnit::MILLI) + : FixedWidthType(Type::TIME), unit(unit) {} TimeType(const TimeType& other) : TimeType(other.unit) {} Status Accept(TypeVisitor* visitor) const override; @@ -405,7 +444,7 @@ struct ARROW_EXPORT TimeType : public DataType { static std::string name() { return "time"; } }; -struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta { +struct ARROW_EXPORT TimestampType : public FixedWidthType { using Unit = TimeUnit; typedef int64_t c_type; @@ -416,7 +455,7 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta { TimeUnit unit; explicit TimestampType(TimeUnit unit = TimeUnit::MILLI) - : DataType(Type::TIMESTAMP), unit(unit) {} + : FixedWidthType(Type::TIMESTAMP), unit(unit) {} TimestampType(const TimestampType& other) : TimestampType(other.unit) {} @@ -425,10 +464,10 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta { static std::string name() { return "timestamp"; } }; -struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta { +struct ARROW_EXPORT IntervalType : public FixedWidthType { enum class Unit : char { YEAR_MONTH = 0, DAY_TIME = 1 }; - typedef 
int64_t c_type; + using c_type = int64_t; static constexpr Type::type type_id = Type::INTERVAL; int bit_width() const override { return sizeof(int64_t) * 8; } @@ -436,7 +475,7 @@ struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta { Unit unit; explicit IntervalType(Unit unit = Unit::YEAR_MONTH) - : DataType(Type::INTERVAL), unit(unit) {} + : FixedWidthType(Type::INTERVAL), unit(unit) {} IntervalType(const IntervalType& other) : IntervalType(other.unit) {} http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/types/primitive.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc index 14667ee..f42a3ca 100644 --- a/cpp/src/arrow/types/primitive.cc +++ b/cpp/src/arrow/types/primitive.cc @@ -49,7 +49,7 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const { const uint8_t* this_data = raw_data_; const uint8_t* other_data = other.raw_data_; - auto size_meta = dynamic_cast<const FixedWidthMeta*>(type_.get()); + auto size_meta = dynamic_cast<const FixedWidthType*>(type_.get()); int value_byte_size = size_meta->bit_width() / 8; DCHECK_GT(value_byte_size, 0); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/util/bit-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index 13b7e19..5c8055f 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -78,6 +78,10 @@ static inline bool IsMultipleOf64(int64_t n) { return (n & 63) == 0; } +static inline bool IsMultipleOf8(int64_t n) { + return (n & 7) == 0; +} + inline int64_t RoundUpToMultipleOf64(int64_t num) { // TODO(wesm): is this definitely needed? 
// DCHECK_GE(num, 0); http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/dev/release/run-rat.sh ---------------------------------------------------------------------- diff --git a/dev/release/run-rat.sh b/dev/release/run-rat.sh index d8ec650..e26dd58 100755 --- a/dev/release/run-rat.sh +++ b/dev/release/run-rat.sh @@ -28,6 +28,7 @@ $RAT $1 \ -e ".*" \ -e mman.h \ -e "*_generated.h" \ + -e "*.json" \ -e random.h \ -e status.cc \ -e status.h \ @@ -49,5 +50,3 @@ else echo "${UNAPPROVED} unapproved licences. Check rat report: rat.txt" exit 1 fi - -
