benibus commented on code in PR #14355: URL: https://github.com/apache/arrow/pull/14355#discussion_r1036428792
########## cpp/src/arrow/json/reader.cc: ########## @@ -42,132 +42,435 @@ namespace arrow { using std::string_view; using internal::checked_cast; +using internal::Executor; using internal::GetCpuThreadPool; using internal::TaskGroup; using internal::ThreadPool; namespace json { +namespace { + +struct ChunkedBlock { + std::shared_ptr<Buffer> partial; + std::shared_ptr<Buffer> completion; + std::shared_ptr<Buffer> whole; + int64_t index = -1; +}; + +struct DecodedBlock { + std::shared_ptr<RecordBatch> record_batch; + int64_t num_bytes = 0; +}; + +} // namespace +} // namespace json + +template <> +struct IterationTraits<json::ChunkedBlock> { + static json::ChunkedBlock End() { return json::ChunkedBlock{}; } + static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; } +}; + +template <> +struct IterationTraits<json::DecodedBlock> { + static json::DecodedBlock End() { return json::DecodedBlock{}; } + static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; } +}; + +namespace json { +namespace { + +// Holds related parameters for parsing and type conversion +class DecodeContext { + public: + explicit DecodeContext(MemoryPool* pool) + : DecodeContext(ParseOptions::Defaults(), pool) {} + explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(), + MemoryPool* pool = default_memory_pool()) + : pool_(pool) { + SetParseOptions(std::move(options)); + } + + void SetParseOptions(ParseOptions options) { + parse_options_ = std::move(options); + if (parse_options_.explicit_schema) { + conversion_type_ = struct_(parse_options_.explicit_schema->fields()); + } else { + parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType; + conversion_type_ = struct_({}); + } + promotion_graph_ = + parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType + ? GetPromotionGraph() + : nullptr; + } + + void SetSchema(std::shared_ptr<Schema> explicit_schema, + UnexpectedFieldBehavior unexpected_field_behavior) { + parse_options_.explicit_schema = std::move(explicit_schema); + parse_options_.unexpected_field_behavior = unexpected_field_behavior; + SetParseOptions(std::move(parse_options_)); + } + void SetSchema(std::shared_ptr<Schema> explicit_schema) { + SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior); + } + // Set the schema but ensure unexpected fields won't be accepted + void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) { + auto unexpected_field_behavior = parse_options_.unexpected_field_behavior; + if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) { + unexpected_field_behavior = UnexpectedFieldBehavior::Ignore; + } + SetSchema(std::move(explicit_schema), unexpected_field_behavior); + } + + [[nodiscard]] MemoryPool* pool() const { return pool_; } + [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; } + [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; } + [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const { + return conversion_type_; + } + + private: + ParseOptions parse_options_; + std::shared_ptr<DataType> conversion_type_; + const PromotionGraph* promotion_graph_; + MemoryPool* pool_; +}; + +Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block, + const ParseOptions& parse_options, + MemoryPool* pool, int64_t* out_size = nullptr) { + std::unique_ptr<BlockParser> parser; + RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser)); + + int64_t size = block.partial->size() + block.completion->size() + block.whole->size(); + RETURN_NOT_OK(parser->ReserveScalarStorage(size)); + + if (block.partial->size() || block.completion->size()) { + std::shared_ptr<Buffer> straddling; + if (!block.completion->size()) { + straddling = block.partial; + } else if (!block.partial->size()) { + straddling = block.completion; + } else { + ARROW_ASSIGN_OR_RAISE(straddling, + ConcatenateBuffers({block.partial, block.completion}, pool)); + } + RETURN_NOT_OK(parser->Parse(straddling)); + } + if (block.whole->size()) { + RETURN_NOT_OK(parser->Parse(block.whole)); + } + + std::shared_ptr<Array> parsed; + RETURN_NOT_OK(parser->Finish(&parsed)); + + if (out_size) *out_size = size; + + return parsed; +} + +class ChunkingTransformer { + public: + explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker) + : chunker_(std::move(chunker)) {} + + template <typename... Args> + static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) { + return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)]( + std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); }; + } + + private: + Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) { + if (!buffer_) { + if (ARROW_PREDICT_TRUE(!next_buffer)) { + partial_ = nullptr; Review Comment: Ah... yep, you're right - just an oversight on my part. In that case, the check would be useful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org