pitrou commented on a change in pull request #9656:
URL: https://github.com/apache/arrow/pull/9656#discussion_r590494650



##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -958,10 +959,196 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
 // ----------------------------------------------------------------------
 // Reader implementation
 
+// Common functions used in both the random-access file reader and the
+// asynchronous generator
 static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
   return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
 }
 
+Result<std::unique_ptr<Message>> ReadMessageFromBlock(const FileBlock& block,
+                                                      io::RandomAccessFile* file) {
+  if (!BitUtil::IsMultipleOf8(block.offset) ||
+      !BitUtil::IsMultipleOf8(block.metadata_length) ||
+      !BitUtil::IsMultipleOf8(block.body_length)) {
+    return Status::Invalid("Unaligned block in IPC file");
+  }
+
+  // TODO(wesm): this breaks integration tests, see ARROW-3256
+  // DCHECK_EQ((*out)->body_length(), block.body_length);
+
+  ARROW_ASSIGN_OR_RAISE(auto message,
+                        ReadMessage(block.offset, block.metadata_length, file));
+  return std::move(message);
+}
+
+Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+  CHECK_HAS_BODY(*message);
+  ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
+  DictionaryKind kind;
+  RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get()));
+  if (kind != DictionaryKind::New) {
+    return Status::Invalid(
+        "Unsupported dictionary replacement or "
+        "dictionary delta in IPC file");
+  }
+  return Status::OK();
+}
+
+/// Common state used by the IPC message generator and record batch generator.
+struct ARROW_EXPORT IpcFileRecordBatchGeneratorState {

Review comment:
       This looks a bit tedious. You're essentially copying most of those fields from `RandomAccessFile`?

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -958,10 +959,196 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
 // ----------------------------------------------------------------------
 // Reader implementation
 
+// Common functions used in both the random-access file reader and the
+// asynchronous generator
 static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
   return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
 }
 
+Result<std::unique_ptr<Message>> ReadMessageFromBlock(const FileBlock& block,

Review comment:
       Can you make those functions static or put them in the anonymous namespace?

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1022,6 +1209,31 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
 
   ReadStats stats() const override { return stats_; }
 
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
+      int readahead_messages, const io::IOContext& io_context) override {
+    auto state = std::make_shared<IpcFileRecordBatchGeneratorState>();
+    state->num_dictionaries_ = num_dictionaries();
+    state->num_record_batches_ = num_record_batches();
+    state->file_ = file_;
+    state->options_ = options_;
+    state->owned_file_ = owned_file_;
+    state->footer_buffer_ = footer_buffer_;
+    state->footer_ = footer_;
+    // Must regenerate uncopyable DictionaryMemo

Review comment:
       Would be nice to avoid this.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1022,6 +1209,31 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
 
   ReadStats stats() const override { return stats_; }
 
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
+      int readahead_messages, const io::IOContext& io_context) override {
+    auto state = std::make_shared<IpcFileRecordBatchGeneratorState>();
+    state->num_dictionaries_ = num_dictionaries();
+    state->num_record_batches_ = num_record_batches();
+    state->file_ = file_;
+    state->options_ = options_;
+    state->owned_file_ = owned_file_;
+    state->footer_buffer_ = footer_buffer_;
+    state->footer_ = footer_;
+    // Must regenerate uncopyable DictionaryMemo
+    RETURN_NOT_OK(UnpackSchemaMessage(state->footer_->schema(), state->options_,
+                                      &state->dictionary_memo_, &state->schema_,
+                                      &state->out_schema_, &state->field_inclusion_mask_,
+                                      &state->swap_endian_));
+    AsyncGenerator<std::shared_ptr<Message>> message_generator =
+        IpcMessageGenerator(state, io_context);
+    if (readahead_messages > 0) {
+      message_generator =
+          MakeReadaheadGenerator(std::move(message_generator), readahead_messages);
+    }
+    return IpcFileRecordBatchGenerator(state, message_generator,
+                                       arrow::internal::GetCpuThreadPool());

Review comment:
       I don't think it's desirable to force all processing to go to the global thread pool unconditionally.
   (Also, are you sure the processing is heavy enough that it benefits from it?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to