This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
commit b9c154a8bad81217621bb638a72f6f454ad18806
Author: Antoine Pitrou <anto...@python.org>
AuthorDate: Thu Oct 3 16:57:05 2019 +0200

    ARROW-6762: [C++] Support reading JSON files with no newline at end

    Also fix some lifetime issues in parallel mode, and add tests.

    Closes #5564 from pitrou/ARROW-6762-json-parser-trailing-newline and squashes the following commits:

    562783d33 <Antoine Pitrou> ARROW-6762: Support reading JSON files with no newline at end

    Authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: Antoine Pitrou <anto...@python.org>
---
 cpp/src/arrow/json/chunked_builder.cc | 55 ++++++++++++----------
 cpp/src/arrow/json/chunked_builder.h  |  2 +-
 cpp/src/arrow/json/chunker.cc         | 75 +++++++++++++++++++++++------
 cpp/src/arrow/json/chunker.h          | 33 +++++++++++--
 cpp/src/arrow/json/chunker_test.cc    | 88 +++++++++++++++++++++++++++++++----
 cpp/src/arrow/json/options.h          |  3 +-
 cpp/src/arrow/json/reader.cc          | 64 +++++++++++++++----------
 cpp/src/arrow/json/reader_test.cc     | 16 +++++++
 python/pyarrow/tests/test_json.py     | 83 +++++++++++++++++++++++++++++++++
 9 files changed, 338 insertions(+), 81 deletions(-)

diff --git a/cpp/src/arrow/json/chunked_builder.cc b/cpp/src/arrow/json/chunked_builder.cc index f7f58e5..8c92061 100644 --- a/cpp/src/arrow/json/chunked_builder.cc +++ b/cpp/src/arrow/json/chunked_builder.cc @@ -26,7 +26,6 @@ #include "arrow/json/converter.h" #include "arrow/table.h" #include "arrow/util/logging.h" -#include "arrow/util/stl.h" #include "arrow/util/task_group.h" namespace arrow { @@ -60,7 +59,9 @@ class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder { std::shared_ptr<Converter> converter_; }; -class TypedChunkedArrayBuilder : public NonNestedChunkedArrayBuilder { +class TypedChunkedArrayBuilder + : public NonNestedChunkedArrayBuilder, + public std::enable_shared_from_this<TypedChunkedArrayBuilder> { public: using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder; @@ -72,17 +73,21 @@ class TypedChunkedArrayBuilder : public NonNestedChunkedArrayBuilder { } lock.unlock(); - task_group_->Append([this, block_index, unconverted] { + auto self = shared_from_this(); + + task_group_->Append([self, block_index, unconverted] { std::shared_ptr<Array> converted; - RETURN_NOT_OK(converter_->Convert(unconverted, &converted)); - std::unique_lock<std::mutex> lock(mutex_); - chunks_[block_index] = std::move(converted); + RETURN_NOT_OK(self->converter_->Convert(unconverted, &converted)); + std::unique_lock<std::mutex> lock(self->mutex_); + self->chunks_[block_index] = std::move(converted); return Status::OK(); }); } }; -class InferringChunkedArrayBuilder : public NonNestedChunkedArrayBuilder { +class InferringChunkedArrayBuilder + : public NonNestedChunkedArrayBuilder, + public std::enable_shared_from_this<InferringChunkedArrayBuilder> { public: InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, const PromotionGraph* promotion_graph, @@ -105,8 +110,9 @@ class InferringChunkedArrayBuilder : public NonNestedChunkedArrayBuilder { } void ScheduleConvertChunk(int64_t block_index) { - task_group_->Append([this, block_index] { - return TryConvertChunk(static_cast<size_t>(block_index)); + auto self = shared_from_this(); + task_group_->Append([self, block_index] { + return self->TryConvertChunk(static_cast<size_t>(block_index)); }); } @@ -173,7 +179,7 @@ class InferringChunkedArrayBuilder : public NonNestedChunkedArrayBuilder { class ChunkedListArrayBuilder : public ChunkedArrayBuilder { public: ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>&
task_group, MemoryPool* pool, - std::unique_ptr<ChunkedArrayBuilder> value_builder, + std::shared_ptr<ChunkedArrayBuilder> value_builder, const std::shared_ptr<Field>& value_field) : ChunkedArrayBuilder(task_group), pool_(pool), @@ -250,7 +256,7 @@ class ChunkedListArrayBuilder : public ChunkedArrayBuilder { std::mutex mutex_; MemoryPool* pool_; - std::unique_ptr<ChunkedArrayBuilder> value_builder_; + std::shared_ptr<ChunkedArrayBuilder> value_builder_; BufferVector offset_chunks_, null_bitmap_chunks_; std::shared_ptr<Field> value_field_; }; @@ -260,7 +266,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder { ChunkedStructArrayBuilder( const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool, const PromotionGraph* promotion_graph, - std::vector<std::pair<std::string, std::unique_ptr<ChunkedArrayBuilder>>> + std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>> name_builders) : ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) { for (auto&& name_builder : name_builders) { @@ -390,7 +396,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder { auto new_index = static_cast<int>(name_to_index_.size()); it = name_to_index_.emplace(fields[i]->name(), new_index).first; - std::unique_ptr<ChunkedArrayBuilder> child_builder; + std::shared_ptr<ChunkedArrayBuilder> child_builder; RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type, &child_builder)); child_builders_.emplace_back(std::move(child_builder)); @@ -411,7 +417,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder { MemoryPool* pool_; const PromotionGraph* promotion_graph_; std::unordered_map<std::string, int> name_to_index_; - std::vector<std::unique_ptr<ChunkedArrayBuilder>> child_builders_; + std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_; std::vector<std::vector<bool>> child_absent_; BufferVector null_bitmap_chunks_; std::vector<int64_t> chunk_lengths_; @@ -420,37 +426,36 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder { Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool, const PromotionGraph* promotion_graph, const std::shared_ptr<DataType>& type, - std::unique_ptr<ChunkedArrayBuilder>* out) { + std::shared_ptr<ChunkedArrayBuilder>* out) { if (type->id() == Type::STRUCT) { - std::vector<std::pair<std::string, std::unique_ptr<ChunkedArrayBuilder>>> + std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>> child_builders; for (const auto& f : type->children()) { - std::unique_ptr<ChunkedArrayBuilder> child_builder; + std::shared_ptr<ChunkedArrayBuilder> child_builder; RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(), &child_builder)); child_builders.emplace_back(f->name(), std::move(child_builder)); } - *out = internal::make_unique<ChunkedStructArrayBuilder>( - task_group, pool, promotion_graph, std::move(child_builders)); + *out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph, + std::move(child_builders)); return Status::OK(); } if (type->id() == Type::LIST) { auto list_type = static_cast<const ListType*>(type.get()); - std::unique_ptr<ChunkedArrayBuilder> value_builder; + std::shared_ptr<ChunkedArrayBuilder> value_builder; RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, list_type->value_type(), &value_builder)); - *out = internal::make_unique<ChunkedListArrayBuilder>( + *out = std::make_shared<ChunkedListArrayBuilder>( task_group, 
pool, std::move(value_builder), list_type->value_field()); return Status::OK(); } std::shared_ptr<Converter> converter; RETURN_NOT_OK(MakeConverter(type, pool, &converter)); if (promotion_graph) { - *out = internal::make_unique<InferringChunkedArrayBuilder>( - task_group, promotion_graph, std::move(converter)); + *out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph, + std::move(converter)); } else { - *out = - internal::make_unique<TypedChunkedArrayBuilder>(task_group, std::move(converter)); + *out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter)); } return Status::OK(); } diff --git a/cpp/src/arrow/json/chunked_builder.h b/cpp/src/arrow/json/chunked_builder.h index b2cfbef..f872c72 100644 --- a/cpp/src/arrow/json/chunked_builder.h +++ b/cpp/src/arrow/json/chunked_builder.h @@ -70,7 +70,7 @@ class ARROW_EXPORT ChunkedArrayBuilder { ARROW_EXPORT Status MakeChunkedArrayBuilder( const std::shared_ptr<internal::TaskGroup>& task_group, MemoryPool* pool, const PromotionGraph* promotion_graph, const std::shared_ptr<DataType>& type, - std::unique_ptr<ChunkedArrayBuilder>* out); + std::shared_ptr<ChunkedArrayBuilder>* out); } // namespace json } // namespace arrow diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc index 5ec45d7..cd21ca1 100644 --- a/cpp/src/arrow/json/chunker.cc +++ b/cpp/src/arrow/json/chunker.cc @@ -38,7 +38,8 @@ using internal::make_unique; using util::string_view; static Status StraddlingTooLarge() { - return Status::Invalid("straddling object straddles two block boundaries"); + return Status::Invalid( + "straddling object straddles two block boundaries (try to increase block size?)"); } static size_t ConsumeWhitespace(std::shared_ptr<Buffer>* buf) { @@ -58,9 +59,11 @@ static size_t ConsumeWhitespace(std::shared_ptr<Buffer>* buf) { #endif } +// A chunker implementation that assumes JSON objects don't contain raw newlines. +// This allows fast chunk delimitation using a simple newline search. 
class NewlinesStrictlyDelimitChunker : public Chunker { public: - Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole, + Status Process(std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* whole, std::shared_ptr<Buffer>* partial) override { auto last_newline = string_view(*block).find_last_of("\n\r"); if (last_newline == string_view::npos) { @@ -74,11 +77,24 @@ class NewlinesStrictlyDelimitChunker : public Chunker { return Status::OK(); } - Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original, - const std::shared_ptr<Buffer>& block, + Status ProcessWithPartial(std::shared_ptr<Buffer> partial_original, + std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion, std::shared_ptr<Buffer>* rest) override { - auto partial = partial_original; + return DoProcessWithPartial(partial_original, block, false, completion, rest); + } + + Status ProcessFinal(std::shared_ptr<Buffer> partial_original, + std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion, + std::shared_ptr<Buffer>* rest) override { + return DoProcessWithPartial(partial_original, block, true, completion, rest); + } + + protected: + Status DoProcessWithPartial(std::shared_ptr<Buffer> partial, + std::shared_ptr<Buffer> block, bool is_final, + std::shared_ptr<Buffer>* completion, + std::shared_ptr<Buffer>* rest) { ConsumeWhitespace(&partial); if (partial->size() == 0) { // if partial is empty, don't bother looking for completion @@ -88,9 +104,16 @@ class NewlinesStrictlyDelimitChunker : public Chunker { } auto first_newline = string_view(*block).find_first_of("\n\r"); if (first_newline == string_view::npos) { - // no newlines in this block; straddling object straddles *two* block boundaries. - // retry with larger buffer - return StraddlingTooLarge(); + // no newlines in this block + if (is_final) { + // => it's entirely a completion of partial + *completion = block; + *rest = SliceBuffer(block, 0, 0); + return Status::OK(); + } else { + // => the current object is too large for block size + return StraddlingTooLarge(); + } } *completion = SliceBuffer(block, 0, first_newline + 1); *rest = SliceBuffer(block, first_newline + 1); @@ -164,9 +187,11 @@ static size_t ConsumeWholeObject(Stream&& stream) { } } +// A chunker implementation that assumes JSON objects can contain raw newlines, +// and uses actual JSON parsing to delimit chunks. 
class ParsingChunker : public Chunker { public: - Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole, + Status Process(std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* whole, std::shared_ptr<Buffer>* partial) override { if (block->size() == 0) { *whole = SliceBuffer(block, 0, 0); @@ -194,11 +219,24 @@ class ParsingChunker : public Chunker { return Status::OK(); } - Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original, - const std::shared_ptr<Buffer>& block, + Status ProcessWithPartial(std::shared_ptr<Buffer> partial_original, + std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion, std::shared_ptr<Buffer>* rest) override { - auto partial = partial_original; + return DoProcessWithPartial(partial_original, block, false, completion, rest); + } + + Status ProcessFinal(std::shared_ptr<Buffer> partial_original, + std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion, + std::shared_ptr<Buffer>* rest) override { + return DoProcessWithPartial(partial_original, block, true, completion, rest); + } + + protected: + Status DoProcessWithPartial(std::shared_ptr<Buffer> partial, + std::shared_ptr<Buffer> block, bool is_final, + std::shared_ptr<Buffer>* completion, + std::shared_ptr<Buffer>* rest) { ConsumeWhitespace(&partial); if (partial->size() == 0) { // if partial is empty, don't bother looking for completion @@ -208,9 +246,16 @@ class ParsingChunker : public Chunker { } auto length = ConsumeWholeObject(MultiStringStream({partial, block})); if (length == string_view::npos) { - // straddling object straddles *two* block boundaries. - // retry with larger buffer - return StraddlingTooLarge(); + // no newlines in this block + if (is_final) { + // => it's entirely a completion of partial + *completion = block; + *rest = SliceBuffer(block, 0, 0); + return Status::OK(); + } else { + // => the current object is too large for block size + return StraddlingTooLarge(); + } } auto completion_length = length - partial->size(); *completion = SliceBuffer(block, 0, completion_length); diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h index 0f94d81..7df1b60 100644 --- a/cpp/src/arrow/json/chunker.h +++ b/cpp/src/arrow/json/chunker.h @@ -41,23 +41,48 @@ class ARROW_EXPORT Chunker { virtual ~Chunker() = default; /// \brief Carve up a chunk in a block of data to contain only whole objects + /// + /// Post-conditions: + /// - block == whole + partial + /// - `whole` is a valid block of JSON data + /// - `partial` doesn't contain an entire JSON object + /// /// \param[in] block json data to be chunked /// \param[out] whole subrange of block containing whole json objects /// \param[out] partial subrange of block a partial json object - virtual Status Process(const std::shared_ptr<Buffer>& block, - std::shared_ptr<Buffer>* whole, + virtual Status Process(std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* whole, std::shared_ptr<Buffer>* partial) = 0; /// \brief Carve the completion of a partial object out of a block + /// + /// Post-conditions: + /// - block == completion + rest + /// - `partial + completion` is a valid block of JSON data + /// - `completion` doesn't contain an entire JSON object + /// /// \param[in] partial incomplete json object /// \param[in] block json data /// \param[out] completion subrange of block containing the completion of partial /// \param[out] rest subrange of block containing what completion does not cover - virtual Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial, 
- const std::shared_ptr<Buffer>& block, + virtual Status ProcessWithPartial(std::shared_ptr<Buffer> partial, + std::shared_ptr<Buffer> block, std::shared_ptr<Buffer>* completion, std::shared_ptr<Buffer>* rest) = 0; + /// \brief Like ProcessWithPartial, but for the lastblock of a file + /// + /// This method allows for a final JSON object without a trailing newline + /// (ProcessWithPartial would return an error in that case). + /// + /// Post-conditions: + /// - block == completion + rest + /// - `partial + completion` is a valid block of JSON data + /// - `completion` doesn't contain an entire JSON object + virtual Status ProcessFinal(std::shared_ptr<Buffer> partial, + std::shared_ptr<Buffer> block, + std::shared_ptr<Buffer>* completion, + std::shared_ptr<Buffer>* rest) = 0; + static std::unique_ptr<Chunker> Make(const ParseOptions& options); protected: diff --git a/cpp/src/arrow/json/chunker_test.cc b/cpp/src/arrow/json/chunker_test.cc index fbe5c00..70bca68 100644 --- a/cpp/src/arrow/json/chunker_test.cc +++ b/cpp/src/arrow/json/chunker_test.cc @@ -27,6 +27,7 @@ #include "arrow/json/chunker.h" #include "arrow/json/test_common.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/logging.h" #include "arrow/util/string_view.h" namespace arrow { @@ -39,7 +40,8 @@ namespace json { using util::string_view; template <typename Lines> -static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter) { +static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter, + bool delimiter_at_end = true) { std::shared_ptr<Buffer> joined; BufferVector line_buffers; auto delimiter_buffer = std::make_shared<Buffer>(delimiter); @@ -47,6 +49,9 @@ static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter) { line_buffers.push_back(std::make_shared<Buffer>(line)); line_buffers.push_back(delimiter_buffer); } + if (!delimiter_at_end) { + line_buffers.pop_back(); + } ABORT_NOT_OK(ConcatenateBuffers(line_buffers, default_memory_pool(), &joined)); return joined; } @@ -75,15 +80,23 @@ static std::size_t ConsumeWholeObject(std::shared_ptr<Buffer>* buf) { return length; } +void AssertOnlyWholeObjects(Chunker& chunker, std::shared_ptr<Buffer> whole, int* count) { + *count = 0; + while (whole && !WhitespaceOnly(whole)) { + auto buf = whole; + if (ConsumeWholeObject(&whole) == string_view::npos) { + FAIL() << "Not a whole JSON object: '" << buf->ToString() << "'"; + } + ++*count; + } +} + void AssertWholeObjects(Chunker& chunker, const std::shared_ptr<Buffer>& block, int expected_count) { std::shared_ptr<Buffer> whole, partial; ASSERT_OK(chunker.Process(block, &whole, &partial)); - int count = 0; - while (whole && !WhitespaceOnly(whole)) { - if (ConsumeWholeObject(&whole) == string_view::npos) FAIL(); - ++count; - } + int count; + AssertOnlyWholeObjects(chunker, whole, &count); ASSERT_EQ(count, expected_count); } @@ -103,6 +116,39 @@ void AssertChunking(Chunker& chunker, std::shared_ptr<Buffer> buf, int total_cou } } +void AssertChunkingBlockSize(Chunker& chunker, std::shared_ptr<Buffer> buf, + int64_t block_size, int expected_count) { + std::shared_ptr<Buffer> partial = Buffer::FromString({}); + int64_t pos = 0; + int total_count = 0; + while (pos < buf->size()) { + int count; + auto block = SliceBuffer(buf, pos, std::min(block_size, buf->size() - pos)); + pos += block->size(); + std::shared_ptr<Buffer> completion, whole, next_partial; + + if (pos == buf->size()) { + // Last block + ASSERT_OK(chunker.ProcessFinal(partial, block, &completion, &whole)); + } else { + 
std::shared_ptr<Buffer> starts_with_whole; + ASSERT_OK( + chunker.ProcessWithPartial(partial, block, &completion, &starts_with_whole)); + ASSERT_OK(chunker.Process(starts_with_whole, &whole, &next_partial)); + } + // partial + completion should be a valid JSON block + ASSERT_OK(ConcatenateBuffers({partial, completion}, default_memory_pool(), &partial)); + AssertOnlyWholeObjects(chunker, partial, &count); + total_count += count; + // whole should be a valid JSON block + AssertOnlyWholeObjects(chunker, whole, &count); + total_count += count; + partial = next_partial; + } + ASSERT_EQ(pos, buf->size()); + ASSERT_EQ(total_count, expected_count); +} + void AssertStraddledChunking(Chunker& chunker, const std::shared_ptr<Buffer>& buf) { auto first_half = SliceBuffer(buf, 0, buf->size() / 2); auto second_half = SliceBuffer(buf, buf->size() / 2); @@ -143,11 +189,18 @@ INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true)); -constexpr auto object_count = 3; +constexpr int object_count = 4; +constexpr int min_block_size = 28; + static const std::vector<std::string>& lines() { - static const std::vector<std::string> l = {R"({"0":"ab","1":"c","2":""})", - R"({"0":"def","1":"","2":"gh"})", - R"({"0":"","1":"ij","2":"kl"})"}; + // clang-format off + static const std::vector<std::string> l = { + R"({"0":"ab","1":"c","2":""})", + R"({"0":"def","1":"","2":"gh"})", + R"({"0":null})", + R"({"0":"","1":"ij","2":"kl"})" + }; + // clang-format on return l; } @@ -155,6 +208,21 @@ TEST_P(BaseChunkerTest, Basics) { AssertChunking(*chunker_, join(lines(), "\n"), object_count); } +TEST_P(BaseChunkerTest, BlockSizes) { + auto check_block_sizes = [&](std::shared_ptr<Buffer> data) { + for (int64_t block_size = min_block_size; block_size < min_block_size + 30; + ++block_size) { + AssertChunkingBlockSize(*chunker_, data, block_size, object_count); + } + }; + + check_block_sizes(join(lines(), "\n")); + check_block_sizes(join(lines(), "\r\n")); + // Without ending newline + check_block_sizes(join(lines(), "\n", false)); + check_block_sizes(join(lines(), "\r\n", false)); +} + TEST_P(BaseChunkerTest, Empty) { auto empty = std::make_shared<Buffer>("\n"); AssertChunking(*chunker_, empty, 0); diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h index f075041..03d46ad 100644 --- a/cpp/src/arrow/json/options.h +++ b/cpp/src/arrow/json/options.h @@ -46,8 +46,7 @@ struct ARROW_EXPORT ParseOptions { /// Whether objects may be printed across multiple lines (for example pretty-printed) /// - /// If true, parsing may be slower - /// If false, input must end with an empty line + /// If true, parsing may be slower. 
bool newlines_in_values = false; /// How JSON fields outside of explicit_schema (if given) are treated diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc index 45f3e2e..459c107 100644 --- a/cpp/src/arrow/json/reader.cc +++ b/cpp/src/arrow/json/reader.cc @@ -47,7 +47,8 @@ using io::internal::ReadaheadSpooler; namespace json { -class TableReaderImpl : public TableReader { +class TableReaderImpl : public TableReader, + public std::enable_shared_from_this<TableReaderImpl> { public: TableReaderImpl(MemoryPool* pool, std::shared_ptr<io::InputStream> input, const ReadOptions& read_options, const ParseOptions& parse_options, @@ -69,30 +70,37 @@ class TableReaderImpl : public TableReader { return Status::Invalid("Empty JSON file"); } + auto self = shared_from_this(); auto empty = std::make_shared<Buffer>(""); int64_t block_index = 0; - for (std::shared_ptr<Buffer> partial = empty, completion = empty, - starts_with_whole = rh.buffer; - rh.buffer; ++block_index) { - // get completion of partial from previous block - RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, rh.buffer, &completion, - &starts_with_whole)); - - // get all whole objects entirely inside the current buffer - std::shared_ptr<Buffer> whole, next_partial; - RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial)); - - // launch parse task - task_group_->Append([this, partial, completion, whole, block_index] { - return ParseAndInsert(partial, completion, whole, block_index); - }); + std::shared_ptr<Buffer> partial = empty; + + while (rh.buffer) { + std::shared_ptr<Buffer> block, whole, completion, next_partial; + block = rh.buffer; RETURN_NOT_OK(readahead_.Read(&rh)); - if (rh.buffer == nullptr) { - DCHECK_EQ(string_view(*next_partial).find_first_not_of(" \t\n\r"), - string_view::npos); + + if (!rh.buffer) { + // End of file reached => compute completion from penultimate block + RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole)); + } else { + std::shared_ptr<Buffer> starts_with_whole; + // Get completion of partial from previous block. 
+ RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion, + &starts_with_whole)); + + // Get all whole objects entirely inside the current buffer + RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial)); } + + // Launch parse task + task_group_->Append([self, partial, completion, whole, block_index] { + return self->ParseAndInsert(partial, completion, whole, block_index); + }); + block_index++; + partial = next_partial; } @@ -123,13 +131,21 @@ class TableReaderImpl : public TableReader { RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() + whole->size())); - if (completion->size() != 0) { + if (partial->size() != 0 || completion->size() != 0) { std::shared_ptr<Buffer> straddling; - RETURN_NOT_OK(ConcatenateBuffers({partial, completion}, pool_, &straddling)); + if (partial->size() == 0) { + straddling = completion; + } else if (completion->size() == 0) { + straddling = partial; + } else { + RETURN_NOT_OK(ConcatenateBuffers({partial, completion}, pool_, &straddling)); + } RETURN_NOT_OK(parser->Parse(straddling)); } - RETURN_NOT_OK(parser->Parse(whole)); + if (whole->size() != 0) { + RETURN_NOT_OK(parser->Parse(whole)); + } std::shared_ptr<Array> parsed; RETURN_NOT_OK(parser->Finish(&parsed)); @@ -143,7 +159,7 @@ class TableReaderImpl : public TableReader { std::unique_ptr<Chunker> chunker_; std::shared_ptr<TaskGroup> task_group_; ReadaheadSpooler readahead_; - std::unique_ptr<ChunkedArrayBuilder> builder_; + std::shared_ptr<ChunkedArrayBuilder> builder_; }; Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input, @@ -174,7 +190,7 @@ Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json, options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType ? GetPromotionGraph() : nullptr; - std::unique_ptr<ChunkedArrayBuilder> builder; + std::shared_ptr<ChunkedArrayBuilder> builder; RETURN_NOT_OK(MakeChunkedArrayBuilder(internal::TaskGroup::MakeSerial(), default_memory_pool(), promotion_graph, type, &builder)); diff --git a/cpp/src/arrow/json/reader_test.cc b/cpp/src/arrow/json/reader_test.cc index 016f49a..82f2c86 100644 --- a/cpp/src/arrow/json/reader_test.cc +++ b/cpp/src/arrow/json/reader_test.cc @@ -92,6 +92,22 @@ TEST_P(ReaderTest, Empty) { AssertTablesEqual(*expected_table, *table_); } +TEST_P(ReaderTest, EmptyNoNewlineAtEnd) { + SetUpReader("{}\n{}"); + ASSERT_OK(reader_->Read(&table_)); + + auto expected_table = Table::Make(schema({}), ArrayVector(), 2); + AssertTablesEqual(*expected_table, *table_); +} + +TEST_P(ReaderTest, EmptyManyNewlines) { + SetUpReader("{}\n\r\n{}\n\r\n"); + ASSERT_OK(reader_->Read(&table_)); + + auto expected_table = Table::Make(schema({}), ArrayVector(), 2); + AssertTablesEqual(*expected_table, *table_); +} + TEST_P(ReaderTest, Basics) { parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType; auto src = scalars_only_src(); diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index 2225978..e571894 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -16,14 +16,41 @@ # under the License. import io +import itertools +import json +import string import unittest +import numpy as np import pytest import pyarrow as pa from pyarrow.json import read_json, ReadOptions, ParseOptions +def generate_col_names(): + # 'a', 'b'... 'z', then 'aa', 'ab'... 
+ letters = string.ascii_lowercase + for letter in letters: + yield letter + for first in letter: + for second in letter: + yield first + second + + +def make_random_json(num_cols=2, num_rows=10, linesep=u'\r\n'): + arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows)) + col_names = list(itertools.islice(generate_col_names(), num_cols)) + lines = [] + for row in arr.T: + json_obj = {k: int(v) for (k, v) in zip(col_names, row)} + lines.append(json.dumps(json_obj)) + data = linesep.join(lines).encode() + columns = [pa.array(col, type=pa.int64()) for col in arr] + expected = pa.Table.from_arrays(columns, col_names) + return data, expected + + def test_read_options(): cls = ReadOptions opts = cls() @@ -75,6 +102,37 @@ class BaseTestJSONRead: with pytest.raises(TypeError): self.read_json(sio) + def test_block_sizes(self): + rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}' + read_options = ReadOptions() + parse_options = ParseOptions() + + for data in [rows, rows + b'\n']: + for newlines_in_values in [False, True]: + parse_options.newlines_in_values = newlines_in_values + read_options.block_size = 4 + with pytest.raises(ValueError, + match="try to increase block size"): + self.read_bytes(data, read_options=read_options, + parse_options=parse_options) + + # Validate reader behavior with various block sizes. + # There used to be bugs in this area. + for block_size in range(9, 20): + read_options.block_size = block_size + table = self.read_bytes(data, read_options=read_options, + parse_options=parse_options) + assert table.to_pydict() == {'a': [1, 2, 3]} + + def test_no_newline_at_end(self): + rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}' + table = self.read_bytes(rows) + assert table.to_pydict() == { + 'a': [1, 4], + 'b': [2, 5], + 'c': [3, 6], + } + def test_simple_ints(self): # Infer integer columns rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}\n' @@ -126,6 +184,31 @@ class BaseTestJSONRead: 'e': [None, True, False], } + def test_small_random_json(self): + data, expected = make_random_json(num_cols=2, num_rows=10) + table = self.read_bytes(data) + assert table.schema == expected.schema + assert table.equals(expected) + assert table.to_pydict() == expected.to_pydict() + + def test_stress_block_sizes(self): + # Test a number of small block sizes to stress block stitching + data_base, expected = make_random_json(num_cols=2, num_rows=100) + read_options = ReadOptions() + parse_options = ParseOptions() + + for data in [data_base, data_base.rstrip(b'\r\n')]: + for newlines_in_values in [False, True]: + parse_options.newlines_in_values = newlines_in_values + for block_size in [22, 23, 37]: + read_options.block_size = block_size + table = self.read_bytes(data, read_options=read_options, + parse_options=parse_options) + assert table.schema == expected.schema + if not table.equals(expected): + # Better error output + assert table.to_pydict() == expected.to_pydict() + class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):
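
As a minimal usage sketch of the behavior the new Python tests exercise: reading newline-delimited JSON whose final object has no trailing newline, with a block size small enough that the input spans several blocks (the data and the block size of 16 fall within the range covered by test_block_sizes above). pa.BufferReader is used here only as one way to wrap the bytes as an input file.

    import pyarrow as pa
    from pyarrow.json import read_json, ReadOptions, ParseOptions

    # Three objects; note there is no newline after the last one.
    data = b'{"a": 1}\n{"a": 2}\n{"a": 3}'

    read_options = ReadOptions()
    read_options.block_size = 16  # small block size forces the reader to stitch blocks
    parse_options = ParseOptions()
    parse_options.newlines_in_values = False

    table = read_json(pa.BufferReader(data),
                      read_options=read_options,
                      parse_options=parse_options)
    assert table.to_pydict() == {'a': [1, 2, 3]}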