This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new d7d6924de ORC-262: [C++] Support async I/O prefetch
d7d6924de is described below

commit d7d6924de13fcbf938d953935b81949593969f12
Author: taiyang-li <[email protected]>
AuthorDate: Mon Dec 2 15:25:43 2024 -0800

    ORC-262: [C++] Support async I/O prefetch
    
    ### What changes were proposed in this pull request?
    Support async I/O prefetch for the ORC C++ library. Closes https://issues.apache.org/jira/browse/ORC-262
    
    Changes:
    - Added new interface `InputStream::readAsync`, which reads the specified range asynchronously. The default implementation runs `InputStream::read` on a separate thread via `std::async`.
    - Added the I/O cache implementation `ReadRangeCache` to cache async I/O results. It borrows from the similar design of the Parquet reader in Apache Arrow (https://github.com/apache/arrow).
    - Added interface `Reader::preBuffer` to trigger I/O prefetch. `ReaderImpl::preBuffer` computes the I/O ranges of the selected stripes and columns, sorts and merges them, and calls `ReadRangeCache::cache` to start asynchronous reads in the background; the row reader later consumes the cached data.
    - Added the interface `Reader::releaseBuffer`, which releases all cached I/O ranges that end at or before a given offset.
    
    ### Why are the changes needed?
    Async I/O prefetch can hide I/O latency while reading ORC files, which improves the performance of scan operators in ClickHouse.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #2048 from taiyang-li/upstream_orc_prefetch.
    
    Lead-authored-by: taiyang-li <[email protected]>
    Co-authored-by: 李扬 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 c++/include/orc/OrcFile.hh    |  13 +++
 c++/include/orc/Reader.hh     |  40 +++++++
 c++/src/CMakeLists.txt        |   1 +
 c++/src/Options.hh            |  12 +++
 c++/src/Reader.cc             |  88 +++++++++++++++
 c++/src/Reader.hh             |  16 +++
 c++/src/StripeStream.cc       |  25 ++++-
 c++/src/StripeStream.hh       |   2 +
 c++/src/io/Cache.cc           | 171 +++++++++++++++++++++++++++++
 c++/src/io/Cache.hh           | 122 +++++++++++++++++++++
 c++/test/CMakeLists.txt       |   1 +
 c++/test/MemoryInputStream.hh |   7 +-
 c++/test/TestCache.cc         | 142 ++++++++++++++++++++++++
 c++/test/TestReader.cc        | 246 +++++++++++++++++++++++++++++++++++-------
 14 files changed, 843 insertions(+), 43 deletions(-)

diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index a9ad692d4..ea71567c5 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -19,6 +19,7 @@
 #ifndef ORC_FILE_HH
 #define ORC_FILE_HH
 
+#include <future>
 #include <string>
 
 #include "orc/Reader.hh"
@@ -58,6 +59,18 @@ namespace orc {
      */
     virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;
 
+    /**
+     * Read data asynchronously into the buffer. The buffer is allocated by the caller.
+     *
+     * The default implementation runs read() on a new thread via
+     * std::async(std::launch::async, ...). Both this stream and buf must stay
+     * valid until the returned future is ready.
+     *
+     * @param buf the buffer to read into
+     * @param length the number of bytes to read.
+     * @param offset the position in the stream to read from.
+     * @return a future that will be set when the read is complete; an exception
+     *         thrown by read() is rethrown from the future's get().
+     */
+    virtual std::future<void> readAsync(void* buf, uint64_t length, uint64_t offset) {
+      return std::async(std::launch::async, &InputStream::read, this, buf, length, offset);
+    }
+
     /**
      * Get the name of the stream for error messages.
      */
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 4ddce64ad..b015b6491 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -40,6 +40,17 @@ namespace orc {
   struct ReaderOptionsPrivate;
   struct RowReaderOptionsPrivate;
 
+  /**
+   * Options controlling how ranges requested through Reader::preBuffer are
+   * combined into I/O requests.
+   */
+  struct CacheOptions {
+    /// Maximum gap in bytes between two consecutive ranges that may still be
+    /// merged into one read; ranges further apart are read separately.
+    uint64_t holeSizeLimit{8192};
+
+    /// Maximum size in bytes of a merged range; two consecutive ranges are not
+    /// merged if the result would be larger than this.
+    uint64_t rangeSizeLimit{32 * 1024 * 1024};
+  };
+
   /**
    * Expose the reader metrics including the latency and
    * number of calls of the decompression/decoding/IO modules.
@@ -59,6 +70,8 @@ namespace orc {
     std::atomic<uint64_t> IOBlockingLatencyUs{0};
     std::atomic<uint64_t> SelectedRowGroupCount{0};
     std::atomic<uint64_t> EvaluatedRowGroupCount{0};
+    std::atomic<uint64_t> ReadRangeCacheHits{0};
+    std::atomic<uint64_t> ReadRangeCacheMisses{0};
   };
   ReaderMetrics* getDefaultReaderMetrics();
 
@@ -116,6 +129,11 @@ namespace orc {
      */
     ReaderOptions& setReaderMetrics(ReaderMetrics* metrics);
 
+    /**
+     * Set the options that control how ranges requested through
+     * Reader::preBuffer are combined into I/O requests.
+     * @param cacheOptions the options to copy into this ReaderOptions
+     * @return a reference to this object, for chaining
+     */
+    ReaderOptions& setCacheOptions(const CacheOptions& cacheOptions);
+
     /**
      * Set the location of the tail as defined by the logical length of the
      * file.
@@ -147,6 +165,11 @@ namespace orc {
      * Get the reader metrics.
      */
     ReaderMetrics* getReaderMetrics() const;
+
+    /**
+     * Get the cache options used to combine ranges requested through
+     * Reader::preBuffer.
+     */
+    const CacheOptions& getCacheOptions() const;
   };
 
   /**
@@ -624,6 +647,23 @@ namespace orc {
      */
     virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
         uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 
0;
+
+    /**
+     * Trigger IO prefetch and cache the prefetched contents asynchronously.
+     * It is thread safe. Users should make sure requested stripes and columns
+     * are not overlapped; otherwise the overlapping part will be prefetched
+     * multiple times, which doesn't affect correctness but wastes IO and
+     * memory resources.
+     * (ReaderImpl skips stripe indexes and type ids that are out of range.)
+     * @param stripes the stripes to prefetch
+     * @param includeTypes the types to prefetch
+     */
+    virtual void preBuffer(const std::vector<uint32_t>& stripes,
+                           const std::list<uint64_t>& includeTypes) = 0;
+
+    /**
+     * Release cached entries whose right boundary is less than or equal to the given boundary.
+     * NOTE(review): StripeStreamsImpl wraps cached buffers in a SeekableArrayInputStream
+     * through a raw pointer; confirm that releasing ranges a RowReader is still reading
+     * cannot leave that stream dangling.
+     * @param boundary the boundary value to release cache entries
+     */
+    virtual void releaseBuffer(uint64_t boundary) = 0;
   };
 
   /**
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 5b7840301..694667c06 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -150,6 +150,7 @@ set(SOURCE_FILES
   orc_proto.pb.h
   io/InputStream.cc
   io/OutputStream.cc
+  io/Cache.cc
   sargs/ExpressionTree.cc
   sargs/Literal.cc
   sargs/PredicateLeaf.cc
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index daf9d52e1..0a4bd56d8 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -23,6 +23,8 @@
 #include "orc/OrcFile.hh"
 #include "orc/Reader.hh"
 
+#include "io/Cache.hh"
+
 #include <limits>
 
 namespace orc {
@@ -43,6 +45,7 @@ namespace orc {
     MemoryPool* memoryPool;
     std::string serializedTail;
     ReaderMetrics* metrics;
+    CacheOptions cacheOptions;
 
     ReaderOptionsPrivate() {
       tailLocation = std::numeric_limits<uint64_t>::max();
@@ -122,6 +125,15 @@ namespace orc {
     return privateBits_->errorStream;
   }
 
+  // Stores a copy of the given cache options; returns *this for chaining.
+  ReaderOptions& ReaderOptions::setCacheOptions(const CacheOptions& cacheOptions) {
+    auto& bits = *privateBits_;
+    bits.cacheOptions = cacheOptions;
+    return *this;
+  }
+
+  // Returns a reference to the cache options stored in this object.
+  const CacheOptions& ReaderOptions::getCacheOptions() const {
+    const auto& bits = *privateBits_;
+    return bits.cacheOptions;
+  }
+
   /**
    * RowReaderOptions Implementation
    */
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 034ea04ee..c93c62f6c 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1523,6 +1523,94 @@ namespace orc {
     return ret;
   }
 
+  // Drops cached ranges that end at or before `boundary`. Does nothing when
+  // preBuffer has never created a read cache.
+  void ReaderImpl::releaseBuffer(uint64_t boundary) {
+    std::lock_guard<std::mutex> guard(contents_->readCacheMutex);
+    if (auto& cache = contents_->readCache) {
+      cache->evictEntriesBefore(boundary);
+    }
+  }
+
+  /**
+   * Compute the byte ranges of the selected streams in the requested stripes
+   * and hand them, stripe by stripe, to the shared ReadRangeCache, which starts
+   * reading them asynchronously. Stripe indexes and type ids that do not exist
+   * in this file are ignored.
+   */
+  void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
+                             const std::list<uint64_t>& includeTypes) {
+    std::vector<uint32_t> newStripes;
+    for (auto stripe : stripes) {
+      if (stripe < static_cast<uint32_t>(footer_->stripes_size())) newStripes.push_back(stripe);
+    }
+
+    std::list<uint64_t> newIncludeTypes;
+    for (auto type : includeTypes) {
+      if (type < static_cast<uint64_t>(footer_->types_size())) newIncludeTypes.push_back(type);
+    }
+
+    if (newStripes.empty() || newIncludeTypes.empty()) {
+      return;
+    }
+
+    // Resolve the requested types into a per-column selection mask.
+    orc::RowReaderOptions rowReaderOptions;
+    rowReaderOptions.includeTypes(newIncludeTypes);
+    ColumnSelector columnSelector(contents_.get());
+    std::vector<bool> selectedColumns;
+    columnSelector.updateSelected(selectedColumns, rowReaderOptions);
+
+    for (auto stripe : newStripes) {
+      // A fresh vector per stripe: the ranges are moved into the cache at the
+      // end of each iteration, so a moved-from vector must never be reused.
+      std::vector<ReadRange> ranges;
+      ranges.reserve(newIncludeTypes.size());
+
+      // get stripe information
+      const auto& stripeInfo = footer_->stripes(stripe);
+      uint64_t stripeFooterStart =
+          stripeInfo.offset() + stripeInfo.index_length() + stripeInfo.data_length();
+      uint64_t stripeFooterLength = stripeInfo.footer_length();
+
+      // get stripe footer
+      std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
+          contents_->compression,
+          std::make_unique<SeekableFileInputStream>(contents_->stream.get(), stripeFooterStart,
+                                                    stripeFooterLength, *contents_->pool),
+          contents_->blockSize, *contents_->pool, contents_->readerMetrics);
+      proto::StripeFooter stripeFooter;
+      if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) {
+        throw ParseError(std::string("bad StripeFooter from ") + pbStream->getName());
+      }
+
+      // Walk the streams in footer order starting at the stripe offset, and keep
+      // the DATA/DICTIONARY_DATA/PRESENT/LENGTH/SECONDARY streams of selected columns.
+      uint64_t offset = stripeInfo.offset();
+      for (int i = 0; i < stripeFooter.streams_size(); i++) {
+        const proto::Stream& stream = stripeFooter.streams(i);
+        if (offset + stream.length() > stripeFooterStart) {
+          std::stringstream msg;
+          msg << "Malformed stream meta at stream index " << i << " in stripe " << stripe
+              << ": streamOffset=" << offset << ", streamLength=" << stream.length()
+              << ", stripeOffset=" << stripeInfo.offset()
+              << ", stripeIndexLength=" << stripeInfo.index_length()
+              << ", stripeDataLength=" << stripeInfo.data_length();
+          throw ParseError(msg.str());
+        }
+
+        // Bounds-check the mask lookup: a malformed footer may reference a
+        // column id outside the selection vector.
+        const bool columnSelected =
+            stream.column() < selectedColumns.size() && selectedColumns[stream.column()];
+        if (stream.has_kind() && columnSelected) {
+          const auto& kind = stream.kind();
+          if (kind == proto::Stream_Kind_DATA || kind == proto::Stream_Kind_DICTIONARY_DATA ||
+              kind == proto::Stream_Kind_PRESENT || kind == proto::Stream_Kind_LENGTH ||
+              kind == proto::Stream_Kind_SECONDARY) {
+            ranges.emplace_back(offset, stream.length());
+          }
+        }
+
+        offset += stream.length();
+      }
+
+      {
+        std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+
+        if (!contents_->readCache) {
+          contents_->readCache = std::make_shared<ReadRangeCache>(
+              getStream(), options_.getCacheOptions(), contents_->pool, contents_->readerMetrics);
+        }
+        contents_->readCache->cache(std::move(ranges));
+      }
+    }
+  }
+
   RowReader::~RowReader() {
     // PASS
   }
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 89606c331..39ca73967 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -26,6 +26,8 @@
 
 #include "ColumnReader.hh"
 #include "RLE.hh"
+#include "io/Cache.hh"
+
 #include "SchemaEvolution.hh"
 #include "TypeImpl.hh"
 #include "sargs/SargsApplier.hh"
@@ -70,6 +72,11 @@ namespace orc {
     bool isDecimalAsLong;
     std::unique_ptr<proto::Metadata> metadata;
     ReaderMetrics* readerMetrics;
+
+    // mutex to protect readCache_ from concurrent access
+    std::mutex readCacheMutex;
+    // cached io ranges. only valid when preBuffer is invoked.
+    std::shared_ptr<ReadRangeCache> readCache;
   };
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -245,6 +252,10 @@ namespace orc {
     const SchemaEvolution* getSchemaEvolution() const {
       return &schemaEvolution_;
     }
+
+    // Returns the read cache created by ReaderImpl::preBuffer, or null if none.
+    // The shared_ptr is copied under readCacheMutex because preBuffer may assign
+    // it concurrently. This guards only the pointer; the ReadRangeCache object
+    // itself is not synchronized here.
+    std::shared_ptr<ReadRangeCache> getReadCache() const {
+      std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+      return contents_->readCache;
+    }
   };
 
   class ReaderImpl : public Reader {
@@ -260,6 +271,7 @@ namespace orc {
     // footer
     proto::Footer* footer_;
     uint64_t numberOfStripes_;
+
     uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);
 
     // internal methods
@@ -375,6 +387,10 @@ namespace orc {
     std::map<uint32_t, BloomFilterIndex> getBloomFilters(
         uint32_t stripeIndex, const std::set<uint32_t>& included) const 
override;
 
+    // Starts asynchronous reads of the selected streams of the given stripes
+    // into the shared read cache.
+    void preBuffer(const std::vector<uint32_t>& stripes,
+                   const std::list<uint64_t>& includeTypes) override;
+    // Evicts cached ranges that end at or before `boundary`.
+    void releaseBuffer(uint64_t boundary) override;
+
     std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
         uint32_t stripeIndex, const std::set<uint32_t>& included) const 
override;
   };
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index f4345c087..a5609f762 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -19,6 +19,7 @@
 #include "StripeStream.hh"
 #include "RLE.hh"
 #include "Reader.hh"
+#include "io/Cache.hh"
 #include "orc/Exceptions.hh"
 
 #include "wrap/coded-stream-wrapper.h"
@@ -37,7 +38,8 @@ namespace orc {
         stripeStart_(stripeStart),
         input_(input),
         writerTimezone_(writerTimezone),
-        readerTimezone_(readerTimezone) {
+        readerTimezone_(readerTimezone),
+        readCache_(reader.getReadCache()) {
     // PASS
   }
 
@@ -89,7 +91,6 @@ namespace orc {
       if (stream.has_kind() && stream.kind() == kind &&
           stream.column() == static_cast<uint64_t>(columnId)) {
         uint64_t streamLength = stream.length();
-        uint64_t myBlock = shouldStream ? input_.getNaturalReadSize() : 
streamLength;
         if (offset + streamLength > dataEnd) {
           std::stringstream msg;
           msg << "Malformed stream meta at stream index " << i << " in stripe 
" << stripeIndex_
@@ -99,9 +100,23 @@ namespace orc {
               << ", stripeDataLength=" << stripeInfo_.data_length();
           throw ParseError(msg.str());
         }
-        return createDecompressor(reader_.getCompression(),
-                                  std::make_unique<SeekableFileInputStream>(
-                                      &input_, offset, stream.length(), *pool, 
myBlock),
+
+        BufferSlice slice;
+        if (readCache_) {
+          ReadRange range{offset, streamLength};
+          slice = readCache_->read(range);
+        }
+
+        uint64_t myBlock = shouldStream ? input_.getNaturalReadSize() : 
streamLength;
+        std::unique_ptr<SeekableInputStream> seekableInput;
+        if (slice.buffer) {
+          seekableInput = std::make_unique<SeekableArrayInputStream>(
+              slice.buffer->data() + slice.offset, slice.length);
+        } else {
+          seekableInput = std::make_unique<SeekableFileInputStream>(&input_, 
offset, streamLength,
+                                                                    *pool, 
myBlock);
+        }
+        return createDecompressor(reader_.getCompression(), 
std::move(seekableInput),
                                   reader_.getCompressionSize(), *pool,
                                   reader_.getFileContents().readerMetrics);
       }
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index ad82d472c..2d26f8575 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -30,6 +30,7 @@
 namespace orc {
 
   class RowReaderImpl;
+  class ReadRangeCache;
 
   /**
    * StripeStream Implementation
@@ -45,6 +46,7 @@ namespace orc {
     InputStream& input_;
     const Timezone& writerTimezone_;
     const Timezone& readerTimezone_;
+    std::shared_ptr<ReadRangeCache> readCache_;
 
    public:
     StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
diff --git a/c++/src/io/Cache.cc b/c++/src/io/Cache.cc
new file mode 100644
index 000000000..39f63fdd2
--- /dev/null
+++ b/c++/src/io/Cache.cc
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cassert>
+
+#include "Cache.hh"
+
+namespace orc {
+
+  std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> ranges) const {
+    // Zero-length ranges never need reading.
+    ranges.erase(std::remove_if(ranges.begin(), ranges.end(),
+                                [](const ReadRange& r) { return r.length == 0; }),
+                 ranges.end());
+    if (ranges.empty()) {
+      return ranges;
+    }
+
+    // Order by offset; for equal offsets put the longer range first so that it
+    // absorbs the shorter ones in the next step.
+    std::sort(ranges.begin(), ranges.end(), [](const ReadRange& lhs, const ReadRange& rhs) {
+      if (lhs.offset != rhs.offset) {
+        return lhs.offset < rhs.offset;
+      }
+      return lhs.length > rhs.length;
+    });
+
+    // Drop every range fully contained in the most recently kept one.
+    std::vector<ReadRange> kept;
+    kept.reserve(ranges.size());
+    for (const ReadRange& candidate : ranges) {
+      if (kept.empty() || !kept.back().contains(candidate)) {
+        kept.push_back(candidate);
+      }
+    }
+
+#ifndef NDEBUG
+    for (size_t idx = 1; idx < kept.size(); ++idx) {
+      assert(kept[idx - 1].offset < kept[idx].offset);
+      assert(!kept[idx - 1].contains(kept[idx]));
+    }
+#endif
+
+    // Sweep left to right, growing the current run [runStart, runEnd) with each
+    // range. A new run starts when the grown run would exceed rangeSizeLimit or
+    // the gap before the next range exceeds holeSizeLimit.
+    std::vector<ReadRange> coalesced;
+    uint64_t runStart = kept.front().offset;
+    uint64_t runEnd = runStart + kept.front().length;
+    for (size_t idx = 1; idx < kept.size(); ++idx) {
+      const uint64_t nextStart = kept[idx].offset;
+      const uint64_t nextEnd = nextStart + kept[idx].length;
+      assert(runStart < runEnd);
+      assert(nextStart < nextEnd);
+
+      const bool runTooLarge = nextEnd - runStart > rangeSizeLimit;
+      const bool holeTooLarge = nextStart > runEnd + holeSizeLimit;
+      if (runTooLarge || holeTooLarge) {
+        coalesced.push_back({runStart, runEnd - runStart});
+        runStart = nextStart;
+      }
+      runEnd = nextEnd;
+    }
+    coalesced.push_back({runStart, runEnd - runStart});
+
+    assert(coalesced.front().offset == kept.front().offset);
+    assert(coalesced.back().offset + coalesced.back().length ==
+           kept.back().offset + kept.back().length);
+    return coalesced;
+  }
+
+  std::vector<ReadRange> ReadRangeCombiner::coalesceReadRanges(std::vector<ReadRange> ranges,
+                                                               uint64_t holeSizeLimit,
+                                                               uint64_t rangeSizeLimit) {
+    // Debug-only sanity check on the limits.
+    assert(rangeSizeLimit > holeSizeLimit);
+    return ReadRangeCombiner{holeSizeLimit, rangeSizeLimit}.coalesce(std::move(ranges));
+  }
+
+  /// Coalesce `ranges`, start an asynchronous read for every resulting range,
+  /// and add the new entries to entries_ while keeping entries_ ordered by offset.
+  void ReadRangeCache::cache(std::vector<ReadRange> ranges) {
+    ranges = ReadRangeCombiner::coalesceReadRanges(std::move(ranges), options_.holeSizeLimit,
+                                                   options_.rangeSizeLimit);
+
+    std::vector<RangeCacheEntry> newEntries = makeCacheEntries(ranges);
+    if (entries_.empty()) {
+      entries_ = std::move(newEntries);
+      return;
+    }
+
+    // Both sequences are ordered by offset (coalesce sorts the ranges). Append by
+    // move and merge in place (stable, like std::merge) rather than
+    // default-constructing a temporary and copying every shared_ptr/shared_future.
+    const auto oldSize =
+        static_cast<std::vector<RangeCacheEntry>::difference_type>(entries_.size());
+    entries_.reserve(entries_.size() + newEntries.size());
+    for (auto& entry : newEntries) {
+      entries_.push_back(std::move(entry));
+    }
+    std::inplace_merge(entries_.begin(), entries_.begin() + oldSize, entries_.end());
+  }
+
+  BufferSlice ReadRangeCache::read(const ReadRange& range) {
+    if (range.length == 0) {
+      return {std::make_shared<Buffer>(*memoryPool_, 0), 0, 0};
+    }
+
+    // Binary search for the first entry that ends at or after the end of the
+    // requested range.
+    const auto it = std::lower_bound(entries_.begin(), entries_.end(), range,
+                                     [](const RangeCacheEntry& entry, const ReadRange& target) {
+                                       return entry.range.offset + entry.range.length <
+                                              target.offset + target.length;
+                                     });
+
+    BufferSlice result{};
+    bool hitCache = false;
+    // Calling get() on an invalid future is undefined behaviour. An entry whose
+    // future is not valid (e.g. an InputStream::readAsync override returned a
+    // default-constructed future) is treated as a miss, so the caller falls back
+    // to reading from the stream.
+    if (it != entries_.end() && it->range.contains(range) && it->future.valid()) {
+      // Waits for the asynchronous read; rethrows any exception it stored.
+      it->future.get();
+      result = BufferSlice{it->buffer, range.offset - it->range.offset, range.length};
+      hitCache = true;
+    }
+
+    if (metrics_) {
+      if (hitCache) {
+        metrics_->ReadRangeCacheHits.fetch_add(1);
+      } else {
+        metrics_->ReadRangeCacheMisses.fetch_add(1);
+      }
+    }
+    return result;
+  }
+
+  void ReadRangeCache::evictEntriesBefore(uint64_t boundary) {
+    // entries_ is ordered, so the entries ending at or before `boundary` form a
+    // prefix; find where that prefix stops and erase it.
+    const auto firstKept = std::partition_point(
+        entries_.begin(), entries_.end(), [boundary](const RangeCacheEntry& entry) {
+          return entry.range.offset + entry.range.length <= boundary;
+        });
+    entries_.erase(entries_.begin(), firstKept);
+  }
+
+  std::vector<RangeCacheEntry> ReadRangeCache::makeCacheEntries(
+      const std::vector<ReadRange>& ranges) const {
+    // Allocate one buffer per range and start filling it in the background via
+    // stream_->readAsync; each entry keeps both the buffer and the future.
+    std::vector<RangeCacheEntry> created;
+    created.reserve(ranges.size());
+    for (size_t idx = 0; idx < ranges.size(); ++idx) {
+      const ReadRange& current = ranges[idx];
+      auto target = std::make_shared<Buffer>(*memoryPool_, current.length);
+      auto pending = stream_->readAsync(target->data(), target->size(), current.offset);
+      created.emplace_back(current, std::move(target), std::move(pending));
+    }
+    return created;
+  }
+
+}  // namespace orc
diff --git a/c++/src/io/Cache.hh b/c++/src/io/Cache.hh
new file mode 100644
index 000000000..7fc79718a
--- /dev/null
+++ b/c++/src/io/Cache.hh
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "orc/MemoryPool.hh"
+#include "orc/OrcFile.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <future>
+#include <utility>
+#include <vector>
+
+namespace orc {
+
+  /// A contiguous byte range [offset, offset + length) of the input stream.
+  struct ReadRange {
+    // Default member initializers make a default-constructed range {0, 0}
+    // instead of leaving both fields indeterminate.
+    uint64_t offset = 0;
+    uint64_t length = 0;
+
+    ReadRange() = default;
+    ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {}
+
+    friend bool operator==(const ReadRange& left, const ReadRange& right) {
+      return (left.offset == right.offset && left.length == right.length);
+    }
+    friend bool operator!=(const ReadRange& left, const ReadRange& right) {
+      return !(left == right);
+    }
+
+    /// True if `other` lies entirely within this range.
+    bool contains(const ReadRange& other) const {
+      return (offset <= other.offset && offset + length >= other.offset + other.length);
+    }
+  };
+
+  /// Coalesces nearby byte ranges into fewer, larger reads.
+  struct ReadRangeCombiner {
+    // Ranges separated by a gap larger than this many bytes are not merged.
+    const uint64_t holeSizeLimit;
+    // Ranges are not merged if the merged range would exceed this many bytes
+    // (a single input range larger than this is still emitted on its own).
+    const uint64_t rangeSizeLimit;
+
+    // Drops empty and fully-contained ranges, sorts the rest by offset, and
+    // merges neighbours subject to holeSizeLimit and rangeSizeLimit.
+    std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const;
+
+    // Convenience wrapper: builds a combiner from the two limits and calls coalesce().
+    static std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges,
+                                                     uint64_t holeSizeLimit,
+                                                     uint64_t rangeSizeLimit);
+  };
+
+  using Buffer = DataBuffer<char>;
+  using BufferPtr = std::shared_ptr<Buffer>;
+
+  /// A cached range: the buffer being filled and the future signalling completion.
+  struct RangeCacheEntry {
+    ReadRange range;
+    BufferPtr buffer;
+    // NOTE: `future` is declared after `buffer`, so it is destroyed first. When the
+    // future comes from std::async (as in InputStream's default readAsync),
+    // releasing the last reference waits for the read to finish, so the buffer is
+    // not freed while still being written. A custom readAsync whose future does
+    // not block on destruction gets no such guarantee. Keep this member order.
+    std::shared_future<void> future;  // use shared_future in case of multiple get calls
+
+    RangeCacheEntry() = default;
+    RangeCacheEntry(const ReadRange& range, BufferPtr buffer, std::future<void> future)
+        : range(range), buffer(std::move(buffer)), future(std::move(future).share()) {}
+
+    // Entries are ordered by starting offset only.
+    friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) {
+      return left.range.offset < right.range.offset;
+    }
+  };
+
+  /// A view of `length` bytes starting `offset` bytes into `buffer`. A null
+  /// buffer means the requested range was not served from the cache.
+  struct BufferSlice {
+    BufferPtr buffer{};
+    uint64_t offset{0};
+    uint64_t length{0};
+  };
+
+  /// A read cache designed to hide IO latencies when reading.
+  ///
+  /// NOTE: this class does no locking of its own; callers sharing an instance
+  /// across threads must serialize cache(), read() and evictEntriesBefore().
+  class ReadRangeCache {
+   public:
+    /// Construct a read cache with given options
+    /// @param stream the stream to read from; must stay valid while reads are pending
+    /// @param options limits used by cache() to coalesce ranges
+    /// @param memoryPool pool used to allocate the cached buffers
+    /// @param metrics optional; receives cache hit/miss counts when non-null
+    explicit ReadRangeCache(InputStream* stream, CacheOptions options, MemoryPool* memoryPool,
+                            ReaderMetrics* metrics = nullptr)
+        : stream_(stream),
+          options_(std::move(options)),
+          memoryPool_(memoryPool),
+          metrics_(metrics) {}
+
+    ~ReadRangeCache() = default;
+
+    /// Cache the given ranges in the background.
+    ///
+    /// The ranges are coalesced according to the CacheOptions, and an
+    /// asynchronous read is started for each coalesced range.
+    /// The caller must ensure that the ranges do not overlap with each other,
+    /// nor with previously cached ranges.  Otherwise, behaviour will be undefined.
+    void cache(std::vector<ReadRange> ranges);
+
+    /// Read a range previously given to cache().
+    ///
+    /// Waits for the corresponding asynchronous read if it is still running.
+    /// Returns a slice with a null buffer if no cached entry contains `range`.
+    BufferSlice read(const ReadRange& range);
+
+    /// Evict cache entries whose range ends at or before the given boundary.
+    void evictEntriesBefore(uint64_t boundary);
+
+   private:
+    std::vector<RangeCacheEntry> makeCacheEntries(const std::vector<ReadRange>& ranges) const;
+
+    InputStream* stream_;
+    CacheOptions options_;
+    // Ordered by offset (so as to find a matching region by binary search)
+    std::vector<RangeCacheEntry> entries_;
+    MemoryPool* memoryPool_;
+    ReaderMetrics* metrics_;
+  };
+
+}  // namespace orc
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index e6a1491d4..6c5b26c4f 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -63,6 +63,7 @@ add_executable (orc-test
   TestTimezone.cc
   TestType.cc
   TestWriter.cc
+  TestCache.cc
   ${SIMD_TEST_SRCS}
 )
 
diff --git a/c++/test/MemoryInputStream.hh b/c++/test/MemoryInputStream.hh
index e6ef55b6d..31333ae43 100644
--- a/c++/test/MemoryInputStream.hh
+++ b/c++/test/MemoryInputStream.hh
@@ -22,8 +22,6 @@
 #include "io/InputStream.hh"
 #include "orc/OrcFile.hh"
 
-#include <iostream>
-
 namespace orc {
   class MemoryInputStream : public InputStream {
    public:
@@ -44,6 +42,11 @@ namespace orc {
       memcpy(buf, buffer_ + offset, length);
     }
 
+    // Runs read() on a separate thread; `buf` and this stream must stay valid
+    // until the returned future is ready.
+    std::future<void> readAsync(void* buf, uint64_t length, uint64_t offset) override {
+      return std::async(std::launch::async, &MemoryInputStream::read, this, buf, length, offset);
+    }
+
     virtual const std::string& getName() const override {
       return name_;
     }
diff --git a/c++/test/TestCache.cc b/c++/test/TestCache.cc
new file mode 100644
index 000000000..496ba3ec9
--- /dev/null
+++ b/c++/test/TestCache.cc
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstring>
+
+#include "MemoryInputStream.hh"
+#include "io/Cache.hh"
+
+#include "wrap/gmock.h"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  TEST(TestReadRangeCombiner, testBasics) {
+    ReadRangeCombiner combinator{0, 100};
+    /// Ranges with partial overlap and identical offsets
+    std::vector<ReadRange> ranges{{0, 15}, {5, 11}, {5, 15}};
+    std::vector<ReadRange> result = combinator.coalesce(std::move(ranges));
+    std::vector<ReadRange> expect{{0, 20}};
+    ASSERT_EQ(result, expect);
+  }
+
+  TEST(TestCoalesceReadRanges, testBasics) {
+    auto check = [](std::vector<ReadRange> ranges, std::vector<ReadRange> 
expected) -> void {
+      const uint64_t holeSizeLimit = 9;
+      const uint64_t rangeSizeLimit = 99;
+      auto coalesced = ReadRangeCombiner::coalesceReadRanges(ranges, 
holeSizeLimit, rangeSizeLimit);
+      ASSERT_EQ(coalesced, expected);
+    };
+
+    check({}, {});
+    // Zero sized range that ends up in empty list
+    check({{110, 0}}, {});
+    // Combination on 1 zero sized range and 1 non-zero sized range
+    check({{110, 10}, {120, 0}}, {{110, 10}});
+    // 1 non-zero sized range
+    check({{110, 10}}, {{110, 10}});
+    // No holes + unordered ranges
+    check({{130, 10}, {110, 10}, {120, 10}}, {{110, 30}});
+    // No holes
+    check({{110, 10}, {120, 10}, {130, 10}}, {{110, 30}});
+    // Small holes only
+    check({{110, 11}, {130, 11}, {150, 11}}, {{110, 51}});
+    // Large holes
+    check({{110, 10}, {130, 10}}, {{110, 10}, {130, 10}});
+    check({{110, 11}, {130, 11}, {150, 10}, {170, 11}, {190, 11}}, {{110, 50}, 
{170, 31}});
+
+    // With zero-sized ranges
+    check({{110, 11}, {130, 0}, {130, 11}, {145, 0}, {150, 11}, {200, 0}}, 
{{110, 51}});
+
+    // No holes but large ranges
+    check({{110, 100}, {210, 100}}, {{110, 100}, {210, 100}});
+    // Small holes and large range in the middle (*)
+    check({{110, 10}, {120, 11}, {140, 100}, {240, 11}, {260, 11}},
+          {{110, 21}, {140, 100}, {240, 31}});
+    // Mid-size ranges that would turn large after coalescing
+    check({{100, 50}, {150, 50}}, {{100, 50}, {150, 50}});
+    check({{100, 30}, {130, 30}, {160, 30}, {190, 30}, {220, 30}}, {{100, 90}, 
{190, 60}});
+
+    // Same as (*) but unsorted
+    check({{140, 100}, {120, 11}, {240, 11}, {110, 10}, {260, 11}},
+          {{110, 21}, {140, 100}, {240, 31}});
+
+    // Completely overlapping ranges should be eliminated
+    check({{20, 5}, {20, 5}, {21, 2}}, {{20, 5}});
+  }
+
+  TEST(TestReadRangeCache, testBasics) {
+    std::string data = "abcdefghijklmnopqrstuvwxyz";
+
+    CacheOptions options;
+    options.holeSizeLimit = 2;
+    options.rangeSizeLimit = 10;
+
+    auto file = std::make_shared<MemoryInputStream>(data.data(), data.size());
+    ReadRangeCache cache(file.get(), options, getDefaultPool());
+
+    cache.cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}});
+    cache.cache({{10, 4}, {14, 0}, {15, 4}});
+
+    auto assert_slice_equal = [](const BufferSlice& slice, const std::string& 
expected) {
+      ASSERT_TRUE(slice.buffer);
+      ASSERT_EQ(expected, std::string_view(slice.buffer->data() + 
slice.offset, slice.length));
+    };
+
+    BufferSlice slice;
+
+    slice = cache.read({20, 2});
+    assert_slice_equal(slice, "uv");
+
+    slice = cache.read({1, 2});
+    assert_slice_equal(slice, "bc");
+
+    slice = cache.read({3, 2});
+    assert_slice_equal(slice, "de");
+
+    slice = cache.read({8, 2});
+    assert_slice_equal(slice, "ij");
+
+    slice = cache.read({10, 4});
+    assert_slice_equal(slice, "klmn");
+
+    slice = cache.read({15, 4});
+    assert_slice_equal(slice, "pqrs");
+
+    // Zero-sized
+    slice = cache.read({14, 0});
+    assert_slice_equal(slice, "");
+    slice = cache.read({25, 0});
+    assert_slice_equal(slice, "");
+
+    // Non-cached ranges
+    ASSERT_FALSE(cache.read({20, 3}).buffer);
+    ASSERT_FALSE(cache.read({19, 3}).buffer);
+    ASSERT_FALSE(cache.read({0, 3}).buffer);
+    ASSERT_FALSE(cache.read({25, 2}).buffer);
+
+    // Release cache entries before 15. After that cache entries would be: {10, 9}, {20, 2}
+    cache.evictEntriesBefore(15);
+    ASSERT_FALSE(cache.read({1, 2}).buffer);
+    ASSERT_FALSE(cache.read({8, 2}).buffer);
+    slice = cache.read({10, 4});
+    assert_slice_equal(slice, "klmn");
+    slice = cache.read({20, 2});
+    assert_slice_equal(slice, "uv");
+  }
+}  // namespace orc
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 33a0481b6..f9df6edc9 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -155,7 +155,10 @@ namespace orc {
     ASSERT_THAT(rowReader->getSelectedColumns(), ElementsAreArray(expected));
   }
 
-  std::unique_ptr<Reader> createNestedListMemReader(MemoryOutputStream& 
memStream) {
+  std::unique_ptr<Reader> createNestedListMemReader(MemoryOutputStream& 
memStream,
+                                                    const 
std::vector<uint32_t>& stripesToPrefetch,
+                                                    const std::list<uint64_t>& 
columnsToPrefetch,
+                                                    bool prefetchTwice) {
     MemoryPool* pool = getDefaultPool();
 
     auto type = std::unique_ptr<Type>(
@@ -218,20 +221,43 @@ namespace orc {
     auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength());
     ReaderOptions readerOptions;
     readerOptions.setMemoryPool(*pool);
-    return createReader(std::move(inStream), readerOptions);
+    auto reader = createReader(std::move(inStream), readerOptions);
+
+    reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+    if (prefetchTwice) {
+      reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+    }
+
+    return reader;
   }
 
-  TEST(TestReadIntent, testListAll) {
+  class TestReadIntentFromNestedList
+      : public ::testing::TestWithParam<
+            std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {};
+
+  TEST_P(TestReadIntentFromNestedList, testListAll) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedListMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of int_array.
     verifySelection(reader, {{1, ReadIntent_ALL}}, {0, 1, 2});
   }
 
-  TEST(TestReadIntent, testListOffsets) {
+  TEST_P(TestReadIntentFromNestedList, testListOffsets) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedListMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select only the offsets of int_array.
     verifySelection(reader, {{1, ReadIntent_OFFSETS}}, {0, 1});
@@ -244,26 +270,44 @@ namespace orc {
     verifySelection(reader, {{3, ReadIntent_OFFSETS}, {5, 
ReadIntent_OFFSETS}}, {0, 3, 4, 5});
   }
 
-  TEST(TestReadIntent, testListAllAndOffsets) {
+  TEST_P(TestReadIntentFromNestedList, testListAllAndOffsets) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedListMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of int_array and only the outermost offsets of 
int_array_array_array.
     verifySelection(reader, {{1, ReadIntent_ALL}, {3, ReadIntent_OFFSETS}}, 
{0, 1, 2, 3});
   }
 
-  TEST(TestReadIntent, testListConflictingIntent) {
+  TEST_P(TestReadIntentFromNestedList, testListConflictingIntent) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedListMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // test conflicting ReadIntent on nested list.
     verifySelection(reader, {{3, ReadIntent_OFFSETS}, {5, ReadIntent_ALL}}, 
{0, 3, 4, 5, 6});
     verifySelection(reader, {{3, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}}, 
{0, 3, 4, 5, 6});
   }
 
-  TEST(TestReadIntent, testRowBatchContent) {
+  TEST_P(TestReadIntentFromNestedList, testRowBatchContent) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedListMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of int_array and only the offsets of int_array_array.
     RowReaderOptions::IdReadIntentMap idReadIntentMap = {{1, ReadIntent_ALL},
@@ -299,7 +343,24 @@ namespace orc {
     EXPECT_EQ(nullptr, intArrayArrayArrayBatch.elements);
   }
 
-  std::unique_ptr<Reader> createNestedMapMemReader(MemoryOutputStream& 
memStream) {
+  INSTANTIATE_TEST_SUITE_P(
+      TestReadIntentFromNestedListInstance, TestReadIntentFromNestedList,
+      ::testing::Values(
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, 
true),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, 
false),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 3}, 
true),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 3}, 
false),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, 
true),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, 
false),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 3}, 
true),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 3}, 
false),
+          std::make_tuple(std::vector<uint32_t>{1000}, 
std::list<uint64_t>{1000}, true),
+          std::make_tuple(std::vector<uint32_t>{1000}, 
std::list<uint64_t>{1000}, false)));
+
+  std::unique_ptr<Reader> createNestedMapMemReader(MemoryOutputStream& 
memStream,
+                                                   const 
std::vector<uint32_t>& stripesToPrefetch,
+                                                   const std::list<uint64_t>& 
columnsToPrefetch,
+                                                   bool prefetchTwice) {
     MemoryPool* pool = getDefaultPool();
 
     auto type = std::unique_ptr<Type>(
@@ -389,20 +450,42 @@ namespace orc {
     auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength());
     ReaderOptions readerOptions;
     readerOptions.setMemoryPool(*pool);
-    return createReader(std::move(inStream), readerOptions);
+    auto reader = createReader(std::move(inStream), readerOptions);
+
+    reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+    if (prefetchTwice) {
+      reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+    }
+    return reader;
   }
 
-  TEST(TestReadIntent, testMapAll) {
+  class TestReadIntentFromNestedMap
+      : public ::testing::TestWithParam<
+            std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {};
+
+  TEST_P(TestReadIntentFromNestedMap, testMapAll) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedMapMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of single_map.
     verifySelection(reader, {{2, ReadIntent_ALL}}, {0, 2, 3, 4});
   }
 
-  TEST(TestReadIntent, testMapOffsets) {
+  TEST_P(TestReadIntentFromNestedMap, testMapOffsets) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedMapMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select only the offsets of single_map.
     verifySelection(reader, {{2, ReadIntent_OFFSETS}}, {0, 2});
@@ -414,17 +497,29 @@ namespace orc {
     verifySelection(reader, {{5, ReadIntent_OFFSETS}, {9, 
ReadIntent_OFFSETS}}, {0, 5, 7, 9});
   }
 
-  TEST(TestReadIntent, testMapAllAndOffsets) {
+  TEST_P(TestReadIntentFromNestedMap, testMapAllAndOffsets) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedMapMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of single_map and only the outermost offsets of nested_map.
     verifySelection(reader, {{2, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}}, 
{0, 2, 3, 4, 5});
   }
 
-  TEST(TestReadIntent, testMapConflictingIntent) {
+  TEST_P(TestReadIntentFromNestedMap, testMapConflictingIntent) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedMapMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // test conflicting ReadIntent on nested_map.
     verifySelection(reader, {{5, ReadIntent_OFFSETS}, {9, ReadIntent_ALL}}, 
{0, 5, 7, 9, 10, 11});
@@ -434,9 +529,15 @@ namespace orc {
                     {0, 5, 7, 8, 9, 10, 11});
   }
 
-  TEST(TestReadIntent, testMapRowBatchContent) {
+  TEST_P(TestReadIntentFromNestedMap, testMapRowBatchContent) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedMapMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of single_map and only the offsets of nested_map.
     RowReaderOptions::IdReadIntentMap idReadIntentMap = {{2, ReadIntent_ALL},
@@ -482,7 +583,24 @@ namespace orc {
     EXPECT_EQ(nullptr, nestedMapBatch.elements);
   }
 
-  std::unique_ptr<Reader> createNestedUnionMemReader(MemoryOutputStream& 
memStream) {
+  INSTANTIATE_TEST_SUITE_P(
+      TestReadIntentFromNestedMapInstance, TestReadIntentFromNestedMap,
+      ::testing::Values(
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, 
true),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, 
false),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 5}, 
true),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 5}, 
false),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, 
true),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, 
false),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 5}, 
true),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 5}, 
false),
+          std::make_tuple(std::vector<uint32_t>{1000}, 
std::list<uint64_t>{1000}, true),
+          std::make_tuple(std::vector<uint32_t>{1000}, 
std::list<uint64_t>{1000}, false)));
+
+  std::unique_ptr<Reader> createNestedUnionMemReader(MemoryOutputStream& 
memStream,
+                                                     const 
std::vector<uint32_t>& stripesToPrefetch,
+                                                     const 
std::list<uint64_t>& columnsToPrefetch,
+                                                     bool prefetchTwice) {
     MemoryPool* pool = getDefaultPool();
 
     auto type = std::unique_ptr<Type>(
@@ -566,20 +684,43 @@ namespace orc {
     ReaderOptions readerOptions;
     readerOptions.setMemoryPool(*pool);
     readerOptions.setReaderMetrics(nullptr);
-    return createReader(std::move(inStream), readerOptions);
+    auto reader = createReader(std::move(inStream), readerOptions);
+
+    reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+    if (prefetchTwice) {
+      reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+    }
+
+    return reader;
   }
 
-  TEST(TestReadIntent, testUnionAll) {
+  class TestReadIntentFromNestedUnion
+      : public ::testing::TestWithParam<
+            std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {};
+
+  TEST_P(TestReadIntentFromNestedUnion, testUnionAll) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedUnionMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of single_union.
     verifySelection(reader, {{2, ReadIntent_ALL}}, {0, 2, 3, 4});
   }
 
-  TEST(TestReadIntent, testUnionOffsets) {
+  TEST_P(TestReadIntentFromNestedUnion, testUnionOffsets) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedUnionMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select only the offsets of single_union.
     verifySelection(reader, {{2, ReadIntent_OFFSETS}}, {0, 2});
@@ -592,17 +733,29 @@ namespace orc {
                     {0, 2, 5, 6, 7, 8, 11});
   }
 
-  TEST(TestReadIntent, testUnionAllAndOffsets) {
+  TEST_P(TestReadIntentFromNestedUnion, testUnionAllAndOffsets) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedUnionMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of single_union and only the outermost offsets of 
nested_union.
     verifySelection(reader, {{2, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}}, 
{0, 2, 3, 4, 5});
   }
 
-  TEST(TestReadIntent, testUnionConflictingIntent) {
+  TEST_P(TestReadIntentFromNestedUnion, testUnionConflictingIntent) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedUnionMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // test conflicting ReadIntent on nested_union.
     verifySelection(reader, {{5, ReadIntent_OFFSETS}, {8, ReadIntent_ALL}},
@@ -613,9 +766,15 @@ namespace orc {
                     {0, 5, 6, 7, 8, 9, 10, 11});
   }
 
-  TEST(TestReadIntent, testUnionRowBatchContent) {
+  TEST_P(TestReadIntentFromNestedUnion, testUnionRowBatchContent) {
+    const auto& params = GetParam();
+    const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+    const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+    bool prefetchTwice = std::get<2>(params);
+
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+    std::unique_ptr<Reader> reader =
+        createNestedUnionMemReader(memStream, stripesToPrefetch, 
columnsToPrefetch, prefetchTwice);
 
     // select all of single_union and only the offsets of nested_union.
     RowReaderOptions::IdReadIntentMap idReadIntentMap = {{2, ReadIntent_ALL},
@@ -665,10 +824,25 @@ namespace orc {
     EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]);
   }
 
+  INSTANTIATE_TEST_SUITE_P(
+      TestReadIntentFromNestedUnionInstance, TestReadIntentFromNestedUnion,
+      ::testing::Values(
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, 
true),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, 
false),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 2}, 
true),
+          std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 2}, 
false),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, 
true),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, 
false),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 2}, 
true),
+          std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 2}, 
false),
+          std::make_tuple(std::vector<uint32_t>{1000}, 
std::list<uint64_t>{1000}, true),
+          std::make_tuple(std::vector<uint32_t>{1000}, 
std::list<uint64_t>{1000}, false)));
+
   TEST(TestReadIntent, testSeekOverEmptyPresentStream) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
     MemoryPool* pool = getDefaultPool();
     uint64_t rowCount = 5000;
+
     {
       auto type = std::unique_ptr<Type>(
           
Type::buildTypeFromString("struct<col1:struct<col2:int>,col3:struct<col4:int>,"

Reply via email to