westonpace commented on a change in pull request #11616:
URL: https://github.com/apache/arrow/pull/11616#discussion_r785223323



##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1088,10 +1163,33 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     return Status::OK();
   }
 
+  Future<std::shared_ptr<RecordBatch>> ReadRecordBatchAsync(int i) {
+    DCHECK_GE(i, 0);
+    DCHECK_LT(i, num_record_batches());
+
+    auto cached_metadata = cached_metadata_.find(i);
+    if (cached_metadata != cached_metadata_.end()) {
+      return ReadCachedRecordBatch(i, cached_metadata->second);
+    }
+
+    return Status::Invalid(
+        "Asynchronous record batch reading is only supported after a call to "
+        "PreBufferMetadata or PreBufferBatches");
+  }
+
   Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) override {
     DCHECK_GE(i, 0);
     DCHECK_LT(i, num_record_batches());
 
+    auto cached_metadata = cached_metadata_.find(i);
+    if (cached_metadata != cached_metadata_.end()) {
+      return ReadCachedRecordBatch(i, cached_metadata->second).result();
+    }
+
+    // FIXME: What if they have prebuffered metadata and so the dictionary read has
+    // started but this batch wasn't prebuffered and so the dictionaries haven't been
+    // finished getting read yet.

Review comment:
   I suppose. It's probably more me being paranoid than anything. I added a test that exposed the failure and made the "wait for dictionaries to finish reading" step more robust.

   I'll point out that neither the old mechanism nor this one is thread-safe. (I haven't given much thought to multiple threads using the same reader, but I think it would be a bad idea.)
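
   For readers following along, here is a rough sketch of what a "wait for dictionaries to finish reading" step can look like. It is not the code from this PR: `SketchReader`, `EnsureDictionariesRead`, and the string "dictionary" are hypothetical stand-ins, and it uses plain `std::shared_future` rather than `arrow::Future`. The idea is only that the synchronous read path blocks on any dictionary read that pre-buffering already started, and loads the dictionaries inline if none was started. Like the reader discussed above, the sketch assumes a single calling thread.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the file reader; none of these names are Arrow APIs.
class SketchReader {
 public:
  explicit SketchReader(std::vector<std::string> batches)
      : batches_(std::move(batches)) {}

  // Kicks off a (simulated) background dictionary read, roughly what a
  // metadata pre-buffering call might do.
  void PreBufferMetadata() {
    dictionaries_done_ = std::async(std::launch::async, [this] {
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
      dictionary_ = "dict";
    }).share();
  }

  // Synchronous read: never decodes a batch before the dictionaries are ready.
  std::string ReadBatch(int i) {
    assert(i >= 0 && i < static_cast<int>(batches_.size()));
    EnsureDictionariesRead();
    return dictionary_ + ":" + batches_[i];
  }

 private:
  void EnsureDictionariesRead() {
    if (dictionaries_done_.valid()) {
      // A read was started earlier (in flight or finished): block until done.
      dictionaries_done_.wait();
      return;
    }
    // No read was ever started: load inline and record that it completed.
    dictionary_ = "dict";
    std::promise<void> done;
    done.set_value();
    dictionaries_done_ = done.get_future().share();
  }

  std::vector<std::string> batches_;
  std::string dictionary_;
  // Declared last so it is destroyed first; for a std::async future this
  // blocks until the background task has stopped touching dictionary_.
  std::shared_future<void> dictionaries_done_;
};
```

   Calling `ReadBatch` right after `PreBufferMetadata` hits the blocking branch, which is the case the FIXME was worried about. Calling it on a fresh reader takes the inline branch instead.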

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1240,6 +1382,14 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     return std::move(message);
   }
 
+  Result<std::unique_ptr<Message>> ReadCachedMessageFromBlock(const FileBlock& block) {
+    ++stats_.num_messages;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
+                          metadata_cache_->Read({block.offset, block.metadata_length}));
+    std::shared_ptr<Buffer> data;
+    return arrow::ipc::ReadMessageFromCached(std::move(metadata), std::move(data));
+  }
+

Review comment:
       Inlined.




