lidavidm commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587706050
##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -193,6 +195,36 @@ Result<std::shared_ptr<Buffer>>
ReadRangeCache::Read(ReadRange range) {
return Status::Invalid("ReadRangeCache did not find matching cache entry");
}
+Future<> ReadRangeCache::Wait() {
+ struct State {
+ explicit State(std::vector<Future<std::shared_ptr<Buffer>>> f)
+ : futures(std::move(f)), remaining(futures.size()) {}
+ std::vector<Future<std::shared_ptr<Buffer>>> futures;
+ std::atomic<size_t> remaining;
+ };
+
+ std::vector<Future<std::shared_ptr<Buffer>>> futures;
+ for (const auto& entry : impl_->entries) {
+ futures.push_back(entry.future);
+ }
+ auto out = Future<>::Make();
+ auto state = std::make_shared<State>(std::move(futures));
+ for (const auto& future : state->futures) {
+ future.AddCallback([state, out](const Result<std::shared_ptr<Buffer>>&)
mutable {
+ if (state->remaining.fetch_sub(1) != 1) return;
Review comment:
All gives `Future<vector<Result<T>>>` and never marks the future failed;
this gives `Future<>` and marks the future failed if any individual result
failed. Additionally this 'fails fast' in that if any fail, you immediately get
a result, instead of waiting for the rest of the futures.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]