This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit b9c154a8bad81217621bb638a72f6f454ad18806
Author: Antoine Pitrou <anto...@python.org>
AuthorDate: Thu Oct 3 16:57:05 2019 +0200

    ARROW-6762: [C++] Support reading JSON files with no newline at end
    
    Also fix some lifetime issues in parallel mode, and add tests.
    
    Closes #5564 from pitrou/ARROW-6762-json-parser-trailing-newline and squashes the following commits:
    
    562783d33 <Antoine Pitrou> ARROW-6762:  Support reading JSON files with no newline at end
    
    Authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: Antoine Pitrou <anto...@python.org>
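
    For illustration, a minimal C++ sketch of the user-visible behaviour this
    commit enables: reading a JSON buffer whose last object has no trailing
    newline. It assumes the Arrow C++ API of this era (TableReader::Make,
    ReadOptions::Defaults, ParseOptions::Defaults); ReadNewlineFreeJson is just
    an illustrative wrapper name, not part of the library.

    #include <memory>

    #include "arrow/buffer.h"
    #include "arrow/io/memory.h"
    #include "arrow/json/options.h"
    #include "arrow/json/reader.h"
    #include "arrow/memory_pool.h"
    #include "arrow/status.h"
    #include "arrow/table.h"

    // Illustrative wrapper: read a JSON buffer whose last object is not
    // newline-terminated (this used to fail before this commit).
    arrow::Status ReadNewlineFreeJson(std::shared_ptr<arrow::Table>* out) {
      auto data = arrow::Buffer::FromString("{\"a\": 1}\n{\"a\": 2}");  // no '\n' at end
      auto input = std::make_shared<arrow::io::BufferReader>(data);

      std::shared_ptr<arrow::json::TableReader> reader;
      ARROW_RETURN_NOT_OK(arrow::json::TableReader::Make(
          arrow::default_memory_pool(), input, arrow::json::ReadOptions::Defaults(),
          arrow::json::ParseOptions::Defaults(), &reader));
      return reader->Read(out);  // succeeds even without the trailing newline
    }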
---
 cpp/src/arrow/json/chunked_builder.cc | 55 ++++++++++++----------
 cpp/src/arrow/json/chunked_builder.h  |  2 +-
 cpp/src/arrow/json/chunker.cc         | 75 +++++++++++++++++++++++------
 cpp/src/arrow/json/chunker.h          | 33 +++++++++++--
 cpp/src/arrow/json/chunker_test.cc    | 88 +++++++++++++++++++++++++++++++----
 cpp/src/arrow/json/options.h          |  3 +-
 cpp/src/arrow/json/reader.cc          | 64 +++++++++++++++----------
 cpp/src/arrow/json/reader_test.cc     | 16 +++++++
 python/pyarrow/tests/test_json.py     | 83 +++++++++++++++++++++++++++++++++
 9 files changed, 338 insertions(+), 81 deletions(-)

diff --git a/cpp/src/arrow/json/chunked_builder.cc b/cpp/src/arrow/json/chunked_builder.cc
index f7f58e5..8c92061 100644
--- a/cpp/src/arrow/json/chunked_builder.cc
+++ b/cpp/src/arrow/json/chunked_builder.cc
@@ -26,7 +26,6 @@
 #include "arrow/json/converter.h"
 #include "arrow/table.h"
 #include "arrow/util/logging.h"
-#include "arrow/util/stl.h"
 #include "arrow/util/task_group.h"
 
 namespace arrow {
@@ -60,7 +59,9 @@ class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder {
   std::shared_ptr<Converter> converter_;
 };
 
-class TypedChunkedArrayBuilder : public NonNestedChunkedArrayBuilder {
+class TypedChunkedArrayBuilder
+    : public NonNestedChunkedArrayBuilder,
+      public std::enable_shared_from_this<TypedChunkedArrayBuilder> {
  public:
   using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder;
 
@@ -72,17 +73,21 @@ class TypedChunkedArrayBuilder : public NonNestedChunkedArrayBuilder {
     }
     lock.unlock();
 
-    task_group_->Append([this, block_index, unconverted] {
+    auto self = shared_from_this();
+
+    task_group_->Append([self, block_index, unconverted] {
       std::shared_ptr<Array> converted;
-      RETURN_NOT_OK(converter_->Convert(unconverted, &converted));
-      std::unique_lock<std::mutex> lock(mutex_);
-      chunks_[block_index] = std::move(converted);
+      RETURN_NOT_OK(self->converter_->Convert(unconverted, &converted));
+      std::unique_lock<std::mutex> lock(self->mutex_);
+      self->chunks_[block_index] = std::move(converted);
       return Status::OK();
     });
   }
 };
 
-class InferringChunkedArrayBuilder : public NonNestedChunkedArrayBuilder {
+class InferringChunkedArrayBuilder
+    : public NonNestedChunkedArrayBuilder,
+      public std::enable_shared_from_this<InferringChunkedArrayBuilder> {
  public:
   InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
                                const PromotionGraph* promotion_graph,
@@ -105,8 +110,9 @@ class InferringChunkedArrayBuilder : public NonNestedChunkedArrayBuilder {
   }
 
   void ScheduleConvertChunk(int64_t block_index) {
-    task_group_->Append([this, block_index] {
-      return TryConvertChunk(static_cast<size_t>(block_index));
+    auto self = shared_from_this();
+    task_group_->Append([self, block_index] {
+      return self->TryConvertChunk(static_cast<size_t>(block_index));
     });
   }
 
@@ -173,7 +179,7 @@ class InferringChunkedArrayBuilder : public NonNestedChunkedArrayBuilder {
 class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
  public:
   ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
-                          std::unique_ptr<ChunkedArrayBuilder> value_builder,
+                          std::shared_ptr<ChunkedArrayBuilder> value_builder,
                           const std::shared_ptr<Field>& value_field)
       : ChunkedArrayBuilder(task_group),
         pool_(pool),
@@ -250,7 +256,7 @@ class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
 
   std::mutex mutex_;
   MemoryPool* pool_;
-  std::unique_ptr<ChunkedArrayBuilder> value_builder_;
+  std::shared_ptr<ChunkedArrayBuilder> value_builder_;
   BufferVector offset_chunks_, null_bitmap_chunks_;
   std::shared_ptr<Field> value_field_;
 };
@@ -260,7 +266,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
   ChunkedStructArrayBuilder(
       const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
       const PromotionGraph* promotion_graph,
-      std::vector<std::pair<std::string, std::unique_ptr<ChunkedArrayBuilder>>>
+      std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
           name_builders)
       : ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) {
     for (auto&& name_builder : name_builders) {
@@ -390,7 +396,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
         auto new_index = static_cast<int>(name_to_index_.size());
         it = name_to_index_.emplace(fields[i]->name(), new_index).first;
 
-        std::unique_ptr<ChunkedArrayBuilder> child_builder;
+        std::shared_ptr<ChunkedArrayBuilder> child_builder;
         RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type,
                                               &child_builder));
         child_builders_.emplace_back(std::move(child_builder));
@@ -411,7 +417,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
   MemoryPool* pool_;
   const PromotionGraph* promotion_graph_;
   std::unordered_map<std::string, int> name_to_index_;
-  std::vector<std::unique_ptr<ChunkedArrayBuilder>> child_builders_;
+  std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_;
   std::vector<std::vector<bool>> child_absent_;
   BufferVector null_bitmap_chunks_;
   std::vector<int64_t> chunk_lengths_;
@@ -420,37 +426,36 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
 Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
                                MemoryPool* pool, const PromotionGraph* promotion_graph,
                                const std::shared_ptr<DataType>& type,
-                               std::unique_ptr<ChunkedArrayBuilder>* out) {
+                               std::shared_ptr<ChunkedArrayBuilder>* out) {
   if (type->id() == Type::STRUCT) {
-    std::vector<std::pair<std::string, std::unique_ptr<ChunkedArrayBuilder>>>
+    std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
         child_builders;
     for (const auto& f : type->children()) {
-      std::unique_ptr<ChunkedArrayBuilder> child_builder;
+      std::shared_ptr<ChunkedArrayBuilder> child_builder;
       RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(),
                                             &child_builder));
       child_builders.emplace_back(f->name(), std::move(child_builder));
     }
-    *out = internal::make_unique<ChunkedStructArrayBuilder>(
-        task_group, pool, promotion_graph, std::move(child_builders));
+    *out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph,
+                                                       std::move(child_builders));
     return Status::OK();
   }
   if (type->id() == Type::LIST) {
     auto list_type = static_cast<const ListType*>(type.get());
-    std::unique_ptr<ChunkedArrayBuilder> value_builder;
+    std::shared_ptr<ChunkedArrayBuilder> value_builder;
     RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
                                          list_type->value_type(), &value_builder));
-    *out = internal::make_unique<ChunkedListArrayBuilder>(
+    *out = std::make_shared<ChunkedListArrayBuilder>(
         task_group, pool, std::move(value_builder), list_type->value_field());
     return Status::OK();
   }
   std::shared_ptr<Converter> converter;
   RETURN_NOT_OK(MakeConverter(type, pool, &converter));
   if (promotion_graph) {
-    *out = internal::make_unique<InferringChunkedArrayBuilder>(
-        task_group, promotion_graph, std::move(converter));
+    *out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph,
+                                                          std::move(converter));
   } else {
-    *out =
-        internal::make_unique<TypedChunkedArrayBuilder>(task_group, std::move(converter));
+    *out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter));
   }
   return Status::OK();
 }
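
The unique_ptr-to-shared_ptr change above goes together with deriving the builders from
std::enable_shared_from_this: the conversion lambdas handed to the task group now capture
a shared_ptr copy (`self`) instead of a raw `this`, so a builder cannot be destroyed while
a parallel conversion task still references it. A standalone sketch of that pattern,
using a plain std::thread rather than Arrow's TaskGroup:

    #include <iostream>
    #include <memory>
    #include <thread>

    // Minimal illustration of the lifetime pattern: capture shared_from_this()
    // in the asynchronous task so the object outlives its external owners.
    class Builder : public std::enable_shared_from_this<Builder> {
     public:
      std::thread ScheduleConvert() {
        auto self = shared_from_this();              // extra owner held by the task
        return std::thread([self] { self->Convert(); });
      }

     private:
      void Convert() { std::cout << "converting chunk\n"; }
    };

    int main() {
      std::thread task;
      {
        auto builder = std::make_shared<Builder>();
        task = builder->ScheduleConvert();
      }            // last external reference dropped here; `self` keeps the builder alive
      task.join();
    }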
diff --git a/cpp/src/arrow/json/chunked_builder.h b/cpp/src/arrow/json/chunked_builder.h
index b2cfbef..f872c72 100644
--- a/cpp/src/arrow/json/chunked_builder.h
+++ b/cpp/src/arrow/json/chunked_builder.h
@@ -70,7 +70,7 @@ class ARROW_EXPORT ChunkedArrayBuilder {
 ARROW_EXPORT Status MakeChunkedArrayBuilder(
     const std::shared_ptr<internal::TaskGroup>& task_group, MemoryPool* pool,
     const PromotionGraph* promotion_graph, const std::shared_ptr<DataType>& type,
-    std::unique_ptr<ChunkedArrayBuilder>* out);
+    std::shared_ptr<ChunkedArrayBuilder>* out);
 
 }  // namespace json
 }  // namespace arrow
diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc
index 5ec45d7..cd21ca1 100644
--- a/cpp/src/arrow/json/chunker.cc
+++ b/cpp/src/arrow/json/chunker.cc
@@ -38,7 +38,8 @@ using internal::make_unique;
 using util::string_view;
 
 static Status StraddlingTooLarge() {
-  return Status::Invalid("straddling object straddles two block boundaries");
+  return Status::Invalid(
+      "straddling object straddles two block boundaries (try to increase block 
size?)");
 }
 
 static size_t ConsumeWhitespace(std::shared_ptr<Buffer>* buf) {
@@ -58,9 +59,11 @@ static size_t ConsumeWhitespace(std::shared_ptr<Buffer>* buf) {
 #endif
 }
 
+// A chunker implementation that assumes JSON objects don't contain raw newlines.
+// This allows fast chunk delimitation using a simple newline search.
 class NewlinesStrictlyDelimitChunker : public Chunker {
  public:
-  Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole,
+  Status Process(std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* whole,
                  std::shared_ptr<Buffer>* partial) override {
     auto last_newline = string_view(*block).find_last_of("\n\r");
     if (last_newline == string_view::npos) {
@@ -74,11 +77,24 @@ class NewlinesStrictlyDelimitChunker : public Chunker {
     return Status::OK();
   }
 
-  Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original,
-                            const std::shared_ptr<Buffer>& block,
+  Status ProcessWithPartial(std::shared_ptr<Buffer> partial_original,
+                            std::shared_ptr<Buffer> block,
                             std::shared_ptr<Buffer>* completion,
                             std::shared_ptr<Buffer>* rest) override {
-    auto partial = partial_original;
+    return DoProcessWithPartial(partial_original, block, false, completion, rest);
+  }
+
+  Status ProcessFinal(std::shared_ptr<Buffer> partial_original,
+                      std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion,
+                      std::shared_ptr<Buffer>* rest) override {
+    return DoProcessWithPartial(partial_original, block, true, completion, rest);
+  }
+
+ protected:
+  Status DoProcessWithPartial(std::shared_ptr<Buffer> partial,
+                              std::shared_ptr<Buffer> block, bool is_final,
+                              std::shared_ptr<Buffer>* completion,
+                              std::shared_ptr<Buffer>* rest) {
     ConsumeWhitespace(&partial);
     if (partial->size() == 0) {
       // if partial is empty, don't bother looking for completion
@@ -88,9 +104,16 @@ class NewlinesStrictlyDelimitChunker : public Chunker {
     }
     auto first_newline = string_view(*block).find_first_of("\n\r");
     if (first_newline == string_view::npos) {
-      // no newlines in this block; straddling object straddles *two* block boundaries.
-      // retry with larger buffer
-      return StraddlingTooLarge();
+      // no newlines in this block
+      if (is_final) {
+        // => it's entirely a completion of partial
+        *completion = block;
+        *rest = SliceBuffer(block, 0, 0);
+        return Status::OK();
+      } else {
+        // => the current object is too large for block size
+        return StraddlingTooLarge();
+      }
     }
     *completion = SliceBuffer(block, 0, first_newline + 1);
     *rest = SliceBuffer(block, first_newline + 1);
@@ -164,9 +187,11 @@ static size_t ConsumeWholeObject(Stream&& stream) {
   }
 }
 
+// A chunker implementation that assumes JSON objects can contain raw newlines,
+// and uses actual JSON parsing to delimit chunks.
 class ParsingChunker : public Chunker {
  public:
-  Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole,
+  Status Process(std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* whole,
                  std::shared_ptr<Buffer>* partial) override {
     if (block->size() == 0) {
       *whole = SliceBuffer(block, 0, 0);
@@ -194,11 +219,24 @@ class ParsingChunker : public Chunker {
     return Status::OK();
   }
 
-  Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original,
-                            const std::shared_ptr<Buffer>& block,
+  Status ProcessWithPartial(std::shared_ptr<Buffer> partial_original,
+                            std::shared_ptr<Buffer> block,
                             std::shared_ptr<Buffer>* completion,
                             std::shared_ptr<Buffer>* rest) override {
-    auto partial = partial_original;
+    return DoProcessWithPartial(partial_original, block, false, completion, rest);
+  }
+
+  Status ProcessFinal(std::shared_ptr<Buffer> partial_original,
+                      std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion,
+                      std::shared_ptr<Buffer>* rest) override {
+    return DoProcessWithPartial(partial_original, block, true, completion, rest);
+  }
+
+ protected:
+  Status DoProcessWithPartial(std::shared_ptr<Buffer> partial,
+                              std::shared_ptr<Buffer> block, bool is_final,
+                              std::shared_ptr<Buffer>* completion,
+                              std::shared_ptr<Buffer>* rest) {
     ConsumeWhitespace(&partial);
     if (partial->size() == 0) {
       // if partial is empty, don't bother looking for completion
@@ -208,9 +246,16 @@ class ParsingChunker : public Chunker {
     }
     auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
     if (length == string_view::npos) {
-      // straddling object straddles *two* block boundaries.
-      // retry with larger buffer
-      return StraddlingTooLarge();
+      // no complete object could be consumed from partial + block
+      if (is_final) {
+        // => it's entirely a completion of partial
+        *completion = block;
+        *rest = SliceBuffer(block, 0, 0);
+        return Status::OK();
+      } else {
+        // => the current object is too large for block size
+        return StraddlingTooLarge();
+      }
     }
     auto completion_length = length - partial->size();
     *completion = SliceBuffer(block, 0, completion_length);
diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h
index 0f94d81..7df1b60 100644
--- a/cpp/src/arrow/json/chunker.h
+++ b/cpp/src/arrow/json/chunker.h
@@ -41,23 +41,48 @@ class ARROW_EXPORT Chunker {
   virtual ~Chunker() = default;
 
   /// \brief Carve up a chunk in a block of data to contain only whole objects
+  ///
+  /// Post-conditions:
+  /// - block == whole + partial
+  /// - `whole` is a valid block of JSON data
+  /// - `partial` doesn't contain an entire JSON object
+  ///
   /// \param[in] block json data to be chunked
   /// \param[out] whole subrange of block containing whole json objects
   /// \param[out] partial subrange of block a partial json object
-  virtual Status Process(const std::shared_ptr<Buffer>& block,
-                         std::shared_ptr<Buffer>* whole,
+  virtual Status Process(std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* whole,
                          std::shared_ptr<Buffer>* partial) = 0;
 
   /// \brief Carve the completion of a partial object out of a block
+  ///
+  /// Post-conditions:
+  /// - block == completion + rest
+  /// - `partial + completion` is a valid block of JSON data
+  /// - `completion` doesn't contain an entire JSON object
+  ///
   /// \param[in] partial incomplete json object
   /// \param[in] block json data
   /// \param[out] completion subrange of block containing the completion of partial
   /// \param[out] rest subrange of block containing what completion does not cover
-  virtual Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial,
-                                    const std::shared_ptr<Buffer>& block,
+  virtual Status ProcessWithPartial(std::shared_ptr<Buffer> partial,
+                                    std::shared_ptr<Buffer> block,
                                     std::shared_ptr<Buffer>* completion,
                                     std::shared_ptr<Buffer>* rest) = 0;
 
+  /// \brief Like ProcessWithPartial, but for the last block of a file
+  ///
+  /// This method allows for a final JSON object without a trailing newline
+  /// (ProcessWithPartial would return an error in that case).
+  ///
+  /// Post-conditions:
+  /// - block == completion + rest
+  /// - `partial + completion` is a valid block of JSON data
+  /// - `completion` doesn't contain an entire JSON object
+  virtual Status ProcessFinal(std::shared_ptr<Buffer> partial,
+                              std::shared_ptr<Buffer> block,
+                              std::shared_ptr<Buffer>* completion,
+                              std::shared_ptr<Buffer>* rest) = 0;
+
   static std::unique_ptr<Chunker> Make(const ParseOptions& options);
 
  protected:
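
Taken together, the three methods are meant to be driven in a loop by the reader:
ProcessWithPartial stitches the carried-over partial object onto the front of each block,
Process splits the remainder into whole objects plus a new partial, and ProcessFinal
accepts a last block whose final object has no trailing newline. A sketch of such a
driver, assuming Arrow's Buffer and Status types (ChunkBlocks is a hypothetical helper
that mirrors the reader loop changed later in this commit):

    #include <memory>
    #include <vector>

    #include "arrow/buffer.h"
    #include "arrow/json/chunker.h"
    #include "arrow/status.h"

    namespace arrow {
    namespace json {

    // Hypothetical driver showing how a caller combines the three entry points.
    Status ChunkBlocks(Chunker& chunker,
                       const std::vector<std::shared_ptr<Buffer>>& blocks) {
      std::shared_ptr<Buffer> partial = std::make_shared<Buffer>("");
      for (size_t i = 0; i < blocks.size(); ++i) {
        std::shared_ptr<Buffer> completion, whole, next_partial;
        if (i + 1 == blocks.size()) {
          // Last block of the file: a missing trailing newline is accepted here.
          ARROW_RETURN_NOT_OK(chunker.ProcessFinal(partial, blocks[i], &completion, &whole));
        } else {
          std::shared_ptr<Buffer> starts_with_whole;
          ARROW_RETURN_NOT_OK(chunker.ProcessWithPartial(partial, blocks[i], &completion,
                                                         &starts_with_whole));
          ARROW_RETURN_NOT_OK(chunker.Process(starts_with_whole, &whole, &next_partial));
        }
        // ... parse `partial + completion` and `whole` here ...
        partial = next_partial;
      }
      return Status::OK();
    }

    }  // namespace json
    }  // namespace arrow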
diff --git a/cpp/src/arrow/json/chunker_test.cc b/cpp/src/arrow/json/chunker_test.cc
index fbe5c00..70bca68 100644
--- a/cpp/src/arrow/json/chunker_test.cc
+++ b/cpp/src/arrow/json/chunker_test.cc
@@ -27,6 +27,7 @@
 #include "arrow/json/chunker.h"
 #include "arrow/json/test_common.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/string_view.h"
 
 namespace arrow {
@@ -39,7 +40,8 @@ namespace json {
 using util::string_view;
 
 template <typename Lines>
-static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter) {
+static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter,
+                                    bool delimiter_at_end = true) {
   std::shared_ptr<Buffer> joined;
   BufferVector line_buffers;
   auto delimiter_buffer = std::make_shared<Buffer>(delimiter);
@@ -47,6 +49,9 @@ static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter) {
     line_buffers.push_back(std::make_shared<Buffer>(line));
     line_buffers.push_back(delimiter_buffer);
   }
+  if (!delimiter_at_end) {
+    line_buffers.pop_back();
+  }
   ABORT_NOT_OK(ConcatenateBuffers(line_buffers, default_memory_pool(), &joined));
   return joined;
 }
@@ -75,15 +80,23 @@ static std::size_t ConsumeWholeObject(std::shared_ptr<Buffer>* buf) {
   return length;
 }
 
+void AssertOnlyWholeObjects(Chunker& chunker, std::shared_ptr<Buffer> whole, int* count) {
+  *count = 0;
+  while (whole && !WhitespaceOnly(whole)) {
+    auto buf = whole;
+    if (ConsumeWholeObject(&whole) == string_view::npos) {
+      FAIL() << "Not a whole JSON object: '" << buf->ToString() << "'";
+    }
+    ++*count;
+  }
+}
+
 void AssertWholeObjects(Chunker& chunker, const std::shared_ptr<Buffer>& block,
                         int expected_count) {
   std::shared_ptr<Buffer> whole, partial;
   ASSERT_OK(chunker.Process(block, &whole, &partial));
-  int count = 0;
-  while (whole && !WhitespaceOnly(whole)) {
-    if (ConsumeWholeObject(&whole) == string_view::npos) FAIL();
-    ++count;
-  }
+  int count;
+  AssertOnlyWholeObjects(chunker, whole, &count);
   ASSERT_EQ(count, expected_count);
 }
 
@@ -103,6 +116,39 @@ void AssertChunking(Chunker& chunker, std::shared_ptr<Buffer> buf, int total_cou
   }
 }
 
+void AssertChunkingBlockSize(Chunker& chunker, std::shared_ptr<Buffer> buf,
+                             int64_t block_size, int expected_count) {
+  std::shared_ptr<Buffer> partial = Buffer::FromString({});
+  int64_t pos = 0;
+  int total_count = 0;
+  while (pos < buf->size()) {
+    int count;
+    auto block = SliceBuffer(buf, pos, std::min(block_size, buf->size() - pos));
+    pos += block->size();
+    std::shared_ptr<Buffer> completion, whole, next_partial;
+
+    if (pos == buf->size()) {
+      // Last block
+      ASSERT_OK(chunker.ProcessFinal(partial, block, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      ASSERT_OK(
+          chunker.ProcessWithPartial(partial, block, &completion, &starts_with_whole));
+      ASSERT_OK(chunker.Process(starts_with_whole, &whole, &next_partial));
+    }
+    // partial + completion should be a valid JSON block
+    ASSERT_OK(ConcatenateBuffers({partial, completion}, default_memory_pool(), &partial));
+    AssertOnlyWholeObjects(chunker, partial, &count);
+    total_count += count;
+    // whole should be a valid JSON block
+    AssertOnlyWholeObjects(chunker, whole, &count);
+    total_count += count;
+    partial = next_partial;
+  }
+  ASSERT_EQ(pos, buf->size());
+  ASSERT_EQ(total_count, expected_count);
+}
+
 void AssertStraddledChunking(Chunker& chunker, const std::shared_ptr<Buffer>& buf) {
   auto first_half = SliceBuffer(buf, 0, buf->size() / 2);
   auto second_half = SliceBuffer(buf, buf->size() / 2);
@@ -143,11 +189,18 @@ INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values
 
 INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
 
-constexpr auto object_count = 3;
+constexpr int object_count = 4;
+constexpr int min_block_size = 28;
+
 static const std::vector<std::string>& lines() {
-  static const std::vector<std::string> l = {R"({"0":"ab","1":"c","2":""})",
-                                             R"({"0":"def","1":"","2":"gh"})",
-                                             R"({"0":"","1":"ij","2":"kl"})"};
+  // clang-format off
+  static const std::vector<std::string> l = {
+    R"({"0":"ab","1":"c","2":""})",
+    R"({"0":"def","1":"","2":"gh"})",
+    R"({"0":null})",
+    R"({"0":"","1":"ij","2":"kl"})"
+  };
+  // clang-format on
   return l;
 }
 
@@ -155,6 +208,21 @@ TEST_P(BaseChunkerTest, Basics) {
   AssertChunking(*chunker_, join(lines(), "\n"), object_count);
 }
 
+TEST_P(BaseChunkerTest, BlockSizes) {
+  auto check_block_sizes = [&](std::shared_ptr<Buffer> data) {
+    for (int64_t block_size = min_block_size; block_size < min_block_size + 30;
+         ++block_size) {
+      AssertChunkingBlockSize(*chunker_, data, block_size, object_count);
+    }
+  };
+
+  check_block_sizes(join(lines(), "\n"));
+  check_block_sizes(join(lines(), "\r\n"));
+  // Without ending newline
+  check_block_sizes(join(lines(), "\n", false));
+  check_block_sizes(join(lines(), "\r\n", false));
+}
+
 TEST_P(BaseChunkerTest, Empty) {
   auto empty = std::make_shared<Buffer>("\n");
   AssertChunking(*chunker_, empty, 0);
diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h
index f075041..03d46ad 100644
--- a/cpp/src/arrow/json/options.h
+++ b/cpp/src/arrow/json/options.h
@@ -46,8 +46,7 @@ struct ARROW_EXPORT ParseOptions {
 
   /// Whether objects may be printed across multiple lines (for example pretty-printed)
   ///
-  /// If true, parsing may be slower
-  /// If false, input must end with an empty line
+  /// If true, parsing may be slower.
   bool newlines_in_values = false;
 
   /// How JSON fields outside of explicit_schema (if given) are treated
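
newlines_in_values selects between the two chunkers above: false keeps the fast
newline-delimited chunker, true selects the parsing chunker so that pretty-printed
objects spanning several lines are handled; with this commit the trailing-newline
requirement is gone in both modes. A small sketch (ParseOptions::Defaults() is assumed
to exist as elsewhere in the Arrow C++ API; PrettyPrintedOptions is an illustrative name):

    #include "arrow/json/options.h"

    // Illustrative helper: options for input whose objects contain raw newlines
    // (e.g. pretty-printed JSON), which requires the slower parsing-based chunker.
    arrow::json::ParseOptions PrettyPrintedOptions() {
      auto parse_options = arrow::json::ParseOptions::Defaults();
      parse_options.newlines_in_values = true;
      return parse_options;
    }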
diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc
index 45f3e2e..459c107 100644
--- a/cpp/src/arrow/json/reader.cc
+++ b/cpp/src/arrow/json/reader.cc
@@ -47,7 +47,8 @@ using io::internal::ReadaheadSpooler;
 
 namespace json {
 
-class TableReaderImpl : public TableReader {
+class TableReaderImpl : public TableReader,
+                        public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
                   const ReadOptions& read_options, const ParseOptions& parse_options,
@@ -69,30 +70,37 @@ class TableReaderImpl : public TableReader {
       return Status::Invalid("Empty JSON file");
     }
 
+    auto self = shared_from_this();
     auto empty = std::make_shared<Buffer>("");
 
     int64_t block_index = 0;
-    for (std::shared_ptr<Buffer> partial = empty, completion = empty,
-                                 starts_with_whole = rh.buffer;
-         rh.buffer; ++block_index) {
-      // get completion of partial from previous block
-      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, rh.buffer, &completion,
-                                                 &starts_with_whole));
-
-      // get all whole objects entirely inside the current buffer
-      std::shared_ptr<Buffer> whole, next_partial;
-      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-
-      // launch parse task
-      task_group_->Append([this, partial, completion, whole, block_index] {
-        return ParseAndInsert(partial, completion, whole, block_index);
-      });
+    std::shared_ptr<Buffer> partial = empty;
+
+    while (rh.buffer) {
+      std::shared_ptr<Buffer> block, whole, completion, next_partial;
 
+      block = rh.buffer;
       RETURN_NOT_OK(readahead_.Read(&rh));
-      if (rh.buffer == nullptr) {
-        DCHECK_EQ(string_view(*next_partial).find_first_not_of(" \t\n\r"),
-                  string_view::npos);
+
+      if (!rh.buffer) {
+        // End of file reached => compute completion from penultimate block
+        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
+      } else {
+        std::shared_ptr<Buffer> starts_with_whole;
+        // Get completion of partial from previous block.
+        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
+                                                   &starts_with_whole));
+
+        // Get all whole objects entirely inside the current buffer
+        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
       }
+
+      // Launch parse task
+      task_group_->Append([self, partial, completion, whole, block_index] {
+        return self->ParseAndInsert(partial, completion, whole, block_index);
+      });
+      block_index++;
+
       partial = next_partial;
     }
 
@@ -123,13 +131,21 @@ class TableReaderImpl : public TableReader {
     RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
                                                whole->size()));
 
-    if (completion->size() != 0) {
+    if (partial->size() != 0 || completion->size() != 0) {
       std::shared_ptr<Buffer> straddling;
-      RETURN_NOT_OK(ConcatenateBuffers({partial, completion}, pool_, &straddling));
+      if (partial->size() == 0) {
+        straddling = completion;
+      } else if (completion->size() == 0) {
+        straddling = partial;
+      } else {
+        RETURN_NOT_OK(ConcatenateBuffers({partial, completion}, pool_, &straddling));
+      }
       RETURN_NOT_OK(parser->Parse(straddling));
     }
 
-    RETURN_NOT_OK(parser->Parse(whole));
+    if (whole->size() != 0) {
+      RETURN_NOT_OK(parser->Parse(whole));
+    }
 
     std::shared_ptr<Array> parsed;
     RETURN_NOT_OK(parser->Finish(&parsed));
@@ -143,7 +159,7 @@ class TableReaderImpl : public TableReader {
   std::unique_ptr<Chunker> chunker_;
   std::shared_ptr<TaskGroup> task_group_;
   ReadaheadSpooler readahead_;
-  std::unique_ptr<ChunkedArrayBuilder> builder_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
 };
 
 Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
@@ -174,7 +190,7 @@ Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
       options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
           ? GetPromotionGraph()
           : nullptr;
-  std::unique_ptr<ChunkedArrayBuilder> builder;
+  std::shared_ptr<ChunkedArrayBuilder> builder;
   RETURN_NOT_OK(MakeChunkedArrayBuilder(internal::TaskGroup::MakeSerial(),
                                         default_memory_pool(), promotion_graph, type,
                                         &builder));
diff --git a/cpp/src/arrow/json/reader_test.cc b/cpp/src/arrow/json/reader_test.cc
index 016f49a..82f2c86 100644
--- a/cpp/src/arrow/json/reader_test.cc
+++ b/cpp/src/arrow/json/reader_test.cc
@@ -92,6 +92,22 @@ TEST_P(ReaderTest, Empty) {
   AssertTablesEqual(*expected_table, *table_);
 }
 
+TEST_P(ReaderTest, EmptyNoNewlineAtEnd) {
+  SetUpReader("{}\n{}");
+  ASSERT_OK(reader_->Read(&table_));
+
+  auto expected_table = Table::Make(schema({}), ArrayVector(), 2);
+  AssertTablesEqual(*expected_table, *table_);
+}
+
+TEST_P(ReaderTest, EmptyManyNewlines) {
+  SetUpReader("{}\n\r\n{}\n\r\n");
+  ASSERT_OK(reader_->Read(&table_));
+
+  auto expected_table = Table::Make(schema({}), ArrayVector(), 2);
+  AssertTablesEqual(*expected_table, *table_);
+}
+
 TEST_P(ReaderTest, Basics) {
   parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
   auto src = scalars_only_src();
diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py
index 2225978..e571894 100644
--- a/python/pyarrow/tests/test_json.py
+++ b/python/pyarrow/tests/test_json.py
@@ -16,14 +16,41 @@
 # under the License.
 
 import io
+import itertools
+import json
+import string
 import unittest
 
+import numpy as np
 import pytest
 
 import pyarrow as pa
 from pyarrow.json import read_json, ReadOptions, ParseOptions
 
 
+def generate_col_names():
+    # 'a', 'b'... 'z', then 'aa', 'ab'...
+    letters = string.ascii_lowercase
+    for letter in letters:
+        yield letter
+    for first in letters:
+        for second in letters:
+            yield first + second
+
+
+def make_random_json(num_cols=2, num_rows=10, linesep=u'\r\n'):
+    arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows))
+    col_names = list(itertools.islice(generate_col_names(), num_cols))
+    lines = []
+    for row in arr.T:
+        json_obj = {k: int(v) for (k, v) in zip(col_names, row)}
+        lines.append(json.dumps(json_obj))
+    data = linesep.join(lines).encode()
+    columns = [pa.array(col, type=pa.int64()) for col in arr]
+    expected = pa.Table.from_arrays(columns, col_names)
+    return data, expected
+
+
 def test_read_options():
     cls = ReadOptions
     opts = cls()
@@ -75,6 +102,37 @@ class BaseTestJSONRead:
         with pytest.raises(TypeError):
             self.read_json(sio)
 
+    def test_block_sizes(self):
+        rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
+        read_options = ReadOptions()
+        parse_options = ParseOptions()
+
+        for data in [rows, rows + b'\n']:
+            for newlines_in_values in [False, True]:
+                parse_options.newlines_in_values = newlines_in_values
+                read_options.block_size = 4
+                with pytest.raises(ValueError,
+                                   match="try to increase block size"):
+                    self.read_bytes(data, read_options=read_options,
+                                    parse_options=parse_options)
+
+                # Validate reader behavior with various block sizes.
+                # There used to be bugs in this area.
+                for block_size in range(9, 20):
+                    read_options.block_size = block_size
+                    table = self.read_bytes(data, read_options=read_options,
+                                            parse_options=parse_options)
+                    assert table.to_pydict() == {'a': [1, 2, 3]}
+
+    def test_no_newline_at_end(self):
+        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}'
+        table = self.read_bytes(rows)
+        assert table.to_pydict() == {
+            'a': [1, 4],
+            'b': [2, 5],
+            'c': [3, 6],
+            }
+
     def test_simple_ints(self):
         # Infer integer columns
         rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}\n'
@@ -126,6 +184,31 @@ class BaseTestJSONRead:
             'e': [None, True, False],
             }
 
+    def test_small_random_json(self):
+        data, expected = make_random_json(num_cols=2, num_rows=10)
+        table = self.read_bytes(data)
+        assert table.schema == expected.schema
+        assert table.equals(expected)
+        assert table.to_pydict() == expected.to_pydict()
+
+    def test_stress_block_sizes(self):
+        # Test a number of small block sizes to stress block stitching
+        data_base, expected = make_random_json(num_cols=2, num_rows=100)
+        read_options = ReadOptions()
+        parse_options = ParseOptions()
+
+        for data in [data_base, data_base.rstrip(b'\r\n')]:
+            for newlines_in_values in [False, True]:
+                parse_options.newlines_in_values = newlines_in_values
+                for block_size in [22, 23, 37]:
+                    read_options.block_size = block_size
+                    table = self.read_bytes(data, read_options=read_options,
+                                            parse_options=parse_options)
+                    assert table.schema == expected.schema
+                    if not table.equals(expected):
+                        # Better error output
+                        assert table.to_pydict() == expected.to_pydict()
+
 
 class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):
 
