lidavidm commented on a change in pull request #10145:
URL: https://github.com/apache/arrow/pull/10145#discussion_r621443904



##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length == 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + entry.range.length) {
+          include = true;
+          ranges.pop_back();

Review comment:
       You are expected to give the ranges up front at the granularity you
expect to read them, so no. In principle it could be supported, and we'd have
to support it if we wanted to _split_ large ranges to take advantage of I/O
parallelism.
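
       For illustration, a minimal sketch of that usage pattern. The
constructor arguments and `CacheOptions::Defaults()` are assumptions here and
may differ across Arrow versions:

```cpp
#include <memory>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Declare, up front, exactly the ranges we will later read. The cache may
// coalesce them internally, but each Read() must fall within a single cached
// entry; a read spanning two entries fails.
arrow::Status PrefetchAndRead(std::shared_ptr<arrow::io::RandomAccessFile> file) {
  arrow::io::ReadRangeCache cache(file, arrow::io::IOContext(),
                                  arrow::io::CacheOptions::Defaults());
  std::vector<arrow::io::ReadRange> ranges = {{0, 1024}, {4096, 512}};
  ARROW_RETURN_NOT_OK(cache.Cache(ranges));
  for (const auto& range : ranges) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> buf, cache.Read(range));
    // ... consume buf ...
  }
  return arrow::Status::OK();
}
```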

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length == 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + entry.range.length) {
+          include = true;
+          ranges.pop_back();
+        } else {
+          break;
+        }
+      }
+      if (include) futures.emplace_back(MaybeRead(&entry));
+      if (ranges.empty()) break;
+    }
+    if (!ranges.empty()) {
+      return Status::Invalid("Given ranges were not previously requested for caching");
+    }
+    return AllComplete(futures);
+  }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  // Protect against concurrent modification of entries[i]->future
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+    if (!entry->future.is_valid()) {
+      entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
+    }
+    return entry->future;
+  }
+
+  RangeCacheEntry Cache(const ReadRange& range) override {
+    return {range, Future<std::shared_ptr<Buffer>>()};
+  }
+
+  Status Cache(std::vector<ReadRange> ranges) override {
+    std::unique_lock<std::mutex> guard(entry_mutex);
+    return ReadRangeCache::Impl::Cache(std::move(ranges));

Review comment:
       Hmm. I feel that if ReadAsync completes synchronously, that's because
it's also very fast (e.g. an in-memory copy), in which case holding the lock
is not a concern. I'll document the usage pattern and both variants.
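
       For reference, a sketch (simplified, not the exact PR code) of the
lazy variant's locking pattern under discussion; holding `entry_mutex` across
`ReadAsync` is acceptable precisely when a synchronous `ReadAsync` is also
fast:

```cpp
// The first caller issues the read and populates the future; concurrent
// callers block briefly on the mutex and then share the same future. A
// ReadAsync that completes synchronously is assumed to be fast (e.g. an
// in-memory copy), so holding the lock across it is fine.
Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
  std::unique_lock<std::mutex> guard(entry_mutex);
  if (!entry->future.is_valid()) {
    entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
  }
  return entry->future;
}
```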

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length == 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + entry.range.length) {
+          include = true;
+          ranges.pop_back();
+        } else {
+          break;
+        }
+      }
+      if (include) futures.emplace_back(MaybeRead(&entry));
+      if (ranges.empty()) break;
+    }
+    if (!ranges.empty()) {
+      return Status::Invalid("Given ranges were not previously requested for caching");
+    }
+    return AllComplete(futures);
+  }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  // Protect against concurrent modification of entries[i]->future
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+    if (!entry->future.is_valid()) {
+      entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
+    }
+    return entry->future;
+  }
+
+  RangeCacheEntry Cache(const ReadRange& range) override {
+    return {range, Future<std::shared_ptr<Buffer>>()};

Review comment:
       It's a little unclear, my bad - what happens is that the user calls
`Cache(vector<Range>)`, which coalesces the ranges and then calls
`Cache(Range)` for each coalesced range to make a cache entry. I'll rename
the functions and clarify inline.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
