Repository: arrow
Updated Branches:
  refs/heads/master 8378c48df -> 9deb3251e
ARROW-109: [C++] Add nesting stress tests up to 500 recursion depth

There doesn't appear to be any limit to the nesting depth permitted in the flatbuffers. I think what @emkornfield was running into was the size of the IPC payload exceeding the size of the memory map that was being allocated to accommodate it. I expanded the memory map size and was able to write schemas with 1000 and 5000 levels of nesting. I left a unit test with 500 depth which doesn't take too long to run.

Author: Wes McKinney <[email protected]>

Closes #357 from wesm/ARROW-109 and squashes the following commits:

fa78976 [Wes McKinney] Add nesting stress tests up to 500 recursion depth, expand size of memory map

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/9deb3251
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/9deb3251
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/9deb3251

Branch: refs/heads/master
Commit: 9deb3251ec89f5afb14b5bc768f2c3a88cad1627
Parents: 8378c48
Author: Wes McKinney <[email protected]>
Authored: Sun Mar 5 08:52:20 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Sun Mar 5 08:52:20 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/ipc/adapter.h | 6 ++-
 cpp/src/arrow/ipc/ipc-adapter-test.cc | 60 ++++++++++++++++++++++++------
 2 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/9deb3251/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index b7d8fa9..933d3a4 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -47,8 +47,10 @@ namespace ipc {
 // ----------------------------------------------------------------------
 // Write path
-// We have trouble decoding
flatbuffers if the size i > 70, so 64 is a nice round number -// TODO(emkornfield) investigate this more +// +// ARROW-109: We set this number arbitrarily to help catch user mistakes. For +// deeply nested schemas, it is expected the user will indicate explicitly the +// maximum allowed recursion depth constexpr int kMaxIpcRecursionDepth = 64; // Write the RecordBatch (collection of equal-length Arrow arrays) to the http://git-wip-us.apache.org/repos/asf/arrow/blob/9deb3251/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 8999363..6678fd5 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -213,7 +213,8 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { void TearDown() { io::MemoryMapFixture::TearDown(); } Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length, - int64_t* body_length, std::shared_ptr<Schema>* schema) { + int64_t* body_length, std::shared_ptr<RecordBatch>* batch, + std::shared_ptr<Schema>* schema) { const int batch_length = 5; TypePtr type = int32(); std::shared_ptr<Array> array; @@ -230,18 +231,18 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { *schema = std::shared_ptr<Schema>(new Schema({f0})); std::vector<std::shared_ptr<Array>> arrays = {array}; - auto batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays); + *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays); std::string path = "test-write-past-max-recursion"; - const int memory_map_size = 1 << 16; + const int memory_map_size = 1 << 20; io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); if (override_level) { - return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length, pool_, - recursion_level + 1); + return WriteRecordBatch(**batch, 
0, mmap_.get(), metadata_length, body_length, + pool_, recursion_level + 1); } else { return WriteRecordBatch( - *batch, 0, mmap_.get(), metadata_length, body_length, pool_); + **batch, 0, mmap_.get(), metadata_length, body_length, pool_); } } @@ -254,15 +255,21 @@ TEST_F(RecursionLimits, WriteLimit) { int32_t metadata_length = -1; int64_t body_length = -1; std::shared_ptr<Schema> schema; - ASSERT_RAISES( - Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &schema)); + std::shared_ptr<RecordBatch> batch; + ASSERT_RAISES(Invalid, + WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &batch, &schema)); } TEST_F(RecursionLimits, ReadLimit) { int32_t metadata_length = -1; int64_t body_length = -1; std::shared_ptr<Schema> schema; - ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema)); + + const int recursion_depth = 64; + + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(WriteToMmap( + recursion_depth, true, &metadata_length, &body_length, &batch, &schema)); std::shared_ptr<Message> message; ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); @@ -273,8 +280,39 @@ TEST_F(RecursionLimits, ReadLimit) { io::BufferReader reader(payload); - std::shared_ptr<RecordBatch> batch; - ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &batch)); + std::shared_ptr<RecordBatch> result; + ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &result)); +} + +TEST_F(RecursionLimits, StressLimit) { + auto CheckDepth = [this](int recursion_depth, bool* it_works) { + int32_t metadata_length = -1; + int64_t body_length = -1; + std::shared_ptr<Schema> schema; + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(WriteToMmap( + recursion_depth, true, &metadata_length, &body_length, &batch, &schema)); + + std::shared_ptr<Message> message; + ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); + auto metadata = std::make_shared<RecordBatchMetadata>(message); + + std::shared_ptr<Buffer> 
payload; + ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload)); + + io::BufferReader reader(payload); + + std::shared_ptr<RecordBatch> result; + ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader, &result)); + *it_works = result->Equals(*batch); + }; + + bool it_works = false; + CheckDepth(100, &it_works); + ASSERT_TRUE(it_works); + + CheckDepth(500, &it_works); + ASSERT_TRUE(it_works); } } // namespace ipc
