lidavidm commented on a change in pull request #10145:
URL: https://github.com/apache/arrow/pull/10145#discussion_r621453153



##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + 
range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, 
range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > 
b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + 
entry.range.length) {
+          include = true;
+          ranges.pop_back();
+        } else {
+          break;
+        }
+      }
+      if (include) futures.emplace_back(MaybeRead(&entry));
+      if (ranges.empty()) break;
+    }
+    if (!ranges.empty()) {
+      return Status::Invalid("Given ranges were not previously requested for 
caching");
+    }
+    return AllComplete(futures);
+  }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're 
requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  // Protect against concurrent modification of entries[i]->future
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+    if (!entry->future.is_valid()) {
+      entry->future = file->ReadAsync(ctx, entry->range.offset, 
entry->range.length);
+    }
+    return entry->future;
+  }
+
+  RangeCacheEntry Cache(const ReadRange& range) override {
+    return {range, Future<std::shared_ptr<Buffer>>()};
+  }
+
+  Status Cache(std::vector<ReadRange> ranges) override {
+    std::unique_lock<std::mutex> guard(entry_mutex);
+    return ReadRangeCache::Impl::Cache(std::move(ranges));

Review comment:
     Or, put another way: when `lazy == true`, this 'passes through' the 
synchronicity of `ReadAsync` (an oxymoron if there ever was one) — which is 
the intent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to