Repository: arrow
Updated Branches:
  refs/heads/master 8378c48df -> 9deb3251e


ARROW-109: [C++] Add nesting stress tests up to 500 recursion depth

There doesn't appear to be any limit on the nesting depth permitted in the
Flatbuffers metadata. I think what @emkornfield was running into was the IPC
payload exceeding the size of the memory map that was being allocated to
accommodate it. I expanded the memory map size and was able to write schemas
with 1000 and 5000 levels of nesting. I left a unit test at a depth of 500,
which doesn't take long to run.

Author: Wes McKinney <[email protected]>

Closes #357 from wesm/ARROW-109 and squashes the following commits:

fa78976 [Wes McKinney] Add nesting stress tests up to 500 recursion depth, expand size of memory map


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/9deb3251
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/9deb3251
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/9deb3251

Branch: refs/heads/master
Commit: 9deb3251ec89f5afb14b5bc768f2c3a88cad1627
Parents: 8378c48
Author: Wes McKinney <[email protected]>
Authored: Sun Mar 5 08:52:20 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Sun Mar 5 08:52:20 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/ipc/adapter.h           |  6 ++-
 cpp/src/arrow/ipc/ipc-adapter-test.cc | 60 ++++++++++++++++++++++++------
 2 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/9deb3251/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index b7d8fa9..933d3a4 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -47,8 +47,10 @@ namespace ipc {
 
 // ----------------------------------------------------------------------
 // Write path
-// We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice 
round number
-// TODO(emkornfield) investigate this more
+//
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, it is expected the user will indicate explicitly the
+// maximum allowed recursion depth
 constexpr int kMaxIpcRecursionDepth = 64;
 
 // Write the RecordBatch (collection of equal-length Arrow arrays) to the

http://git-wip-us.apache.org/repos/asf/arrow/blob/9deb3251/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc 
b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 8999363..6678fd5 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -213,7 +213,8 @@ class RecursionLimits : public ::testing::Test, public 
io::MemoryMapFixture {
   void TearDown() { io::MemoryMapFixture::TearDown(); }
 
   Status WriteToMmap(int recursion_level, bool override_level, int32_t* 
metadata_length,
-      int64_t* body_length, std::shared_ptr<Schema>* schema) {
+      int64_t* body_length, std::shared_ptr<RecordBatch>* batch,
+      std::shared_ptr<Schema>* schema) {
     const int batch_length = 5;
     TypePtr type = int32();
     std::shared_ptr<Array> array;
@@ -230,18 +231,18 @@ class RecursionLimits : public ::testing::Test, public 
io::MemoryMapFixture {
     *schema = std::shared_ptr<Schema>(new Schema({f0}));
 
     std::vector<std::shared_ptr<Array>> arrays = {array};
-    auto batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
+    *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
 
     std::string path = "test-write-past-max-recursion";
-    const int memory_map_size = 1 << 16;
+    const int memory_map_size = 1 << 20;
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
     if (override_level) {
-      return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, 
body_length, pool_,
-          recursion_level + 1);
+      return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, 
body_length,
+          pool_, recursion_level + 1);
     } else {
       return WriteRecordBatch(
-          *batch, 0, mmap_.get(), metadata_length, body_length, pool_);
+          **batch, 0, mmap_.get(), metadata_length, body_length, pool_);
     }
   }
 
@@ -254,15 +255,21 @@ TEST_F(RecursionLimits, WriteLimit) {
   int32_t metadata_length = -1;
   int64_t body_length = -1;
   std::shared_ptr<Schema> schema;
-  ASSERT_RAISES(
-      Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, 
&body_length, &schema));
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_RAISES(Invalid,
+      WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &batch, 
&schema));
 }
 
 TEST_F(RecursionLimits, ReadLimit) {
   int32_t metadata_length = -1;
   int64_t body_length = -1;
   std::shared_ptr<Schema> schema;
-  ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema));
+
+  const int recursion_depth = 64;
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(WriteToMmap(
+      recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
 
   std::shared_ptr<Message> message;
   ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
@@ -273,8 +280,39 @@ TEST_F(RecursionLimits, ReadLimit) {
 
   io::BufferReader reader(payload);
 
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &batch));
+  std::shared_ptr<RecordBatch> result;
+  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &result));
+}
+
+TEST_F(RecursionLimits, StressLimit) {
+  auto CheckDepth = [this](int recursion_depth, bool* it_works) {
+    int32_t metadata_length = -1;
+    int64_t body_length = -1;
+    std::shared_ptr<Schema> schema;
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(WriteToMmap(
+        recursion_depth, true, &metadata_length, &body_length, &batch, 
&schema));
+
+    std::shared_ptr<Message> message;
+    ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+    std::shared_ptr<Buffer> payload;
+    ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+    io::BufferReader reader(payload);
+
+    std::shared_ptr<RecordBatch> result;
+    ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader, 
&result));
+    *it_works = result->Equals(*batch);
+  };
+
+  bool it_works = false;
+  CheckDepth(100, &it_works);
+  ASSERT_TRUE(it_works);
+
+  CheckDepth(500, &it_works);
+  ASSERT_TRUE(it_works);
 }
 
 }  // namespace ipc

Reply via email to