benibus commented on code in PR #14355:
URL: https://github.com/apache/arrow/pull/14355#discussion_r1036428792


##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; 
}
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = 
UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == 
UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), 
parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return 
parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return 
promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size 
= nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + 
block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, 
block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... 
args) {
+    return [self = 
std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return 
(*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> 
next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;

Review Comment:
   Ah... yep, you're right - just an oversight on my part. In that case, the 
check would be useful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to