lidavidm commented on a change in pull request #10145:
URL: https://github.com/apache/arrow/pull/10145#discussion_r622145095
##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +177,117 @@ struct ReadRangeCache::Impl {
} else {
entries = std::move(new_entries);
}
+ // Prefetch immediately, regardless of executor availability, if possible
+ return file->WillNeed(ranges);
+ }
+
+ // Read the given range from the cache, blocking if needed. Cannot read a range
+ // that spans cache entries.
+ virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+ if (range.length == 0) {
+ static const uint8_t byte = 0;
+ return std::make_shared<Buffer>(&byte, 0);
+ }
+
+ const auto it = std::lower_bound(
+ entries.begin(), entries.end(), range,
+ [](const RangeCacheEntry& entry, const ReadRange& range) {
+ return entry.range.offset + entry.range.length < range.offset + range.length;
+ });
+ if (it != entries.end() && it->range.Contains(range)) {
+ auto fut = MaybeRead(&*it);
+ ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+ return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+ }
+ return Status::Invalid("ReadRangeCache did not find matching cache entry");
+ }
+
+ virtual Future<> Wait() {
+ std::vector<Future<>> futures;
+ for (auto& entry : entries) {
+ futures.emplace_back(MaybeRead(&entry));
+ }
+ return AllComplete(futures);
+ }
+
+ // Return a Future that completes when the given ranges have been read.
+ virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+ auto end = std::remove_if(ranges.begin(), ranges.end(),
+ [](const ReadRange& range) { return range.length == 0; });
+ ranges.resize(end - ranges.begin());
+ // Sort in reverse position order
+ std::sort(ranges.begin(), ranges.end(),
+ [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
+
+ std::vector<Future<>> futures;
+ for (auto& entry : entries) {
Review comment:
Thanks, I've changed the implementation. This is definitely better: it avoids a
sort, and it costs ~O(#ranges * log(#entries)) instead of ~O(#entries). Since
#ranges is likely << #entries, that is effectively ~O(log(#entries)).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]