ARROW-661: [C++] Add LargeRecordBatch metadata type, IPC support, associated
refactoring
This patch enables the following code for writing record batches exceeding
2^31 - 1 in length:
```c++
RETURN_NOT_OK(WriteLargeRecordBatch(
batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
return ReadLargeRecordBatch(batch.schema(), 0, mmap_.get(), result);
```
This also does a fair amount of refactoring and code consolidation related to
ongoing code cleanup in arrow_ipc.
These APIs are marked experimental. This does add a `LargeRecordBatch` flatbuffer
type to the Message union, but I've indicated that Arrow implementations (e.g.,
Java) are not required to implement this type. It exists strictly to enable C++
users to write very large datasets that have been embedded for convenience in
Arrow's structured data model.
cc @pcmoritz @robertnishihara
Author: Wes McKinney <[email protected]>
Closes #404 from wesm/ARROW-661 and squashes the following commits:
9c18a95 [Wes McKinney] Fix import ordering
d7811f2 [Wes McKinney] cpplint
179a1e3 [Wes McKinney] Add unit test for large record batches. Use bytewise
comparisons with aligned bitmaps
36c3862 [Wes McKinney] Get LargeRecordBatch round trip working. Add to Message
union for now
4c1d08c [Wes McKinney] Refactoring, failing test fixture for large record batch
f4c8830 [Wes McKinney] Consolidate ipc-metadata-test and ipc-read-write-test
and draft large record batch read/write path
85d1a1c [Wes McKinney] Add (untested) metadata writer for LargeRecordBatch
0f2722c [Wes McKinney] Consolidate metadata-internal.h into metadata.h. Use own
Arrow structs for IPC metadata and convert to flatbuffers later
e8f8973 [Wes McKinney] Split adapter.h/cc into reader.h/writer.h. Draft
LargeRecordBatch type
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/df2220f3
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/df2220f3
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/df2220f3
Branch: refs/heads/master
Commit: df2220f350282925a454ed911eed6618e4d53969
Parents: 4c5f79c
Author: Wes McKinney <[email protected]>
Authored: Mon Mar 20 10:48:34 2017 +0100
Committer: Uwe L. Korn <[email protected]>
Committed: Mon Mar 20 10:48:34 2017 +0100
----------------------------------------------------------------------
cpp/src/arrow/allocator-test.cc | 1 +
cpp/src/arrow/allocator.h | 1 +
cpp/src/arrow/io/test-common.h | 18 +
cpp/src/arrow/ipc/CMakeLists.txt | 15 +-
cpp/src/arrow/ipc/adapter.cc | 630 -----------------------
cpp/src/arrow/ipc/adapter.h | 104 ----
cpp/src/arrow/ipc/api.h | 1 -
cpp/src/arrow/ipc/ipc-adapter-test.cc | 320 ------------
cpp/src/arrow/ipc/ipc-file-test.cc | 228 ---------
cpp/src/arrow/ipc/ipc-metadata-test.cc | 100 ----
cpp/src/arrow/ipc/ipc-read-write-test.cc | 608 ++++++++++++++++++++++
cpp/src/arrow/ipc/metadata-internal.cc | 597 ----------------------
cpp/src/arrow/ipc/metadata-internal.h | 83 ---
cpp/src/arrow/ipc/metadata.cc | 692 +++++++++++++++++++++++++-
cpp/src/arrow/ipc/metadata.h | 40 +-
cpp/src/arrow/ipc/reader.cc | 171 ++++++-
cpp/src/arrow/ipc/reader.h | 22 +
cpp/src/arrow/ipc/test-common.h | 2 +-
cpp/src/arrow/ipc/writer.cc | 544 ++++++++++++++++++--
cpp/src/arrow/ipc/writer.h | 46 +-
cpp/src/arrow/loader.h | 25 +
cpp/src/arrow/type.h | 1 +
cpp/src/arrow/util/bit-util.cc | 16 +-
format/Message.fbs | 22 +-
24 files changed, 2131 insertions(+), 2156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/allocator-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc
index 0b24267..811ef5a 100644
--- a/cpp/src/arrow/allocator-test.cc
+++ b/cpp/src/arrow/allocator-test.cc
@@ -16,6 +16,7 @@
// under the License.
#include "gtest/gtest.h"
+
#include "arrow/allocator.h"
#include "arrow/test-util.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/allocator.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/allocator.h b/cpp/src/arrow/allocator.h
index c976ba9..e00023d 100644
--- a/cpp/src/arrow/allocator.h
+++ b/cpp/src/arrow/allocator.h
@@ -21,6 +21,7 @@
#include <cstddef>
#include <memory>
#include <utility>
+
#include "arrow/memory_pool.h"
#include "arrow/status.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/io/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
index 8355714..4c11476 100644
--- a/cpp/src/arrow/io/test-common.h
+++ b/cpp/src/arrow/io/test-common.h
@@ -41,6 +41,24 @@
namespace arrow {
namespace io {
+static inline Status ZeroMemoryMap(MemoryMappedFile* file) {
+ constexpr int64_t kBufferSize = 512;
+ static constexpr uint8_t kZeroBytes[kBufferSize] = {0};
+
+ RETURN_NOT_OK(file->Seek(0));
+ int64_t position = 0;
+ int64_t file_size;
+ RETURN_NOT_OK(file->GetSize(&file_size));
+
+ int64_t chunksize;
+ while (position < file_size) {
+ chunksize = std::min(kBufferSize, file_size - position);
+ RETURN_NOT_OK(file->Write(kZeroBytes, chunksize));
+ position += chunksize;
+ }
+ return Status::OK();
+}
+
class MemoryMapFixture {
public:
void TearDown() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index c73af63..5d470df 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -29,12 +29,10 @@ set(ARROW_IPC_TEST_LINK_LIBS
arrow_io_static)
set(ARROW_IPC_SRCS
- adapter.cc
feather.cc
json.cc
json-internal.cc
metadata.cc
- metadata-internal.cc
reader.cc
writer.cc
)
@@ -64,16 +62,8 @@ ADD_ARROW_TEST(feather-test)
ARROW_TEST_LINK_LIBRARIES(feather-test
${ARROW_IPC_TEST_LINK_LIBS})
-ADD_ARROW_TEST(ipc-adapter-test)
-ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
- ${ARROW_IPC_TEST_LINK_LIBS})
-
-ADD_ARROW_TEST(ipc-file-test)
-ARROW_TEST_LINK_LIBRARIES(ipc-file-test
- ${ARROW_IPC_TEST_LINK_LIBS})
-
-ADD_ARROW_TEST(ipc-metadata-test)
-ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
+ADD_ARROW_TEST(ipc-read-write-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-read-write-test
${ARROW_IPC_TEST_LINK_LIBS})
ADD_ARROW_TEST(ipc-json-test)
@@ -148,7 +138,6 @@ add_dependencies(arrow_ipc_objlib metadata_fbs)
# Headers: top level
install(FILES
- adapter.h
api.h
feather.h
json.h
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
deleted file mode 100644
index db9f63c..0000000
--- a/cpp/src/arrow/ipc/adapter.cc
+++ /dev/null
@@ -1,630 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/adapter.h"
-
-#include <algorithm>
-#include <cstdint>
-#include <cstring>
-#include <limits>
-#include <sstream>
-#include <vector>
-
-#include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/io/interfaces.h"
-#include "arrow/io/memory.h"
-#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata-internal.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/util.h"
-#include "arrow/loader.h"
-#include "arrow/memory_pool.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/table.h"
-#include "arrow/type.h"
-#include "arrow/type_fwd.h"
-#include "arrow/util/bit-util.h"
-#include "arrow/util/logging.h"
-
-namespace arrow {
-
-namespace flatbuf = org::apache::arrow::flatbuf;
-
-namespace ipc {
-
-// ----------------------------------------------------------------------
-// Record batch write path
-
-class RecordBatchWriter : public ArrayVisitor {
- public:
- RecordBatchWriter(
- MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
- : pool_(pool),
- max_recursion_depth_(max_recursion_depth),
- buffer_start_offset_(buffer_start_offset) {
- DCHECK_GT(max_recursion_depth, 0);
- }
-
- virtual ~RecordBatchWriter() = default;
-
- Status VisitArray(const Array& arr) {
- if (max_recursion_depth_ <= 0) {
- return Status::Invalid("Max recursion depth reached");
- }
-
- if (arr.length() > std::numeric_limits<int32_t>::max()) {
- return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in
length");
- }
-
- // push back all common elements
- field_nodes_.push_back(flatbuf::FieldNode(
- static_cast<int32_t>(arr.length()),
static_cast<int32_t>(arr.null_count())));
- if (arr.null_count() > 0) {
- std::shared_ptr<Buffer> bitmap = arr.null_bitmap();
-
- if (arr.offset() != 0) {
- // With a sliced array / non-zero offset, we must copy the bitmap
- RETURN_NOT_OK(
- CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(),
&bitmap));
- }
-
- buffers_.push_back(bitmap);
- } else {
- // Push a dummy zero-length buffer, not to be copied
- buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
- }
- return arr.Accept(this);
- }
-
- Status Assemble(const RecordBatch& batch, int64_t* body_length) {
- if (field_nodes_.size() > 0) {
- field_nodes_.clear();
- buffer_meta_.clear();
- buffers_.clear();
- }
-
- // Perform depth-first traversal of the row-batch
- for (int i = 0; i < batch.num_columns(); ++i) {
- RETURN_NOT_OK(VisitArray(*batch.column(i)));
- }
-
- // The position for the start of a buffer relative to the passed frame of
- // reference. May be 0 or some other position in an address space
- int64_t offset = buffer_start_offset_;
-
- // Construct the buffer metadata for the record batch header
- for (size_t i = 0; i < buffers_.size(); ++i) {
- const Buffer* buffer = buffers_[i].get();
- int64_t size = 0;
- int64_t padding = 0;
-
- // The buffer might be null if we are handling zero row lengths.
- if (buffer) {
- size = buffer->size();
- padding = BitUtil::RoundUpToMultipleOf64(size) - size;
- }
-
- // TODO(wesm): We currently have no notion of shared memory page id's,
- // but we've included it in the metadata IDL for when we have it in the
- // future. Use page = -1 for now
- //
- // Note that page ids are a bespoke notion for Arrow and not a feature we
- // are using from any OS-level shared memory. The thought is that systems
- // may (in the future) associate integer page id's with physical memory
- // pages (according to whatever is the desired shared memory mechanism)
- buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding));
- offset += size + padding;
- }
-
- *body_length = offset - buffer_start_offset_;
- DCHECK(BitUtil::IsMultipleOf64(*body_length));
-
- return Status::OK();
- }
-
- // Override this for writing dictionary metadata
- virtual Status WriteMetadataMessage(
- int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
- return WriteRecordBatchMessage(
- num_rows, body_length, field_nodes_, buffer_meta_, out);
- }
-
- Status WriteMetadata(int32_t num_rows, int64_t body_length,
io::OutputStream* dst,
- int32_t* metadata_length) {
- // Now that we have computed the locations of all of the buffers in shared
- // memory, the data header can be converted to a flatbuffer and written out
- //
- // Note: The memory written here is prefixed by the size of the flatbuffer
- // itself as an int32_t.
- std::shared_ptr<Buffer> metadata_fb;
- RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb));
-
- // Need to write 4 bytes (metadata size), the metadata, plus padding to
- // end on an 8-byte offset
- int64_t start_offset;
- RETURN_NOT_OK(dst->Tell(&start_offset));
-
- int32_t padded_metadata_length = static_cast<int32_t>(metadata_fb->size())
+ 4;
- const int32_t remainder =
- (padded_metadata_length + static_cast<int32_t>(start_offset)) % 8;
- if (remainder != 0) { padded_metadata_length += 8 - remainder; }
-
- // The returned metadata size includes the length prefix, the flatbuffer,
- // plus padding
- *metadata_length = padded_metadata_length;
-
- // Write the flatbuffer size prefix including padding
- int32_t flatbuffer_size = padded_metadata_length - 4;
- RETURN_NOT_OK(
- dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size),
sizeof(int32_t)));
-
- // Write the flatbuffer
- RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
-
- // Write any padding
- int32_t padding =
- padded_metadata_length - static_cast<int32_t>(metadata_fb->size()) - 4;
- if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
-
- return Status::OK();
- }
-
- Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t*
metadata_length,
- int64_t* body_length) {
- RETURN_NOT_OK(Assemble(batch, body_length));
-
-#ifndef NDEBUG
- int64_t start_position, current_position;
- RETURN_NOT_OK(dst->Tell(&start_position));
-#endif
-
- RETURN_NOT_OK(WriteMetadata(
- static_cast<int32_t>(batch.num_rows()), *body_length, dst,
metadata_length));
-
-#ifndef NDEBUG
- RETURN_NOT_OK(dst->Tell(&current_position));
- DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
- // Now write the buffers
- for (size_t i = 0; i < buffers_.size(); ++i) {
- const Buffer* buffer = buffers_[i].get();
- int64_t size = 0;
- int64_t padding = 0;
-
- // The buffer might be null if we are handling zero row lengths.
- if (buffer) {
- size = buffer->size();
- padding = BitUtil::RoundUpToMultipleOf64(size) - size;
- }
-
- if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
-
- if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
- }
-
-#ifndef NDEBUG
- RETURN_NOT_OK(dst->Tell(&current_position));
- DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
- return Status::OK();
- }
-
- Status GetTotalSize(const RecordBatch& batch, int64_t* size) {
- // emulates the behavior of Write without actually writing
- int32_t metadata_length = 0;
- int64_t body_length = 0;
- MockOutputStream dst;
- RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length));
- *size = dst.GetExtentBytesWritten();
- return Status::OK();
- }
-
- protected:
- template <typename ArrayType>
- Status VisitFixedWidth(const ArrayType& array) {
- std::shared_ptr<Buffer> data_buffer = array.data();
-
- if (array.offset() != 0) {
- // Non-zero offset, slice the buffer
- const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
- const int type_width = fw_type.bit_width() / 8;
- const int64_t byte_offset = array.offset() * type_width;
-
- // Send padding if it's available
- const int64_t buffer_length =
- std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
- data_buffer->size() - byte_offset);
- data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length);
- }
- buffers_.push_back(data_buffer);
- return Status::OK();
- }
-
- template <typename ArrayType>
- Status GetZeroBasedValueOffsets(
- const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
- // Share slicing logic between ListArray and BinaryArray
-
- auto offsets = array.value_offsets();
-
- if (array.offset() != 0) {
- // If we have a non-zero offset, then the value offsets do not start at
- // zero. We must a) create a new offsets array with shifted offsets and
- // b) slice the values array accordingly
-
- std::shared_ptr<MutableBuffer> shifted_offsets;
- RETURN_NOT_OK(AllocateBuffer(
- pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
-
- int32_t* dest_offsets =
reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
- const int32_t start_offset = array.value_offset(0);
-
- for (int i = 0; i < array.length(); ++i) {
- dest_offsets[i] = array.value_offset(i) - start_offset;
- }
- // Final offset
- dest_offsets[array.length()] = array.value_offset(array.length()) -
start_offset;
- offsets = shifted_offsets;
- }
-
- *value_offsets = offsets;
- return Status::OK();
- }
-
- Status VisitBinary(const BinaryArray& array) {
- std::shared_ptr<Buffer> value_offsets;
- RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array,
&value_offsets));
- auto data = array.data();
-
- if (array.offset() != 0) {
- // Slice the data buffer to include only the range we need now
- data = SliceBuffer(data, array.value_offset(0),
array.value_offset(array.length()));
- }
-
- buffers_.push_back(value_offsets);
- buffers_.push_back(data);
- return Status::OK();
- }
-
- Status Visit(const FixedWidthBinaryArray& array) override {
- auto data = array.data();
- int32_t width = array.byte_width();
-
- if (array.offset() != 0) {
- data = SliceBuffer(data, array.offset() * width, width * array.length());
- }
- buffers_.push_back(data);
- return Status::OK();
- }
-
- Status Visit(const BooleanArray& array) override {
- buffers_.push_back(array.data());
- return Status::OK();
- }
-
-#define VISIT_FIXED_WIDTH(TYPE) \
- Status Visit(const TYPE& array) override { return
VisitFixedWidth<TYPE>(array); }
-
- VISIT_FIXED_WIDTH(Int8Array);
- VISIT_FIXED_WIDTH(Int16Array);
- VISIT_FIXED_WIDTH(Int32Array);
- VISIT_FIXED_WIDTH(Int64Array);
- VISIT_FIXED_WIDTH(UInt8Array);
- VISIT_FIXED_WIDTH(UInt16Array);
- VISIT_FIXED_WIDTH(UInt32Array);
- VISIT_FIXED_WIDTH(UInt64Array);
- VISIT_FIXED_WIDTH(HalfFloatArray);
- VISIT_FIXED_WIDTH(FloatArray);
- VISIT_FIXED_WIDTH(DoubleArray);
- VISIT_FIXED_WIDTH(DateArray);
- VISIT_FIXED_WIDTH(Date32Array);
- VISIT_FIXED_WIDTH(TimeArray);
- VISIT_FIXED_WIDTH(TimestampArray);
-
-#undef VISIT_FIXED_WIDTH
-
- Status Visit(const StringArray& array) override { return VisitBinary(array);
}
-
- Status Visit(const BinaryArray& array) override { return VisitBinary(array);
}
-
- Status Visit(const ListArray& array) override {
- std::shared_ptr<Buffer> value_offsets;
- RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
- buffers_.push_back(value_offsets);
-
- --max_recursion_depth_;
- std::shared_ptr<Array> values = array.values();
-
- if (array.offset() != 0) {
- // For non-zero offset, we slice the values array accordingly
- const int32_t offset = array.value_offset(0);
- const int32_t length = array.value_offset(array.length()) - offset;
- values = values->Slice(offset, length);
- }
- RETURN_NOT_OK(VisitArray(*values));
- ++max_recursion_depth_;
- return Status::OK();
- }
-
- Status Visit(const StructArray& array) override {
- --max_recursion_depth_;
- for (std::shared_ptr<Array> field : array.fields()) {
- if (array.offset() != 0) {
- // If offset is non-zero, slice the child array
- field = field->Slice(array.offset(), array.length());
- }
- RETURN_NOT_OK(VisitArray(*field));
- }
- ++max_recursion_depth_;
- return Status::OK();
- }
-
- Status Visit(const UnionArray& array) override {
- auto type_ids = array.type_ids();
- if (array.offset() != 0) {
- type_ids = SliceBuffer(type_ids, array.offset() *
sizeof(UnionArray::type_id_t),
- array.length() * sizeof(UnionArray::type_id_t));
- }
-
- buffers_.push_back(type_ids);
-
- --max_recursion_depth_;
- if (array.mode() == UnionMode::DENSE) {
- const auto& type = static_cast<const UnionType&>(*array.type());
- auto value_offsets = array.value_offsets();
-
- // The Union type codes are not necessary 0-indexed
- uint8_t max_code = 0;
- for (uint8_t code : type.type_codes) {
- if (code > max_code) { max_code = code; }
- }
-
- // Allocate an array of child offsets. Set all to -1 to indicate that we
- // haven't observed a first occurrence of a particular child yet
- std::vector<int32_t> child_offsets(max_code + 1);
- std::vector<int32_t> child_lengths(max_code + 1, 0);
-
- if (array.offset() != 0) {
- // This is an unpleasant case. Because the offsets are different for
- // each child array, when we have a sliced array, we need to "rebase"
- // the value_offsets for each array
-
- const int32_t* unshifted_offsets = array.raw_value_offsets();
- const uint8_t* type_ids = array.raw_type_ids();
-
- // Allocate the shifted offsets
- std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
- RETURN_NOT_OK(AllocateBuffer(
- pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
- int32_t* shifted_offsets =
- reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
-
- for (int64_t i = 0; i < array.length(); ++i) {
- const uint8_t code = type_ids[i];
- int32_t shift = child_offsets[code];
- if (shift == -1) { child_offsets[code] = shift =
unshifted_offsets[i]; }
- shifted_offsets[i] = unshifted_offsets[i] - shift;
-
- // Update the child length to account for observed value
- ++child_lengths[code];
- }
-
- value_offsets = shifted_offsets_buffer;
- }
- buffers_.push_back(value_offsets);
-
- // Visit children and slice accordingly
- for (int i = 0; i < type.num_children(); ++i) {
- std::shared_ptr<Array> child = array.child(i);
- if (array.offset() != 0) {
- const uint8_t code = type.type_codes[i];
- child = child->Slice(child_offsets[code], child_lengths[code]);
- }
- RETURN_NOT_OK(VisitArray(*child));
- }
- } else {
- for (std::shared_ptr<Array> child : array.children()) {
- // Sparse union, slicing is simpler
- if (array.offset() != 0) {
- // If offset is non-zero, slice the child array
- child = child->Slice(array.offset(), array.length());
- }
- RETURN_NOT_OK(VisitArray(*child));
- }
- }
- ++max_recursion_depth_;
- return Status::OK();
- }
-
- Status Visit(const DictionaryArray& array) override {
- // Dictionary written out separately. Slice offset contained in the indices
- return array.indices()->Accept(this);
- }
-
- // In some cases, intermediate buffers may need to be allocated (with sliced
arrays)
- MemoryPool* pool_;
-
- std::vector<flatbuf::FieldNode> field_nodes_;
- std::vector<flatbuf::Buffer> buffer_meta_;
- std::vector<std::shared_ptr<Buffer>> buffers_;
-
- int64_t max_recursion_depth_;
- int64_t buffer_start_offset_;
-};
-
-class DictionaryWriter : public RecordBatchWriter {
- public:
- using RecordBatchWriter::RecordBatchWriter;
-
- Status WriteMetadataMessage(
- int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out)
override {
- return WriteDictionaryMessage(
- dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_,
out);
- }
-
- Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
- dictionary_id_ = dictionary_id;
-
- // Make a dummy record batch. A bit tedious as we have to make a schema
- std::vector<std::shared_ptr<Field>> fields = {
- arrow::field("dictionary", dictionary->type())};
- auto schema = std::make_shared<Schema>(fields);
- RecordBatch batch(schema, dictionary->length(), {dictionary});
-
- return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
- }
-
- private:
- // TODO(wesm): Setting this in Write is a bit unclean, but it works
- int64_t dictionary_id_;
-};
-
-Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool, int max_recursion_depth) {
- RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth);
- return writer.Write(batch, dst, metadata_length, body_length);
-}
-
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>&
dictionary,
- int64_t buffer_start_offset, io::OutputStream* dst, int32_t*
metadata_length,
- int64_t* body_length, MemoryPool* pool) {
- DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth);
- return writer.Write(dictionary_id, dictionary, dst, metadata_length,
body_length);
-}
-
-Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
- RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth);
- RETURN_NOT_OK(writer.GetTotalSize(batch, size));
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Record batch read path
-
-class IpcComponentSource : public ArrayComponentSource {
- public:
- IpcComponentSource(const RecordBatchMetadata& metadata,
io::RandomAccessFile* file)
- : metadata_(metadata), file_(file) {}
-
- Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
- BufferMetadata buffer_meta = metadata_.buffer(buffer_index);
- if (buffer_meta.length == 0) {
- *out = nullptr;
- return Status::OK();
- } else {
- return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out);
- }
- }
-
- Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
- // pop off a field
- if (field_index >= metadata_.num_fields()) {
- return Status::Invalid("Ran out of field metadata, likely malformed");
- }
- *metadata = metadata_.field(field_index);
- return Status::OK();
- }
-
- private:
- const RecordBatchMetadata& metadata_;
- io::RandomAccessFile* file_;
-};
-
-class RecordBatchReader {
- public:
- RecordBatchReader(const RecordBatchMetadata& metadata,
- const std::shared_ptr<Schema>& schema, int max_recursion_depth,
- io::RandomAccessFile* file)
- : metadata_(metadata),
- schema_(schema),
- max_recursion_depth_(max_recursion_depth),
- file_(file) {}
-
- Status Read(std::shared_ptr<RecordBatch>* out) {
- std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
-
- IpcComponentSource source(metadata_, file_);
- ArrayLoaderContext context;
- context.source = &source;
- context.field_index = 0;
- context.buffer_index = 0;
- context.max_recursion_depth = max_recursion_depth_;
-
- for (int i = 0; i < schema_->num_fields(); ++i) {
- RETURN_NOT_OK(LoadArray(schema_->field(i)->type, &context, &arrays[i]));
- }
-
- *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays);
- return Status::OK();
- }
-
- private:
- const RecordBatchMetadata& metadata_;
- std::shared_ptr<Schema> schema_;
- int max_recursion_depth_;
- io::RandomAccessFile* file_;
-};
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
- const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
- std::shared_ptr<RecordBatch>* out) {
- return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
-}
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
- const std::shared_ptr<Schema>& schema, int max_recursion_depth,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
- RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
- return reader.Read(out);
-}
-
-Status ReadDictionary(const DictionaryBatchMetadata& metadata,
- const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
- std::shared_ptr<Array>* out) {
- int64_t id = metadata.id();
- auto it = dictionary_types.find(id);
- if (it == dictionary_types.end()) {
- std::stringstream ss;
- ss << "Do not have type metadata for dictionary with id: " << id;
- return Status::KeyError(ss.str());
- }
-
- std::vector<std::shared_ptr<Field>> fields = {it->second};
-
- // We need a schema for the record batch
- auto dummy_schema = std::make_shared<Schema>(fields);
-
- // The dictionary is embedded in a record batch with a single column
- std::shared_ptr<RecordBatch> batch;
- RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file,
&batch));
-
- if (batch->num_columns() != 1) {
- return Status::Invalid("Dictionary record batch must only contain one
field");
- }
-
- *out = batch->column(0);
- return Status::OK();
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
deleted file mode 100644
index cea4686..0000000
--- a/cpp/src/arrow/ipc/adapter.h
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Public API for writing and accessing (with zero copy, if possible) Arrow
-// IPC binary formatted data (e.g. in shared memory, or from some other IO
source)
-
-#ifndef ARROW_IPC_ADAPTER_H
-#define ARROW_IPC_ADAPTER_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "arrow/ipc/metadata.h"
-#include "arrow/loader.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Array;
-class MemoryPool;
-class RecordBatch;
-class Schema;
-class Status;
-
-namespace io {
-
-class RandomAccessFile;
-class OutputStream;
-
-} // namespace io
-
-namespace ipc {
-
-// ----------------------------------------------------------------------
-// Write path
-
-// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-// output stream in a contiguous block. The record batch metadata is written as
-// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-// prefixed by its size, followed by each of the memory buffers in the batch
-// written end to end (with appropriate alignment and padding):
-//
-// <int32: metadata size> <uint8*: metadata> <buffers>
-//
-// Finally, the absolute offsets (relative to the start of the output stream)
-// to the end of the body and end of the metadata / data header (suffixed by
-// the header size) is returned in out-variables
-//
-// @param(in) buffer_start_offset: the start offset to use in the buffer
metadata,
-// default should be 0
-//
-// @param(out) metadata_length: the size of the length-prefixed flatbuffer
-// including padding to a 64-byte boundary
-//
-// @param(out) body_length: the size of the contiguous buffer block plus
-// padding bytes
-Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
-
-// Write Array as a DictionaryBatch message
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>&
dictionary,
- int64_t buffer_start_offset, io::OutputStream* dst, int32_t*
metadata_length,
- int64_t* body_length, MemoryPool* pool);
-
-// Compute the precise number of bytes needed in a contiguous memory segment to
-// write the record batch. This involves generating the complete serialized
-// Flatbuffers metadata.
-Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
-
-// ----------------------------------------------------------------------
-// "Read" path; does not copy data if the input supports zero copy reads
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
- const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
- std::shared_ptr<RecordBatch>* out);
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
- const std::shared_ptr<Schema>& schema, int max_recursion_depth,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
-
-Status ReadDictionary(const DictionaryBatchMetadata& metadata,
- const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
- std::shared_ptr<Array>* out);
-
-} // namespace ipc
-} // namespace arrow
-
-#endif // ARROW_IPC_MEMORY_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/api.h b/cpp/src/arrow/ipc/api.h
index ad7cd84..3f05e69 100644
--- a/cpp/src/arrow/ipc/api.h
+++ b/cpp/src/arrow/ipc/api.h
@@ -18,7 +18,6 @@
#ifndef ARROW_IPC_API_H
#define ARROW_IPC_API_H
-#include "arrow/ipc/adapter.h"
#include "arrow/ipc/feather.h"
#include "arrow/ipc/json.h"
#include "arrow/ipc/metadata.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc
b/cpp/src/arrow/ipc/ipc-adapter-test.cc
deleted file mode 100644
index 638d98a..0000000
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ /dev/null
@@ -1,320 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-#include "arrow/io/memory.h"
-#include "arrow/io/test-common.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/test-common.h"
-#include "arrow/ipc/util.h"
-
-#include "arrow/buffer.h"
-#include "arrow/memory_pool.h"
-#include "arrow/pretty_print.h"
-#include "arrow/status.h"
-#include "arrow/test-util.h"
-#include "arrow/util/bit-util.h"
-
-namespace arrow {
-namespace ipc {
-
-class IpcTestFixture : public io::MemoryMapFixture {
- public:
- Status RoundTripHelper(const RecordBatch& batch, int memory_map_size,
- std::shared_ptr<RecordBatch>* batch_result) {
- std::string path = "test-write-row-batch";
- io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
-
- int32_t metadata_length;
- int64_t body_length;
-
- const int64_t buffer_offset = 0;
-
- RETURN_NOT_OK(WriteRecordBatch(
- batch, buffer_offset, mmap_.get(), &metadata_length, &body_length,
pool_));
-
- std::shared_ptr<Message> message;
- RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
- auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
- // The buffer offsets start at 0, so we must construct a
- // RandomAccessFile according to that frame of reference
- std::shared_ptr<Buffer> buffer_payload;
- RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length,
&buffer_payload));
- io::BufferReader buffer_reader(buffer_payload);
-
- return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader,
batch_result);
- }
-
- void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
- std::shared_ptr<RecordBatch> batch_result;
-
- ASSERT_OK(RoundTripHelper(batch, 1 << 16, &batch_result));
- EXPECT_EQ(batch.num_rows(), batch_result->num_rows());
-
- ASSERT_TRUE(batch.schema()->Equals(batch_result->schema()));
- ASSERT_EQ(batch.num_columns(), batch_result->num_columns())
- << batch.schema()->ToString()
- << " result: " << batch_result->schema()->ToString();
-
- for (int i = 0; i < batch.num_columns(); ++i) {
- const auto& left = *batch.column(i);
- const auto& right = *batch_result->column(i);
- if (!left.Equals(right)) {
- std::stringstream pp_result;
- std::stringstream pp_expected;
-
- ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
- ASSERT_OK(PrettyPrint(right, 0, &pp_result));
-
- FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
- << "\nGot: " << pp_result.str();
- }
- }
- }
-
- void CheckRoundtrip(const std::shared_ptr<Array>& array, int64_t
buffer_size) {
- auto f0 = arrow::field("f0", array->type());
- std::vector<std::shared_ptr<Field>> fields = {f0};
- auto schema = std::make_shared<Schema>(fields);
-
- RecordBatch batch(schema, 0, {array});
- CheckRoundtrip(batch, buffer_size);
- }
-
- protected:
- std::shared_ptr<io::MemoryMappedFile> mmap_;
- MemoryPool* pool_;
-};
-
-class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
- public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
-};
-
-class TestRecordBatchParam : public ::testing::TestWithParam<MakeRecordBatch*>,
- public IpcTestFixture {
- public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
- using IpcTestFixture::RoundTripHelper;
- using IpcTestFixture::CheckRoundtrip;
-};
-
-TEST_P(TestRecordBatchParam, RoundTrip) {
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
-
- CheckRoundtrip(*batch, 1 << 20);
-}
-
-TEST_P(TestRecordBatchParam, SliceRoundTrip) {
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
-
- // Skip the zero-length case
- if (batch->num_rows() < 2) { return; }
-
- auto sliced_batch = batch->Slice(2, 10);
- CheckRoundtrip(*sliced_batch, 1 << 20);
-}
-
-TEST_P(TestRecordBatchParam, ZeroLengthArrays) {
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
-
- std::shared_ptr<RecordBatch> zero_length_batch;
- if (batch->num_rows() > 2) {
- zero_length_batch = batch->Slice(2, 0);
- } else {
- zero_length_batch = batch->Slice(0, 0);
- }
-
- CheckRoundtrip(*zero_length_batch, 1 << 20);
-
- // ARROW-544: check binary array
- std::shared_ptr<MutableBuffer> value_offsets;
- ASSERT_OK(AllocateBuffer(pool_, sizeof(int32_t), &value_offsets));
- *reinterpret_cast<int32_t*>(value_offsets->mutable_data()) = 0;
-
- std::shared_ptr<Array> bin_array = std::make_shared<BinaryArray>(0,
value_offsets,
- std::make_shared<Buffer>(nullptr, 0), std::make_shared<Buffer>(nullptr,
0));
-
- // null value_offsets
- std::shared_ptr<Array> bin_array2 = std::make_shared<BinaryArray>(0,
nullptr, nullptr);
-
- CheckRoundtrip(bin_array, 1 << 20);
- CheckRoundtrip(bin_array2, 1 << 20);
-}
-
-INSTANTIATE_TEST_CASE_P(
- RoundTripTests, TestRecordBatchParam,
- ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
- &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch,
&MakeListRecordBatch,
- &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary,
&MakeDate,
- &MakeTimestamps, &MakeTimes, &MakeFWBinary));
-
-void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
- ipc::MockOutputStream mock;
- int32_t mock_metadata_length = -1;
- int64_t mock_body_length = -1;
- int64_t size = -1;
- ASSERT_OK(WriteRecordBatch(
- *batch, 0, &mock, &mock_metadata_length, &mock_body_length,
default_memory_pool()));
- ASSERT_OK(GetRecordBatchSize(*batch, &size));
- ASSERT_EQ(mock.GetExtentBytesWritten(), size);
-}
-
-TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
- std::shared_ptr<RecordBatch> batch;
-
- ASSERT_OK(MakeIntRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
-
- ASSERT_OK(MakeListRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
-
- ASSERT_OK(MakeZeroLengthRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
-
- ASSERT_OK(MakeNonNullRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
-
- ASSERT_OK(MakeDeeplyNestedList(&batch));
- TestGetRecordBatchSize(batch);
-}
-
-class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
- public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
-
- Status WriteToMmap(int recursion_level, bool override_level, int32_t*
metadata_length,
- int64_t* body_length, std::shared_ptr<RecordBatch>* batch,
- std::shared_ptr<Schema>* schema) {
- const int batch_length = 5;
- TypePtr type = int32();
- std::shared_ptr<Array> array;
- const bool include_nulls = true;
- RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
- for (int i = 0; i < recursion_level; ++i) {
- type = list(type);
- RETURN_NOT_OK(
- MakeRandomListArray(array, batch_length, include_nulls, pool_,
&array));
- }
-
- auto f0 = field("f0", type);
-
- *schema = std::shared_ptr<Schema>(new Schema({f0}));
-
- std::vector<std::shared_ptr<Array>> arrays = {array};
- *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
-
- std::string path = "test-write-past-max-recursion";
- const int memory_map_size = 1 << 20;
- io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
-
- if (override_level) {
- return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length,
body_length,
- pool_, recursion_level + 1);
- } else {
- return WriteRecordBatch(
- **batch, 0, mmap_.get(), metadata_length, body_length, pool_);
- }
- }
-
- protected:
- std::shared_ptr<io::MemoryMappedFile> mmap_;
- MemoryPool* pool_;
-};
-
-TEST_F(RecursionLimits, WriteLimit) {
- int32_t metadata_length = -1;
- int64_t body_length = -1;
- std::shared_ptr<Schema> schema;
- std::shared_ptr<RecordBatch> batch;
- ASSERT_RAISES(Invalid,
- WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &batch,
&schema));
-}
-
-TEST_F(RecursionLimits, ReadLimit) {
- int32_t metadata_length = -1;
- int64_t body_length = -1;
- std::shared_ptr<Schema> schema;
-
- const int recursion_depth = 64;
-
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK(WriteToMmap(
- recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
-
- std::shared_ptr<Message> message;
- ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
- auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
- std::shared_ptr<Buffer> payload;
- ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
- io::BufferReader reader(payload);
-
- std::shared_ptr<RecordBatch> result;
- ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &result));
-}
-
-TEST_F(RecursionLimits, StressLimit) {
- auto CheckDepth = [this](int recursion_depth, bool* it_works) {
- int32_t metadata_length = -1;
- int64_t body_length = -1;
- std::shared_ptr<Schema> schema;
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK(WriteToMmap(
- recursion_depth, true, &metadata_length, &body_length, &batch,
&schema));
-
- std::shared_ptr<Message> message;
- ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
- auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
- std::shared_ptr<Buffer> payload;
- ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
- io::BufferReader reader(payload);
-
- std::shared_ptr<RecordBatch> result;
- ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader,
&result));
- *it_works = result->Equals(*batch);
- };
-
- bool it_works = false;
- CheckDepth(100, &it_works);
- ASSERT_TRUE(it_works);
-
- CheckDepth(500, &it_works);
- ASSERT_TRUE(it_works);
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc
b/cpp/src/arrow/ipc/ipc-file-test.cc
deleted file mode 100644
index b457822..0000000
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ /dev/null
@@ -1,228 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-#include "arrow/array.h"
-#include "arrow/io/memory.h"
-#include "arrow/io/test-common.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/reader.h"
-#include "arrow/ipc/test-common.h"
-#include "arrow/ipc/util.h"
-#include "arrow/ipc/writer.h"
-
-#include "arrow/buffer.h"
-#include "arrow/memory_pool.h"
-#include "arrow/status.h"
-#include "arrow/test-util.h"
-#include "arrow/util/bit-util.h"
-
-namespace arrow {
-namespace ipc {
-
-void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
- if (!left.schema()->Equals(right.schema())) {
- FAIL() << "Left schema: " << left.schema()->ToString()
- << "\nRight schema: " << right.schema()->ToString();
- }
- ASSERT_EQ(left.num_columns(), right.num_columns())
- << left.schema()->ToString() << " result: " <<
right.schema()->ToString();
- EXPECT_EQ(left.num_rows(), right.num_rows());
- for (int i = 0; i < left.num_columns(); ++i) {
- EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
- << "Idx: " << i << " Name: " << left.column_name(i);
- }
-}
-
-using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
-
-class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
- public:
- void SetUp() {
- pool_ = default_memory_pool();
- buffer_ = std::make_shared<PoolBuffer>(pool_);
- sink_.reset(new io::BufferOutputStream(buffer_));
- }
- void TearDown() {}
-
- Status RoundTripHelper(const BatchVector& in_batches, BatchVector*
out_batches) {
- // Write the file
- std::shared_ptr<FileWriter> writer;
- RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(),
&writer));
-
- const int num_batches = static_cast<int>(in_batches.size());
-
- for (const auto& batch : in_batches) {
- RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
- }
- RETURN_NOT_OK(writer->Close());
- RETURN_NOT_OK(sink_->Close());
-
- // Current offset into stream is the end of the file
- int64_t footer_offset;
- RETURN_NOT_OK(sink_->Tell(&footer_offset));
-
- // Open the file
- auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
- std::shared_ptr<FileReader> reader;
- RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
-
- EXPECT_EQ(num_batches, reader->num_record_batches());
- for (int i = 0; i < num_batches; ++i) {
- std::shared_ptr<RecordBatch> chunk;
- RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
- out_batches->emplace_back(chunk);
- }
-
- return Status::OK();
- }
-
- protected:
- MemoryPool* pool_;
-
- std::unique_ptr<io::BufferOutputStream> sink_;
- std::shared_ptr<PoolBuffer> buffer_;
-};
-
-TEST_P(TestFileFormat, RoundTrip) {
- std::shared_ptr<RecordBatch> batch1;
- std::shared_ptr<RecordBatch> batch2;
- ASSERT_OK((*GetParam())(&batch1)); // NOLINT clang-tidy gtest issue
- ASSERT_OK((*GetParam())(&batch2)); // NOLINT clang-tidy gtest issue
-
- std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2};
- std::vector<std::shared_ptr<RecordBatch>> out_batches;
-
- ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
-
- // Compare batches
- for (size_t i = 0; i < in_batches.size(); ++i) {
- CompareBatch(*in_batches[i], *out_batches[i]);
- }
-}
-
-class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
- public:
- void SetUp() {
- pool_ = default_memory_pool();
- buffer_ = std::make_shared<PoolBuffer>(pool_);
- sink_.reset(new io::BufferOutputStream(buffer_));
- }
- void TearDown() {}
-
- Status RoundTripHelper(
- const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>*
out_batches) {
- // Write the file
- std::shared_ptr<StreamWriter> writer;
- RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
- int num_batches = 5;
- for (int i = 0; i < num_batches; ++i) {
- RETURN_NOT_OK(writer->WriteRecordBatch(batch));
- }
- RETURN_NOT_OK(writer->Close());
- RETURN_NOT_OK(sink_->Close());
-
- // Open the file
- auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
-
- std::shared_ptr<StreamReader> reader;
- RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
-
- std::shared_ptr<RecordBatch> chunk;
- while (true) {
- RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
- if (chunk == nullptr) { break; }
- out_batches->emplace_back(chunk);
- }
- return Status::OK();
- }
-
- protected:
- MemoryPool* pool_;
-
- std::unique_ptr<io::BufferOutputStream> sink_;
- std::shared_ptr<PoolBuffer> buffer_;
-};
-
-TEST_P(TestStreamFormat, RoundTrip) {
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
-
- std::vector<std::shared_ptr<RecordBatch>> out_batches;
-
- ASSERT_OK(RoundTripHelper(*batch, &out_batches));
-
- // Compare batches. Same
- for (size_t i = 0; i < out_batches.size(); ++i) {
- CompareBatch(*batch, *out_batches[i]);
- }
-}
-
-#define BATCH_CASES()
\
- ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch,
&MakeNonNullRecordBatch, \
- &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
&MakeStringTypesRecordBatch, \
- &MakeStruct, &MakeUnion, &MakeDictionary, &MakeDate, &MakeTimestamps,
&MakeTimes, \
- &MakeFWBinary);
-
-INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
-INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
-
-void CheckBatchDictionaries(const RecordBatch& batch) {
- // Check that dictionaries that should be the same are the same
- auto schema = batch.schema();
-
- const auto& t0 = static_cast<const DictionaryType&>(*schema->field(0)->type);
- const auto& t1 = static_cast<const DictionaryType&>(*schema->field(1)->type);
-
- ASSERT_EQ(t0.dictionary().get(), t1.dictionary().get());
-
- // Same dictionary used for list values
- const auto& t3 = static_cast<const ListType&>(*schema->field(3)->type);
- const auto& t3_value = static_cast<const DictionaryType&>(*t3.value_type());
- ASSERT_EQ(t0.dictionary().get(), t3_value.dictionary().get());
-}
-
-TEST_F(TestStreamFormat, DictionaryRoundTrip) {
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK(MakeDictionary(&batch));
-
- std::vector<std::shared_ptr<RecordBatch>> out_batches;
- ASSERT_OK(RoundTripHelper(*batch, &out_batches));
-
- CheckBatchDictionaries(*out_batches[0]);
-}
-
-TEST_F(TestFileFormat, DictionaryRoundTrip) {
- std::shared_ptr<RecordBatch> batch;
- ASSERT_OK(MakeDictionary(&batch));
-
- std::vector<std::shared_ptr<RecordBatch>> out_batches;
- ASSERT_OK(RoundTripHelper({batch}, &out_batches));
-
- CheckBatchDictionaries(*out_batches[0]);
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc
b/cpp/src/arrow/ipc/ipc-metadata-test.cc
deleted file mode 100644
index 4fb3204..0000000
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <memory>
-#include <sstream>
-#include <string>
-
-#include "gtest/gtest.h"
-
-#include "arrow/io/memory.h"
-#include "arrow/ipc/metadata-internal.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/test-common.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/test-util.h"
-#include "arrow/type.h"
-
-namespace arrow {
-
-class Buffer;
-
-namespace ipc {
-
-class TestSchemaMetadata : public ::testing::Test {
- public:
- void SetUp() {}
-
- void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
- std::shared_ptr<Buffer> buffer;
- ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
-
- std::shared_ptr<Message> message;
- ASSERT_OK(Message::Open(buffer, 0, &message));
-
- ASSERT_EQ(Message::SCHEMA, message->type());
-
- auto schema_msg = std::make_shared<SchemaMetadata>(message);
- ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
-
- DictionaryMemo empty_memo;
-
- std::shared_ptr<Schema> schema2;
- ASSERT_OK(schema_msg->GetSchema(empty_memo, &schema2));
-
- AssertSchemaEqual(schema, *schema2);
- }
-};
-
-const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-
-TEST_F(TestSchemaMetadata, PrimitiveFields) {
- auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
- auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>(),
false);
- auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
- auto f3 = std::make_shared<Field>("f3", std::make_shared<Int64Type>());
- auto f4 = std::make_shared<Field>("f4", std::make_shared<UInt8Type>());
- auto f5 = std::make_shared<Field>("f5", std::make_shared<UInt16Type>());
- auto f6 = std::make_shared<Field>("f6", std::make_shared<UInt32Type>());
- auto f7 = std::make_shared<Field>("f7", std::make_shared<UInt64Type>());
- auto f8 = std::make_shared<Field>("f8", std::make_shared<FloatType>());
- auto f9 = std::make_shared<Field>("f9", std::make_shared<DoubleType>(),
false);
- auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
-
- Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
- DictionaryMemo memo;
-
- CheckRoundtrip(schema, &memo);
-}
-
-TEST_F(TestSchemaMetadata, NestedFields) {
- auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
- auto f0 = std::make_shared<Field>("f0", type);
-
- std::shared_ptr<StructType> type2(new
StructType({std::make_shared<Field>("k1", INT32),
- std::make_shared<Field>("k2", INT32), std::make_shared<Field>("k3",
INT32)}));
- auto f1 = std::make_shared<Field>("f1", type2);
-
- Schema schema({f0, f1});
- DictionaryMemo memo;
-
- CheckRoundtrip(schema, &memo);
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc
b/cpp/src/arrow/ipc/ipc-read-write-test.cc
new file mode 100644
index 0000000..261ca1d
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -0,0 +1,608 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/array.h"
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
+#include "arrow/ipc/api.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/ipc/util.h"
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+#include "arrow/util/bit-util.h"
+
+namespace arrow {
+namespace ipc {
+
+// Asserts that two record batches have equal schemas, the same number of
+// columns and rows, and column-wise equal contents. A schema or column-count
+// mismatch is fatal. Row-count and per-column mismatches are non-fatal, so
+// every differing column gets reported.
+void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+  const auto& lhs_schema = left.schema();
+  const auto& rhs_schema = right.schema();
+  if (!lhs_schema->Equals(rhs_schema)) {
+    FAIL() << "Left schema: " << lhs_schema->ToString()
+           << "\nRight schema: " << rhs_schema->ToString();
+  }
+
+  const int ncols = left.num_columns();
+  ASSERT_EQ(ncols, right.num_columns())
+      << lhs_schema->ToString() << " result: " << rhs_schema->ToString();
+  EXPECT_EQ(left.num_rows(), right.num_rows());
+
+  for (int col = 0; col < ncols; ++col) {
+    EXPECT_TRUE(left.column(col)->Equals(right.column(col)))
+        << "Idx: " << col << " Name: " << left.column_name(col);
+  }
+}
+
+using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+
+// Fixture that serializes a Schema as an IPC SCHEMA message, decodes it again
+// and checks that the result matches the input.
+class TestSchemaMetadata : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  // Round-trips `schema` through three steps:
+  //   1. WriteSchemaMessage, with `memo` forwarded to it.
+  //   2. Message::Open.
+  //   3. SchemaMetadata::GetSchema, using a fresh DictionaryMemo on the read side.
+  // Then asserts that the decoded schema equals `schema`.
+  void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
+    std::shared_ptr<Buffer> serialized;
+    ASSERT_OK(WriteSchemaMessage(schema, memo, &serialized));
+
+    std::shared_ptr<Message> msg;
+    ASSERT_OK(Message::Open(serialized, 0, &msg));
+    ASSERT_EQ(Message::SCHEMA, msg->type());
+
+    auto schema_meta = std::make_shared<SchemaMetadata>(msg);
+    ASSERT_EQ(schema.num_fields(), schema_meta->num_fields());
+
+    std::shared_ptr<Schema> decoded;
+    DictionaryMemo read_memo;
+    ASSERT_OK(schema_meta->GetSchema(read_memo, &decoded));
+
+    AssertSchemaEqual(schema, *decoded);
+  }
+};
+
+// Shared int32 type instance for the schema metadata tests.
+const std::shared_ptr<DataType> INT32(std::make_shared<Int32Type>());
+
+// Round-trips a flat schema with one field per primitive numeric type plus
+// boolean. Fields f1 and f9 are constructed with `false` as the third Field
+// argument (presumably nullable=false).
+TEST_F(TestSchemaMetadata, PrimitiveFields) {
+  Schema schema({std::make_shared<Field>("f0", std::make_shared<Int8Type>()),
+      std::make_shared<Field>("f1", std::make_shared<Int16Type>(), false),
+      std::make_shared<Field>("f2", std::make_shared<Int32Type>()),
+      std::make_shared<Field>("f3", std::make_shared<Int64Type>()),
+      std::make_shared<Field>("f4", std::make_shared<UInt8Type>()),
+      std::make_shared<Field>("f5", std::make_shared<UInt16Type>()),
+      std::make_shared<Field>("f6", std::make_shared<UInt32Type>()),
+      std::make_shared<Field>("f7", std::make_shared<UInt64Type>()),
+      std::make_shared<Field>("f8", std::make_shared<FloatType>()),
+      std::make_shared<Field>("f9", std::make_shared<DoubleType>(), false),
+      std::make_shared<Field>("f10", std::make_shared<BooleanType>())});
+
+  DictionaryMemo memo;
+  CheckRoundtrip(schema, &memo);
+}
+
+// Round-trips a schema with a list<int32> field ("f0") and a struct field
+// ("f1") holding three int32 children.
+TEST_F(TestSchemaMetadata, NestedFields) {
+  std::vector<std::shared_ptr<Field>> struct_children = {
+      std::make_shared<Field>("k1", INT32), std::make_shared<Field>("k2", INT32),
+      std::make_shared<Field>("k3", INT32)};
+  auto struct_type = std::make_shared<StructType>(struct_children);
+  auto list_type = std::make_shared<ListType>(std::make_shared<Int32Type>());
+
+  Schema schema({std::make_shared<Field>("f0", list_type),
+      std::make_shared<Field>("f1", struct_type)});
+
+  DictionaryMemo memo;
+  CheckRoundtrip(schema, &memo);
+}
+
+// gtest parameter generator listing every record batch factory used by the
+// value-parameterized IPC round-trip tests. There is deliberately no trailing
+// semicolon, so the macro expands to a plain expression and can be used
+// anywhere a generator expression is expected (e.g. INSTANTIATE_TEST_CASE_P).
+#define BATCH_CASES()                                                        \
+  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch,               \
+      &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch,                   \
+      &MakeDeeplyNestedList, &MakeStringTypesRecordBatch, &MakeStruct,       \
+      &MakeUnion, &MakeDictionary, &MakeDate, &MakeTimestamps, &MakeTimes,   \
+      &MakeFWBinary)
+
+// Mixin providing record batch IPC round-trip helpers on top of a memory-mapped
+// file (`mmap_`). Every round trip is checked through two paths:
+//   - the standard path (WriteRecordBatch / ReadRecordBatch);
+//   - the experimental large path (WriteLargeRecordBatch / ReadLargeRecordBatch).
+// Derived fixtures are expected to assign `pool_` in SetUp().
+class IpcTestFixture : public io::MemoryMapFixture {
+ public:
+  // Writes `batch` at the start of `mmap_` with WriteRecordBatch, then reads it
+  // back with ReadMessage + ReadRecordBatch.
+  //
+  // @param zero_data: if true, zero the memory map before writing (presumably
+  //   so bytes left over from an earlier round trip cannot hide a bug)
+  // @param(out) batch_result: the batch reconstructed from the memory map
+  Status DoStandardRoundTrip(const RecordBatch& batch, bool zero_data,
+      std::shared_ptr<RecordBatch>* batch_result) {
+    int32_t metadata_length;
+    int64_t body_length;
+
+    const int64_t buffer_offset = 0;
+
+    if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
+    RETURN_NOT_OK(mmap_->Seek(0));
+
+    RETURN_NOT_OK(WriteRecordBatch(
+        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
+
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+    // The buffer offsets start at 0, so we must construct a
+    // RandomAccessFile according to that frame of reference
+    std::shared_ptr<Buffer> buffer_payload;
+    RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
+    io::BufferReader buffer_reader(buffer_payload);
+
+    return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result);
+  }
+
+  // Writes `batch` at the start of `mmap_` with the experimental
+  // WriteLargeRecordBatch and reads it back with ReadLargeRecordBatch.
+  //
+  // @param zero_data: if true, zero the memory map before writing
+  // @param(out) result: the batch reconstructed from the memory map
+  Status DoLargeRoundTrip(
+      const RecordBatch& batch, bool zero_data, std::shared_ptr<RecordBatch>* result) {
+    int32_t metadata_length;
+    int64_t body_length;
+
+    const int64_t buffer_offset = 0;
+
+    if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
+    RETURN_NOT_OK(mmap_->Seek(0));
+
+    RETURN_NOT_OK(WriteLargeRecordBatch(
+        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
+    // NOTE(review): 0 presumably is the file position the message was written
+    // at (matching Seek(0) above); confirm against ReadLargeRecordBatch.
+    return ReadLargeRecordBatch(batch.schema(), 0, mmap_.get(), result);
+  }
+
+  // Asserts that `result` matches `expected`. The row count is checked
+  // non-fatally. The schema and column count are checked fatally. Each column
+  // is compared in turn, and the first mismatch fails with pretty-printed
+  // expected and actual arrays.
+  void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) {
+    EXPECT_EQ(expected.num_rows(), result.num_rows());
+
+    ASSERT_TRUE(expected.schema()->Equals(result.schema()));
+    ASSERT_EQ(expected.num_columns(), result.num_columns())
+        << expected.schema()->ToString() << " result: " << result.schema()->ToString();
+
+    for (int i = 0; i < expected.num_columns(); ++i) {
+      const auto& left = *expected.column(i);
+      const auto& right = *result.column(i);
+      if (!left.Equals(right)) {
+        std::stringstream pp_result;
+        std::stringstream pp_expected;
+
+        ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
+        ASSERT_OK(PrettyPrint(right, 0, &pp_result));
+
+        FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
+               << "\nGot: " << pp_result.str();
+      }
+    }
+  }
+
+  // Creates a memory map of `buffer_size` bytes and round-trips `batch`
+  // through the standard path and then the large path, checking each result.
+  void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
+    std::string path = "test-write-row-batch";
+    ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, path, &mmap_));
+
+    std::shared_ptr<RecordBatch> result;
+    ASSERT_OK(DoStandardRoundTrip(batch, true, &result));
+    // A fatal failure inside CheckReadResult only returns from that helper.
+    // Propagate it so the large path is not exercised after the standard
+    // path has already been shown to be broken.
+    ASSERT_NO_FATAL_FAILURE(CheckReadResult(*result, batch));
+
+    ASSERT_OK(DoLargeRoundTrip(batch, true, &result));
+    CheckReadResult(*result, batch);
+  }
+
+  // Wraps `array` in a single-column batch (field "f0") whose row count is the
+  // array's length, then round-trips it.
+  void CheckRoundtrip(const std::shared_ptr<Array>& array, int64_t buffer_size) {
+    auto f0 = arrow::field("f0", array->type());
+    std::vector<std::shared_ptr<Field>> fields = {f0};
+    auto schema = std::make_shared<Schema>(fields);
+
+    // Use the array's real length. A hard-coded 0 would make the batch's row
+    // count disagree with its only column whenever the array is non-empty.
+    RecordBatch batch(schema, array->length(), {array});
+    CheckRoundtrip(batch, buffer_size);
+  }
+
+ protected:
+  std::shared_ptr<io::MemoryMappedFile> mmap_;
+  MemoryPool* pool_ = nullptr;
+};
+
+// Non-parameterized fixture for record batch write tests. It uses the default
+// memory pool and delegates teardown to io::MemoryMapFixture.
+class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+  }
+
+  void TearDown() {
+    io::MemoryMapFixture::TearDown();
+  }
+};
+
+// Value-parameterized round-trip fixture. Each parameter is a factory function
+// that produces the record batch under test. It uses the default memory pool
+// and delegates teardown to io::MemoryMapFixture.
+class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
+                         public IpcTestFixture {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+  }
+
+  void TearDown() {
+    io::MemoryMapFixture::TearDown();
+  }
+};
+
+// Round-trips each generated batch unchanged through the fixture's IPC paths
+// using a 1 MiB memory map.
+TEST_P(TestIpcRoundTrip, RoundTrip) {
+  MakeRecordBatch* make_batch = GetParam();
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*make_batch)(&batch));  // NOLINT clang-tidy gtest issue
+
+  const int64_t kBufferSize = 1 << 20;
+  CheckRoundtrip(*batch, kBufferSize);
+}
+
+// Round-trips batch->Slice(2, 10), i.e. a slice starting at row 2. Batches
+// with fewer than 2 rows are skipped.
+TEST_P(TestIpcRoundTrip, SliceRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  if (batch->num_rows() >= 2) {
+    auto sliced_batch = batch->Slice(2, 10);
+    CheckRoundtrip(*sliced_batch, 1 << 20);
+  }
+}
+
+// Round-trips three zero-length inputs:
+//   - a zero-length slice of each generated batch;
+//   - (ARROW-544) a zero-length BinaryArray backed by a one-element offsets
+//     buffer;
+//   - (ARROW-544) a zero-length BinaryArray with null value_offsets.
+TEST_P(TestIpcRoundTrip, ZeroLengthArrays) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  // Slice at row 2 when the batch has more than 2 rows, otherwise at row 0.
+  const int slice_start = (batch->num_rows() > 2) ? 2 : 0;
+  std::shared_ptr<RecordBatch> zero_length_batch = batch->Slice(slice_start, 0);
+  CheckRoundtrip(*zero_length_batch, 1 << 20);
+
+  // ARROW-544: check binary array
+  std::shared_ptr<MutableBuffer> offsets_buf;
+  ASSERT_OK(AllocateBuffer(pool_, sizeof(int32_t), &offsets_buf));
+  int32_t* first_offset = reinterpret_cast<int32_t*>(offsets_buf->mutable_data());
+  *first_offset = 0;
+
+  std::shared_ptr<Array> with_offsets = std::make_shared<BinaryArray>(0, offsets_buf,
+      std::make_shared<Buffer>(nullptr, 0), std::make_shared<Buffer>(nullptr, 0));
+
+  // null value_offsets
+  std::shared_ptr<Array> without_offsets =
+      std::make_shared<BinaryArray>(0, nullptr, nullptr);
+
+  CheckRoundtrip(with_offsets, 1 << 20);
+  CheckRoundtrip(without_offsets, 1 << 20);
+}
+
+// Checks that GetRecordBatchSize reports the same size as the extent of bytes
+// WriteRecordBatch writes for `batch` into a MockOutputStream.
+//
+// The batch is taken by const reference: the helper only reads it, so copying
+// the shared_ptr (an atomic refcount bump) is unnecessary. Existing callers
+// passing a shared_ptr lvalue are unaffected.
+void TestGetRecordBatchSize(const std::shared_ptr<RecordBatch>& batch) {
+  ipc::MockOutputStream mock;
+  int32_t mock_metadata_length = -1;
+  int64_t mock_body_length = -1;
+  int64_t size = -1;
+  ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length,
+                             default_memory_pool()));
+  ASSERT_OK(GetRecordBatchSize(*batch, &size));
+  ASSERT_EQ(mock.GetExtentBytesWritten(), size);
+}
+
+// Runs TestGetRecordBatchSize over several batch shapes. Despite the test
+// name, this covers list, zero-length, non-null and deeply nested batches as
+// well as integer ones.
+TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
+  MakeRecordBatch* const kMakers[] = {
+      &MakeIntRecordBatch,     &MakeListRecordBatch,  &MakeZeroLengthRecordBatch,
+      &MakeNonNullRecordBatch, &MakeDeeplyNestedList};
+
+  std::shared_ptr<RecordBatch> batch;
+  for (MakeRecordBatch* make_batch : kMakers) {
+    ASSERT_OK(make_batch(&batch));
+    TestGetRecordBatchSize(batch);
+  }
+}
+
+// Fixture for the IPC nesting-depth limits. It builds list<...<int32>>
+// columns of configurable depth and writes them into a memory-mapped file.
+class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
+ public:
+  void SetUp() override { pool_ = default_memory_pool(); }
+  void TearDown() override { io::MemoryMapFixture::TearDown(); }
+
+  // Builds a 5-row batch whose single column "f0" is random int32 values
+  // wrapped in `recursion_level` levels of list. It then initializes a 1 MB
+  // memory map (mmap_) and writes the batch at offset 0.
+  //
+  // When `override_level` is true, recursion_level + 1 is passed to
+  // WriteRecordBatch as its extra depth argument so the write can succeed.
+  // Otherwise WriteRecordBatch's default applies.
+  //
+  // *batch and *schema receive what was written. *metadata_length and
+  // *body_length receive the writer's outputs. Any failure, including
+  // memory-map setup, is returned as a non-OK Status.
+  Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length,
+                     int64_t* body_length, std::shared_ptr<RecordBatch>* batch,
+                     std::shared_ptr<Schema>* schema) {
+    const int batch_length = 5;
+    TypePtr type = int32();
+    std::shared_ptr<Array> array;
+    const bool include_nulls = true;
+    RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
+    for (int i = 0; i < recursion_level; ++i) {
+      type = list(type);
+      RETURN_NOT_OK(
+          MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
+    }
+
+    auto f0 = field("f0", type);
+    std::vector<std::shared_ptr<Field>> fields = {f0};
+    *schema = std::make_shared<Schema>(fields);
+
+    std::vector<std::shared_ptr<Array>> arrays = {array};
+    *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
+
+    std::string path = "test-write-past-max-recursion";
+    const int memory_map_size = 1 << 20;
+    // Propagate map-setup failures instead of continuing with an unusable mmap_.
+    RETURN_NOT_OK(io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_));
+
+    if (override_level) {
+      return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
+                              pool_, recursion_level + 1);
+    } else {
+      return WriteRecordBatch(
+          **batch, 0, mmap_.get(), metadata_length, body_length, pool_);
+    }
+  }
+
+ protected:
+  std::shared_ptr<io::MemoryMappedFile> mmap_;
+  MemoryPool* pool_;
+};
+
+// Writing a list column nested (1 << 8) + 1 levels deep, without overriding
+// the writer's depth limit, must fail with Invalid.
+TEST_F(RecursionLimits, WriteLimit) {
+  const int kTooDeep = (1 << 8) + 1;
+
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<Schema> schema;
+
+  Status st =
+      WriteToMmap(kTooDeep, false, &metadata_length, &body_length, &batch, &schema);
+  ASSERT_RAISES(Invalid, st);
+}
+
+// A batch nested 64 levels deep, written with the write limit overridden,
+// must fail with Invalid when read back without overriding ReadRecordBatch's
+// depth limit.
+TEST_F(RecursionLimits, ReadLimit) {
+  const int recursion_depth = 64;
+
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<Schema> schema;
+  ASSERT_OK(WriteToMmap(
+      recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
+
+  // The message metadata sits at [0, metadata_length); the body follows it.
+  std::shared_ptr<Message> message;
+  ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+  auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+  std::shared_ptr<Buffer> body;
+  ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &body));
+  io::BufferReader body_reader(body);
+
+  std::shared_ptr<RecordBatch> result;
+  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &body_reader, &result));
+}
+
+// Round-trips list columns nested 100 and 500 levels deep, with the write
+// limit overridden (via WriteToMmap) and the read limit raised to fit.
+TEST_F(RecursionLimits, StressLimit) {
+  // Writes a batch nested `recursion_depth` levels deep, reads it back with a
+  // depth limit of recursion_depth + 1, and sets *it_works to whether the
+  // result equals the written batch.
+  auto CheckDepth = [this](int recursion_depth, bool* it_works) {
+    // Clear the flag first. A failing ASSERT_* returns from this lambda early,
+    // and a stale `true` from an earlier call must not let the caller's
+    // ASSERT_TRUE pass.
+    *it_works = false;
+
+    int32_t metadata_length = -1;
+    int64_t body_length = -1;
+    std::shared_ptr<Schema> schema;
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(WriteToMmap(
+        recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
+
+    std::shared_ptr<Message> message;
+    ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+    std::shared_ptr<Buffer> payload;
+    ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+    io::BufferReader reader(payload);
+
+    std::shared_ptr<RecordBatch> result;
+    ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader, &result));
+    *it_works = result->Equals(*batch);
+  };
+
+  bool it_works = false;
+  CheckDepth(100, &it_works);
+  ASSERT_TRUE(it_works);
+
+  CheckDepth(500, &it_works);
+  ASSERT_TRUE(it_works);
+}
+
+// Parameterized fixture that writes batches with FileWriter into an in-memory
+// PoolBuffer and reads them back with FileReader.
+class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() override {
+    pool_ = default_memory_pool();
+    buffer_ = std::make_shared<PoolBuffer>(pool_);
+    sink_.reset(new io::BufferOutputStream(buffer_));
+  }
+  void TearDown() override {}
+
+  // Writes `in_batches` to a file in buffer_, opening the writer with the
+  // first batch's schema. It then reopens the file with FileReader and appends
+  // every batch read back to *out_batches.
+  //
+  // Returns Invalid for an empty `in_batches`: there is no schema to open the
+  // writer with, and indexing in_batches[0] would be undefined behavior.
+  Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
+    if (in_batches.empty()) {
+      return Status::Invalid("RoundTripHelper requires at least one record batch");
+    }
+
+    // Write the file
+    std::shared_ptr<FileWriter> writer;
+    RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
+
+    const int num_batches = static_cast<int>(in_batches.size());
+
+    for (const auto& batch : in_batches) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+    }
+    RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
+
+    // Current offset into stream is the end of the file
+    int64_t footer_offset = 0;
+    RETURN_NOT_OK(sink_->Tell(&footer_offset));
+
+    // Open the file
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    std::shared_ptr<FileReader> reader;
+    RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
+
+    EXPECT_EQ(num_batches, reader->num_record_batches());
+    for (int i = 0; i < num_batches; ++i) {
+      std::shared_ptr<RecordBatch> chunk;
+      RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+      out_batches->emplace_back(chunk);
+    }
+
+    return Status::OK();
+  }
+
+ protected:
+  MemoryPool* pool_;
+
+  std::unique_ptr<io::BufferOutputStream> sink_;
+  std::shared_ptr<PoolBuffer> buffer_;
+};
+
+// Writes two batches from the parameterized generator to a file and checks
+// that each one reads back equal to what was written.
+TEST_P(TestFileFormat, RoundTrip) {
+  MakeRecordBatch* make_batch = GetParam();
+  std::shared_ptr<RecordBatch> first;
+  std::shared_ptr<RecordBatch> second;
+  ASSERT_OK(make_batch(&first));
+  ASSERT_OK(make_batch(&second));
+
+  const std::vector<std::shared_ptr<RecordBatch>> written = {first, second};
+  std::vector<std::shared_ptr<RecordBatch>> read;
+  ASSERT_OK(RoundTripHelper(written, &read));
+
+  size_t index = 0;
+  for (const auto& expected : written) {
+    CompareBatch(*expected, *read[index++]);
+  }
+}
+
+// Parameterized fixture that writes a batch repeatedly with StreamWriter into
+// an in-memory PoolBuffer and reads the stream back with StreamReader.
+class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() override {
+    pool_ = default_memory_pool();
+    buffer_ = std::make_shared<PoolBuffer>(pool_);
+    sink_.reset(new io::BufferOutputStream(buffer_));
+  }
+  void TearDown() override {}
+
+  // Writes `batch` five times to a stream, then reads the stream until the
+  // reader yields a null batch, appending each batch read to *out_batches.
+  Status RoundTripHelper(
+      const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+    const int kNumBatches = 5;
+
+    std::shared_ptr<StreamWriter> writer;
+    RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
+    for (int written = 0; written < kNumBatches; ++written) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(batch));
+    }
+    RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
+
+    auto source = std::make_shared<io::BufferReader>(buffer_);
+    std::shared_ptr<StreamReader> reader;
+    RETURN_NOT_OK(StreamReader::Open(source, &reader));
+
+    // A null batch from GetNextRecordBatch ends the loop.
+    std::shared_ptr<RecordBatch> chunk;
+    for (;;) {
+      RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
+      if (!chunk) { break; }
+      out_batches->emplace_back(chunk);
+    }
+    return Status::OK();
+  }
+
+ protected:
+  MemoryPool* pool_;
+
+  std::unique_ptr<io::BufferOutputStream> sink_;
+  std::shared_ptr<PoolBuffer> buffer_;
+};
+
+// Streams the parameterized batch (RoundTripHelper writes it five times) and
+// checks that every batch read back equals the original.
+TEST_P(TestStreamFormat, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+
+  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
+
+  // Without a count check, a stream that reads back no batches would pass the
+  // comparison loop vacuously.
+  ASSERT_EQ(5U, out_batches.size());
+
+  // Compare batches. Same
+  for (const auto& out_batch : out_batches) {
+    CompareBatch(*batch, *out_batch);
+  }
+}
+
+// Instantiate each parameterized suite above once per MakeRecordBatch*
+// generator listed by BATCH_CASES().
+INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES());
+INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
+INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
+
+// Round-trips a batch of INT32_MAX + 1 rows through the LargeRecordBatch path,
+// and checks that the standard write path rejects it with Invalid.
+TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
+  const int64_t length = static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1;
+
+  // Reserve + Advance produce a boolean array of `length` slots without
+  // appending each value individually.
+  BooleanBuilder builder(default_memory_pool());
+  ASSERT_OK(builder.Reserve(length));
+  ASSERT_OK(builder.Advance(length));
+
+  std::shared_ptr<Array> array;
+  ASSERT_OK(builder.Finish(&array));
+
+  auto f0 = arrow::field("f0", array->type());
+  std::vector<std::shared_ptr<Field>> fields = {f0};
+  auto schema = std::make_shared<Schema>(fields);
+
+  // Declare the real row count (> INT32_MAX) rather than 0, so the batch's
+  // num_rows agrees with its column and the 64-bit batch length is actually
+  // exercised by the round trip.
+  RecordBatch batch(schema, length, {array});
+
+  std::string path = "test-write-large-record_batch";
+
+  // 512 MB
+  constexpr int64_t kBufferSize = 1 << 29;
+
+  ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_));
+
+  std::shared_ptr<RecordBatch> result;
+  ASSERT_OK(DoLargeRoundTrip(batch, false, &result));
+  CheckReadResult(*result, batch);
+
+  // Fails if we try to write this with the normal code path
+  ASSERT_RAISES(Invalid, DoStandardRoundTrip(batch, false, &result));
+}
+
+// Checks dictionary sharing in a batch built like MakeDictionary's output.
+// Fields 0 and 1 must point at the same dictionary object, and the value type
+// of list field 3 must use that same dictionary.
+void CheckBatchDictionaries(const RecordBatch& batch) {
+  auto schema = batch.schema();
+
+  const auto& first_dict = static_cast<const DictionaryType&>(*schema->field(0)->type);
+  const auto& second_dict = static_cast<const DictionaryType&>(*schema->field(1)->type);
+  auto* shared_dictionary = first_dict.dictionary().get();
+  ASSERT_EQ(shared_dictionary, second_dict.dictionary().get());
+
+  const auto& list_type = static_cast<const ListType&>(*schema->field(3)->type);
+  const auto& list_value_dict =
+      static_cast<const DictionaryType&>(*list_type.value_type());
+  ASSERT_EQ(shared_dictionary, list_value_dict.dictionary().get());
+}
+
+// Streams a dictionary-encoded batch and checks that dictionary sharing
+// survives the round trip.
+TEST_F(TestStreamFormat, DictionaryRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeDictionary(&batch));
+
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
+
+  // Guard the out_batches[0] access below against an empty read-back.
+  ASSERT_FALSE(out_batches.empty());
+  CheckBatchDictionaries(*out_batches[0]);
+}
+
+// Writes a dictionary-encoded batch to a file and checks that dictionary
+// sharing survives the round trip.
+TEST_F(TestFileFormat, DictionaryRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeDictionary(&batch));
+
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  ASSERT_OK(RoundTripHelper({batch}, &out_batches));
+
+  // Exactly one batch was written. Check before indexing out_batches[0].
+  ASSERT_EQ(1U, out_batches.size());
+  CheckBatchDictionaries(*out_batches[0]);
+}
+
+} // namespace ipc
+} // namespace arrow