pitrou commented on a change in pull request #9613:
URL: https://github.com/apache/arrow/pull/9613#discussion_r587361262
##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -193,6 +195,36 @@ Result<std::shared_ptr<Buffer>>
ReadRangeCache::Read(ReadRange range) {
return Status::Invalid("ReadRangeCache did not find matching cache entry");
}
+Future<> ReadRangeCache::Wait() {
+ struct State {
+ explicit State(std::vector<Future<std::shared_ptr<Buffer>>> f)
+ : futures(std::move(f)), remaining(futures.size()) {}
+ std::vector<Future<std::shared_ptr<Buffer>>> futures;
+ std::atomic<size_t> remaining;
+ };
+
+ std::vector<Future<std::shared_ptr<Buffer>>> futures;
+ for (const auto& entry : impl_->entries) {
+ futures.push_back(entry.future);
+ }
+ auto out = Future<>::Make();
+ auto state = std::make_shared<State>(std::move(futures));
+ for (const auto& future : state->futures) {
+ future.AddCallback([state, out](const Result<std::shared_ptr<Buffer>>&)
mutable {
+ if (state->remaining.fetch_sub(1) != 1) return;
Review comment:
Ideally this primitive would be globally available, instead of
reimplemented here. So you could write for example:
```c++
return Future::Gather(std::move(futures));
```
Also there may be different kinds of "gather". By default you probably want
to first error to immediately mark the resulting future errored.
cc @westonpace
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]