westonpace commented on a change in pull request #11616:
URL: https://github.com/apache/arrow/pull/11616#discussion_r785223323
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1088,10 +1163,33 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
return Status::OK();
}
+ Future<std::shared_ptr<RecordBatch>> ReadRecordBatchAsync(int i) {
+ DCHECK_GE(i, 0);
+ DCHECK_LT(i, num_record_batches());
+
+ auto cached_metadata = cached_metadata_.find(i);
+ if (cached_metadata != cached_metadata_.end()) {
+ return ReadCachedRecordBatch(i, cached_metadata->second);
+ }
+
+ return Status::Invalid(
+ "Asynchronous record batch reading is only supported after a call to "
+ "PreBufferMetadata or PreBufferBatches");
+ }
+
Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) override {
DCHECK_GE(i, 0);
DCHECK_LT(i, num_record_batches());
+ auto cached_metadata = cached_metadata_.find(i);
+ if (cached_metadata != cached_metadata_.end()) {
+ return ReadCachedRecordBatch(i, cached_metadata->second).result();
+ }
+
+ // FIXME: What if they have prebuffered metadata and so the dictionary read has
+ // started but this batch wasn't prebuffered and so the dictionaries haven't been
+ // finished getting read yet.
Review comment:
I suppose, though it's probably more me being paranoid than anything. I added a
test that exposed the failure and made the "wait for dictionaries to finish
reading" step more robust.
I'll point out that neither the old mechanism nor this one is thread safe (I
haven't given much thought to multiple threads sharing the same reader, but I
think it would be a bad idea).
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1240,6 +1382,14 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
return std::move(message);
}
+ Result<std::unique_ptr<Message>> ReadCachedMessageFromBlock(const FileBlock& block) {
+ ++stats_.num_messages;
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
+ metadata_cache_->Read({block.offset, block.metadata_length}));
+ std::shared_ptr<Buffer> data;
+ return arrow::ipc::ReadMessageFromCached(std::move(metadata), std::move(data));
+ }
+
Review comment:
Inlined.
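The helper that was inlined fetched a message's metadata from a prefetch cache using the block's `(offset, metadata_length)` byte range. Below is a minimal sketch of such a range lookup, assuming a simple map from start offset to prefetched bytes; `ToyRangeCache` and `ToyReadRange` are hypothetical and are not Arrow's cache classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical byte range, mirroring the {offset, length} pair passed to Read.
struct ToyReadRange {
  int64_t offset;
  int64_t length;
};

// Hypothetical prefetch cache: NOT Arrow's implementation, just a sketch of
// serving a requested range out of bytes that were already fetched.
class ToyRangeCache {
 public:
  // Records prefetched bytes covering [offset, offset + data.size()).
  void Insert(int64_t offset, std::string data) {
    entries_[offset] = std::move(data);
  }

  // Returns the requested bytes if they lie entirely inside one cached entry.
  std::string Read(ToyReadRange range) const {
    assert(range.length >= 0);
    // Find the last entry that starts at or before range.offset.
    auto it = entries_.upper_bound(range.offset);
    if (it == entries_.begin()) throw std::out_of_range("range not cached");
    --it;
    const int64_t start = it->first;
    const std::string& data = it->second;
    if (range.offset + range.length > start + static_cast<int64_t>(data.size())) {
      throw std::out_of_range("range not cached");
    }
    return data.substr(static_cast<std::size_t>(range.offset - start),
                       static_cast<std::size_t>(range.length));
  }

 private:
  std::map<int64_t, std::string> entries_;
};
```

Keying by start offset lets a single prefetched span serve several smaller reads (for example, a message's metadata followed by its body), which is the point of prefetching whole blocks up front.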
--
This is an automated message from the Apache Git Service.