pitrou commented on a change in pull request #9656: URL: https://github.com/apache/arrow/pull/9656#discussion_r590494650
########## File path: cpp/src/arrow/ipc/reader.cc ########## @@ -958,10 +959,196 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open( // ---------------------------------------------------------------------- // Reader implementation +// Common functions used in both the random-access file reader and the +// asynchronous generator static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } +Result<std::unique_ptr<Message>> ReadMessageFromBlock(const FileBlock& block, + io::RandomAccessFile* file) { + if (!BitUtil::IsMultipleOf8(block.offset) || + !BitUtil::IsMultipleOf8(block.metadata_length) || + !BitUtil::IsMultipleOf8(block.body_length)) { + return Status::Invalid("Unaligned block in IPC file"); + } + + // TODO(wesm): this breaks integration tests, see ARROW-3256 + // DCHECK_EQ((*out)->body_length(), block.body_length); + + ARROW_ASSIGN_OR_RAISE(auto message, + ReadMessage(block.offset, block.metadata_length, file)); + return std::move(message); +} + +Status ReadOneDictionary(Message* message, const IpcReadContext& context) { + CHECK_HAS_BODY(*message); + ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + DictionaryKind kind; + RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get())); + if (kind != DictionaryKind::New) { + return Status::Invalid( + "Unsupported dictionary replacement or " + "dictionary delta in IPC file"); + } + return Status::OK(); +} + +/// Common state used by the IPC message generator and record batch generator. +struct ARROW_EXPORT IpcFileRecordBatchGeneratorState { Review comment: This looks a bit tedious. You're essentially copying most of those fields from `RandomAccessFile`? ########## File path: cpp/src/arrow/ipc/reader.cc ########## @@ -958,10 +959,196 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open( // ---------------------------------------------------------------------- // Reader implementation +// Common functions used in both the random-access file reader and the +// asynchronous generator static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } +Result<std::unique_ptr<Message>> ReadMessageFromBlock(const FileBlock& block, Review comment: Can you make those functions static or put them in the anonymous namespace? ########## File path: cpp/src/arrow/ipc/reader.cc ########## @@ -1022,6 +1209,31 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { ReadStats stats() const override { return stats_; } + Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator( + int readahead_messages, const io::IOContext& io_context) override { + auto state = std::make_shared<IpcFileRecordBatchGeneratorState>(); + state->num_dictionaries_ = num_dictionaries(); + state->num_record_batches_ = num_record_batches(); + state->file_ = file_; + state->options_ = options_; + state->owned_file_ = owned_file_; + state->footer_buffer_ = footer_buffer_; + state->footer_ = footer_; + // Must regenerate uncopyable DictionaryMemo Review comment: Would be nice to avoid this. ########## File path: cpp/src/arrow/ipc/reader.cc ########## @@ -1022,6 +1209,31 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { ReadStats stats() const override { return stats_; } + Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator( + int readahead_messages, const io::IOContext& io_context) override { + auto state = std::make_shared<IpcFileRecordBatchGeneratorState>(); + state->num_dictionaries_ = num_dictionaries(); + state->num_record_batches_ = num_record_batches(); + state->file_ = file_; + state->options_ = options_; + state->owned_file_ = owned_file_; + state->footer_buffer_ = footer_buffer_; + state->footer_ = footer_; + // Must regenerate uncopyable DictionaryMemo + RETURN_NOT_OK(UnpackSchemaMessage(state->footer_->schema(), state->options_, + &state->dictionary_memo_, &state->schema_, + &state->out_schema_, &state->field_inclusion_mask_, + &state->swap_endian_)); + AsyncGenerator<std::shared_ptr<Message>> message_generator = + IpcMessageGenerator(state, io_context); + if (readahead_messages > 0) { + message_generator = + MakeReadaheadGenerator(std::move(message_generator), readahead_messages); + } + return IpcFileRecordBatchGenerator(state, message_generator, + arrow::internal::GetCpuThreadPool()); Review comment: I don't think it's desirable to force all processing to go to the global thread pool unconditionally. (also, are you sure the processing is heavy enough that it benefits from it?) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org