wgtmac commented on code in PR #2048:
URL: https://github.com/apache/orc/pull/2048#discussion_r1850471092


##########
c++/src/io/Cache.cc:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cassert>
+
+#include "Cache.hh"
+
+namespace orc {
+
+  std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> 
ranges) const {
+    if (ranges.empty()) {
+      return ranges;
+    }
+
+    // Remove zero-sized ranges
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    // Sort in position order
+    std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) {
+      return a.offset != b.offset ? a.offset < b.offset : a.length > b.length;
+    });
+
+    // Remove ranges that overlap 100%
+    std::vector<ReadRange> uniqueRanges;
+    uniqueRanges.reserve(ranges.size());
+    for (auto it = ranges.begin(); it != end; ++it) {
+      if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) {
+        uniqueRanges.push_back(*it);
+      }
+    }
+    ranges = std::move(uniqueRanges);
+
+    // Skip further processing if ranges is empty after removing zero-sized 
ranges.
+    if (ranges.empty()) {
+      return ranges;
+    }
+
+#ifndef NDEBUG
+    for (size_t i = 0; i < ranges.size() - 1; ++i) {
+      const auto& left = ranges[i];
+      const auto& right = ranges[i + 1];
+      assert(left.offset < right.offset);
+      assert(!left.contains(right));
+    }
+#endif
+
+    std::vector<ReadRange> coalesced;
+
+    auto itr = ranges.begin();
+    // Ensure ranges is not empty.
+    assert(itr <= ranges.end());
+
+    // Start of the current coalesced range and end (exclusive) of previous 
range.
+    // Both are initialized with the start of first range which is a 
placeholder value.
+    uint64_t coalescedStart = itr->offset;
+    uint64_t coalescedEnd = coalescedStart + itr->length;
+
+    for (++itr; itr < ranges.end(); ++itr) {
+      const uint64_t currentRangeStart = itr->offset;
+      const uint64_t currentRangeEnd = currentRangeStart + itr->length;
+
+      assert(coalescedStart < coalescedEnd);
+      assert(currentRangeStart < currentRangeEnd);
+
+      // At this point, the coalesced range is [coalesced_start, 
prev_range_end).
+      // Stop coalescing if:
+      //   - coalesced range is too large, or
+      //   - distance (hole/gap) between consecutive ranges is too large.
+      if ((currentRangeEnd - coalescedStart > rangeSizeLimit) ||
+          (currentRangeStart > coalescedEnd + holeSizeLimit)) {
+        coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart});
+        coalescedStart = currentRangeStart;
+      }
+
+      // Update the prev_range_end with the current range.
+      coalescedEnd = currentRangeEnd;
+    }
+    coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart});
+
+    assert(coalesced.front().offset == ranges.front().offset);
+    assert(coalesced.back().offset + coalesced.back().length ==
+           ranges.back().offset + ranges.back().length);
+    return coalesced;
+  }
+
+  std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, 
uint64_t holeSizeLimit,
+                                            uint64_t rangeSizeLimit) {
+    assert(rangeSizeLimit > holeSizeLimit);
+
+    ReadRangeCombiner combiner{holeSizeLimit, rangeSizeLimit};
+    return combiner.coalesce(std::move(ranges));
+  }
+
+  void ReadRangeCache::cache(std::vector<ReadRange> ranges) {
+    ranges = coalesceReadRanges(std::move(ranges), options_.holeSizeLimit, 
options_.rangeSizeLimit);
+
+    std::vector<RangeCacheEntry> newEntries = makeCacheEntries(ranges);
+    // Add new entries, themselves ordered by offset
+    if (entries_.size() > 0) {
+      std::vector<RangeCacheEntry> merged(entries_.size() + newEntries.size());

Review Comment:
   Isn't it initialized with `entries_.size() + newEntries.size()` empty 
entries? I think we need to call `merged.reserve(...)` instead.



##########
c++/include/orc/Reader.hh:
##########
@@ -39,6 +39,8 @@ namespace orc {
   // classes that hold data members so we can maintain binary compatibility
   struct ReaderOptionsPrivate;
   struct RowReaderOptionsPrivate;
+  struct CacheOptions;

Review Comment:
   `CacheOptions` is defined in src/Cache.hh, which is not installed so that 
downstream users do not have access to it. I'd suggest to choose one of these 
options:
   1. Move the definition of CacheOptions to this file.
   2. Directly adding two parameters to ReaderOptions and totally remove 
CacheOptions.
   
   Personally I'd prefer the 2nd option.



##########
c++/include/orc/Reader.hh:
##########
@@ -39,6 +39,8 @@ namespace orc {
   // classes that hold data members so we can maintain binary compatibility
   struct ReaderOptionsPrivate;
   struct RowReaderOptionsPrivate;
+  struct CacheOptions;
+  class InputStream;

Review Comment:
   Please remove this line.



##########
c++/include/orc/OrcFile.hh:
##########
@@ -58,10 +62,38 @@ namespace orc {
      */
     virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;
 
+    /**
+     * Read data asynchronously into the buffer. The buffer is allocated by 
the caller.
+     * @param buf the buffer to read into
+     * @param length the number of bytes to read.
+     * @param offset the position in the stream to read from.
+     * @return a future that will be set to the number of bytes read when the 
read is complete.
+     */
+    virtual std::future<void> readAsync(void* buf, uint64_t length, uint64_t 
offset) {
+      return std::async(std::launch::async,
+                        [this, buf, length, offset] { this->read(buf, length, 
offset); });
+    }
+
     /**
      * Get the name of the stream for error messages.
      */
     virtual const std::string& getName() const = 0;
+
+   protected:

Review Comment:
   `protected` is indeed visible to downstream users. BTW, it looks weird that 
we need to pass a memory pool to a read function. Therefore I think 
`readAsyncInternal` is ill-designed. Together with my comment to your comment 
of the benchmark, please remove this function and stick to `readAsync`.



##########
c++/src/io/Cache.hh:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <orc/MemoryPool.hh>
+#include <orc/OrcFile.hh>
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <future>
+#include <utility>
+#include <vector>
+
+namespace orc {
+  class InputStream;
+
+  struct CacheOptions {
+    /// The maximum distance in bytes between two consecutive
+    /// ranges; beyond this value, ranges are not combined
+    uint64_t holeSizeLimit = 8192;
+
+    /// The maximum size in bytes of a combined range; if
+    /// combining two consecutive ranges would produce a range of a
+    /// size greater than this, they are not combined
+    uint64_t rangeSizeLimit = 32 * 1024 * 1024;
+  };
+
+  struct ReadRange {
+    uint64_t offset;
+    uint64_t length;
+
+    ReadRange() = default;
+    ReadRange(uint64_t offset, uint64_t length) : offset(offset), 
length(length) {}
+
+    friend bool operator==(const ReadRange& left, const ReadRange& right) {
+      return (left.offset == right.offset && left.length == right.length);
+    }
+    friend bool operator!=(const ReadRange& left, const ReadRange& right) {
+      return !(left == right);
+    }
+
+    bool contains(const ReadRange& other) const {
+      return (offset <= other.offset && offset + length >= other.offset + 
other.length);
+    }
+  };
+
+  struct ReadRangeCombiner {
+    const uint64_t holeSizeLimit;
+    const uint64_t rangeSizeLimit;
+
+    std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const;
+  };
+
+  std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, 
uint64_t holeSizeLimit,
+                                            uint64_t rangeSizeLimit);
+  struct RangeCacheEntry {
+    using BufferPtr = InputStream::BufferPtr;
+
+    ReadRange range;
+
+    // The result may be get multiple times, so we use shared_future instead 
of std::future
+    std::shared_future<BufferPtr> future;
+
+    RangeCacheEntry() = default;
+    RangeCacheEntry(const ReadRange& range, std::future<BufferPtr> future)
+        : range(range), future(std::move(future).share()) {}
+
+    friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& 
right) {
+      return left.range.offset < right.range.offset;
+    }
+  };
+
+  /// A read cache designed to hide IO latencies when reading.
+  class ReadRangeCache {

Review Comment:
   nit: add some cache related metrics to `ReaderMetrics` class.



##########
c++/include/orc/OrcFile.hh:
##########
@@ -58,6 +73,17 @@ namespace orc {
      */
     virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;
 
+    /**
+     * Read data asynchronously.
+     * @param offset the position in the stream to read from.
+     * @param length the number of bytes to read.
+     * @return a future that will be set to the buffer when the read is 
complete.
+     */
+    virtual std::future<BufferPtr> readAsync(uint64_t /*offset*/, uint64_t 
/*length*/,

Review Comment:
   TBH, I don't think this penalty is unacceptable. In practice, the number of 
issued I/Os should not be too many after being coalesced. When the data is in a 
hot cache (e.g. SSD or memory), this feature may not improve too much and may 
get worse when predicate pushdown is effective (due to reading more chunks than 
needed). It is super helpful when data is in the cloud object store, in which 
case this penalty is trivial.



##########
c++/src/io/Cache.cc:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cassert>
+
+#include "Cache.hh"
+
+namespace orc {
+
+  std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> 
ranges) const {
+    if (ranges.empty()) {
+      return ranges;
+    }
+
+    // Remove zero-sized ranges
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    // Sort in position order
+    std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) {
+      return a.offset != b.offset ? a.offset < b.offset : a.length > b.length;
+    });
+
+    // Remove ranges that overlap 100%
+    std::vector<ReadRange> uniqueRanges;
+    uniqueRanges.reserve(ranges.size());
+    for (auto it = ranges.begin(); it != end; ++it) {
+      if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) {
+        uniqueRanges.push_back(*it);
+      }
+    }
+    ranges = std::move(uniqueRanges);
+
+    // Skip further processing if ranges is empty after removing zero-sized 
ranges.
+    if (ranges.empty()) {
+      return ranges;
+    }
+
+#ifndef NDEBUG
+    for (size_t i = 0; i < ranges.size() - 1; ++i) {
+      const auto& left = ranges[i];
+      const auto& right = ranges[i + 1];
+      assert(left.offset < right.offset);
+      assert(!left.contains(right));
+    }
+#endif
+
+    std::vector<ReadRange> coalesced;
+
+    auto itr = ranges.begin();
+    // Ensure ranges is not empty.
+    assert(itr <= ranges.end());
+
+    // Start of the current coalesced range and end (exclusive) of previous 
range.
+    // Both are initialized with the start of first range which is a 
placeholder value.
+    uint64_t coalescedStart = itr->offset;
+    uint64_t coalescedEnd = coalescedStart + itr->length;
+
+    for (++itr; itr < ranges.end(); ++itr) {
+      const uint64_t currentRangeStart = itr->offset;
+      const uint64_t currentRangeEnd = currentRangeStart + itr->length;
+
+      assert(coalescedStart < coalescedEnd);
+      assert(currentRangeStart < currentRangeEnd);
+
+      // At this point, the coalesced range is [coalesced_start, 
prev_range_end).
+      // Stop coalescing if:
+      //   - coalesced range is too large, or
+      //   - distance (hole/gap) between consecutive ranges is too large.
+      if ((currentRangeEnd - coalescedStart > rangeSizeLimit) ||
+          (currentRangeStart > coalescedEnd + holeSizeLimit)) {
+        coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart});
+        coalescedStart = currentRangeStart;
+      }
+
+      // Update the prev_range_end with the current range.
+      coalescedEnd = currentRangeEnd;
+    }
+    coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart});
+
+    assert(coalesced.front().offset == ranges.front().offset);
+    assert(coalesced.back().offset + coalesced.back().length ==
+           ranges.back().offset + ranges.back().length);
+    return coalesced;
+  }
+
+  std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, 
uint64_t holeSizeLimit,
+                                            uint64_t rangeSizeLimit) {
+    assert(rangeSizeLimit > holeSizeLimit);
+
+    ReadRangeCombiner combiner{holeSizeLimit, rangeSizeLimit};

Review Comment:
   It seems that we don't need the ReadRangeCombiner class.



##########
c++/src/Reader.hh:
##########
@@ -352,10 +367,6 @@ namespace orc {
       return contents_->blockSize;
     }
 
-    const proto::Footer* getFooter() const {

Review Comment:
   Please do not delete this. It is not a public header.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to