This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new b9035b426 ORC-2000: [C++] Add support to prefetch small stripes
b9035b426 is described below

commit b9035b426070b1a90a6a1b2f02063d34c0eb7353
Author: Gang Wu <[email protected]>
AuthorDate: Thu Sep 18 12:44:46 2025 -0700

    ORC-2000: [C++] Add support to prefetch small stripes
    
    ### What changes were proposed in this pull request?
    
    - Allow the reader to prefetch and coalesce small stripes.
    - Add a reader option to limit the number of stripes to prefetch.
    
    ### Why are the changes needed?
    
    Some ORC files contain many small stripes, which can hurt read performance significantly.
    
    ### How was this patch tested?
    
    Added a test case to verify that the prefetch works and that the look-ahead limit is honored.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #2406 from wgtmac/prefetch_stripe.
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 c++/include/orc/Reader.hh |  10 ++++
 c++/src/Options.hh        |  11 +++++
 c++/src/Reader.cc         |  81 ++++++++++++++++++++++++++-----
 c++/src/Reader.hh         |  10 ++--
 c++/test/TestReader.cc    | 119 ++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 215 insertions(+), 16 deletions(-)

diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 46b3c3983..506f088d6 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -398,6 +398,16 @@ namespace orc {
      * Whether to enable async I/O prefetch of next stripe.
      */
     bool getEnableAsyncPrefetch() const;
+
+    /**
+     * Set how many following stripes may be prefetched together with a small stripe.
+     */
+    RowReaderOptions& setSmallStripeLookAheadLimit(uint64_t numStripes);
+
+    /**
+     * Get how many following stripes may be prefetched together with a small stripe.
+     */
+    uint64_t getSmallStripeLookAheadLimit() const;
   };
 
   class RowReader;
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 2dca5e7f5..b71edcd42 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -155,6 +155,7 @@ namespace orc {
     std::shared_ptr<Type> readType;
     bool throwOnSchemaEvolutionOverflow;
     bool enableAsyncPrefetch;
+    uint64_t smallStripeLookAheadLimit;  // max following stripes prefetched with a small stripe
 
     RowReaderOptionsPrivate() {
       selection = ColumnSelection_NONE;
@@ -167,6 +168,7 @@ namespace orc {
       useTightNumericVector = false;
       throwOnSchemaEvolutionOverflow = false;
       enableAsyncPrefetch = false;
+      smallStripeLookAheadLimit = 8;  // default look-ahead
     }
   };
 
@@ -351,6 +353,15 @@ namespace orc {
     return privateBits_->enableAsyncPrefetch;
   }
 
+  RowReaderOptions& RowReaderOptions::setSmallStripeLookAheadLimit(uint64_t 
numStripes) {
+    privateBits_->smallStripeLookAheadLimit = numStripes;
+    return *this;
+  }
+
+  uint64_t RowReaderOptions::getSmallStripeLookAheadLimit() const {
+    return privateBits_->smallStripeLookAheadLimit;
+  }
+
 }  // namespace orc
 
 #endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index c571fb5d6..1cf6940c8 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -252,6 +252,7 @@ namespace orc {
         throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
         forcedScaleOnHive11Decimal_(opts.getForcedScaleOnHive11Decimal()),
         enableAsyncPrefetch_(opts.getEnableAsyncPrefetch()),
+        smallStripeLookAheadLimit_(opts.getSmallStripeLookAheadLimit()),  // see startNextStripe()
         footer_(contents_->footer.get()),
         firstRowOfStripe_(*contents_->pool, 0),
         enableEncodedBlock_(opts.getEnableLazyDecoding()),
@@ -520,14 +521,38 @@ namespace orc {
   }
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
-                                      const FileContents& contents) {
+                                      FileContents& contents) {
     uint64_t stripeFooterStart = info.offset() + info.index_length() + info.data_length();
     uint64_t stripeFooterLength = info.footer_length();
-    std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
-        contents.compression,
-        std::make_unique<SeekableFileInputStream>(contents.stream.get(), stripeFooterStart,
-                                                  stripeFooterLength, *contents.pool),
-        contents.blockSize, *contents.pool, contents.readerMetrics);
+    // Parse the stripe footer, preferring bytes already prefetched into contents.readCache.
+    BufferSlice cachedData;  // declared first so the cached buffer outlives pbStream
+    std::unique_ptr<SeekableInputStream> pbStream;
+    // Try to read from cache first (readCache is accessed under readCacheMutex)
+    {
+      std::lock_guard<std::mutex> lock(contents.readCacheMutex);
+      if (contents.readCache) {
+        ReadRange footerRange(stripeFooterStart, stripeFooterLength);
+        cachedData = contents.readCache->read(footerRange);
+        if (cachedData.buffer != nullptr) {
+          // The stream is built on a raw pointer into cachedData's buffer, hence its scope.
+          pbStream = createDecompressor(
+              contents.compression,
+              std::make_unique<SeekableArrayInputStream>(
+                  cachedData.buffer->data() + cachedData.offset, cachedData.length),
+              contents.blockSize, *contents.pool, contents.readerMetrics);
+        }
+      }
+    }
+
+    // Fall back to reading from the input stream if not in cache
+    if (!pbStream) {
+      pbStream = createDecompressor(
+          contents.compression,
+          std::make_unique<SeekableFileInputStream>(contents.stream.get(), stripeFooterStart,
+                                                    stripeFooterLength, *contents.pool),
+          contents.blockSize, *contents.pool, contents.readerMetrics);
+    }
+
     proto::StripeFooter result;
     if (!result.ParseFromZeroCopyStream(pbStream.get())) {
       throw ParseError(std::string("bad StripeFooter from ") + pbStream->getName());
@@ -1070,6 +1095,14 @@ namespace orc {
     }
   }
 
+  uint64_t getStripeSize(const proto::StripeInformation& stripeInfo) {
+    return stripeInfo.footer_length() + stripeInfo.data_length() + stripeInfo.index_length();
+  }
+  // A stripe counts as small when its whole byte range (index + data + footer) fits in threshold.
+  bool isSmallStripe(const proto::StripeInformation& stripeInfo, uint64_t threshold) {
+    return threshold >= getStripeSize(stripeInfo);
+  }
+
   void RowReaderImpl::startNextStripe() {
     reader_.reset();  // ColumnReaders use lots of memory; free old memory first
     rowIndexes_.clear();
@@ -1138,13 +1171,37 @@ namespace orc {
 
     if (currentStripe_ < lastStripe_) {
       if (enableAsyncPrefetch_) {
-        // FIXME: this is very coarse since I/O ranges of all selected columns are about to
-        // prefetch. We can further evaluate index stream with knowledge of pruned row groups
-        // to issue less I/O ranges.
-        auto ranges = extractReadRangesForStripe(currentStripe_, currentStripeInfo_,
-                                                 currentStripeFooter_, selectedColumns_);
         contents_->evictCache(currentStripeInfo_.offset());
-        contents_->cacheRanges(std::move(ranges));
+
+        if (fullyCachedStripes_.erase(currentStripe_) > 0) {
+          // Already prefetched by an earlier small-stripe batch; just forget the entry.
+        } else if (isSmallStripe(currentStripeInfo_, contents_->cacheOptions.rangeSizeLimit)) {
+          std::vector<ReadRange> ranges;
+          const uint64_t maxStripe = currentStripe_ + 1 +  // overflow-safe even for a huge limit
+              std::min(smallStripeLookAheadLimit_, lastStripe_ - currentStripe_ - 1);
+          for (uint64_t stripe = currentStripe_; stripe < maxStripe; stripe++) {
+            const auto& stripeInfo = footer_->stripes(static_cast<int>(stripe));
+            if (!isSmallStripe(stripeInfo, contents_->cacheOptions.rangeSizeLimit)) {
+              break;
+            }
+            ranges.push_back(ReadRange{stripeInfo.offset(), getStripeSize(stripeInfo)});
+            fullyCachedStripes_.insert(stripe);
+          }
+          contents_->cacheRanges(std::move(ranges));
+        } else {
+          // This is very coarse since I/O ranges of all selected columns are about to prefetch.
+          // We can further evaluate index stream with knowledge of pruned row groups to issue
+          // less I/O ranges.
+          contents_->cacheRanges(extractReadRangesForStripe(
+              currentStripe_, currentStripeInfo_, currentStripeFooter_, selectedColumns_));
+          // Cache footer of next stripe to avoid blocking I/O.
+          if (currentStripe_ + 1 < lastStripe_) {
+            const auto& nextStripe = footer_->stripes(static_cast<int>(currentStripe_ + 1));
+            contents_->cacheRanges(std::vector<ReadRange>{ReadRange{
+                nextStripe.offset() + nextStripe.index_length() + nextStripe.data_length(),
+                nextStripe.footer_length()}});
+          }
+        }
       }
 
       // get writer timezone info from stripe footer to help understand timestamp values.
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 58dcb9062..966281cce 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -25,13 +25,12 @@
 #include "orc/Reader.hh"
 
 #include "ColumnReader.hh"
-#include "RLE.hh"
-#include "io/Cache.hh"
-
 #include "SchemaEvolution.hh"
-#include "TypeImpl.hh"
+#include "io/Cache.hh"
 #include "sargs/SargsApplier.hh"
 
+#include <unordered_set>
+
 namespace orc {
 
   static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
@@ -148,6 +147,7 @@ namespace orc {
     const bool throwOnHive11DecimalOverflow_;
     const int32_t forcedScaleOnHive11Decimal_;
     const bool enableAsyncPrefetch_;
+    const uint64_t smallStripeLookAheadLimit_;  // max following stripes prefetched with a small one
 
     // inputs
     std::vector<bool> selectedColumns_;
@@ -171,6 +171,8 @@ namespace orc {
     proto::StripeInformation currentStripeInfo_;
     proto::StripeFooter currentStripeFooter_;
     std::unique_ptr<ColumnReader> reader_;
+    // Indices of stripes whose entire byte ranges were prefetched by a small-stripe batch.
+    std::unordered_set<uint64_t> fullyCachedStripes_;
 
     bool enableEncodedBlock_;
     bool useTightNumericVector_;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 348a0ad69..12931a5eb 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -1103,4 +1103,123 @@ namespace orc {
     }
   }
 
+  class IOCountingInputStream : public InputStream {  // counts read() calls on the wrapped stream
+   private:
+    std::unique_ptr<InputStream> wrapped_;
+    std::atomic<uint64_t> readCount_;  // atomic in case reads are issued by async prefetch
+
+   public:
+    explicit IOCountingInputStream(std::unique_ptr<InputStream> wrapped)
+        : wrapped_(std::move(wrapped)), readCount_(0) {}
+
+    uint64_t getLength() const override {
+      return wrapped_->getLength();
+    }
+
+    uint64_t getNaturalReadSize() const override {
+      return wrapped_->getNaturalReadSize();
+    }
+
+    void read(void* buf, uint64_t length, uint64_t offset) override {
+      readCount_.fetch_add(1, std::memory_order_relaxed);
+      wrapped_->read(buf, length, offset);
+    }
+
+    const std::string& getName() const override {
+      return wrapped_->getName();
+    }
+
+    uint64_t getReadCount() const {
+      return readCount_.load(std::memory_order_relaxed);
+    }
+
+    void resetReadCount() {
+      readCount_.store(0, std::memory_order_relaxed);
+    }
+  };
+
+  TEST(TestSmallStripeLookAhead, testIOCountWithDifferentLimits) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    uint64_t totalRows = writeSampleData(memStream, /*stripeSize*/ 1024, /*rowsPerStripe*/ 200);
+    EXPECT_GT(totalRows, 0UL);
+
+    uint64_t noPrefetchIOCount = 0;
+    uint64_t smallLimitIOCount = 0;
+    uint64_t largeLimitIOCount = 0;
+
+    // Test 1: No async prefetch - should have most I/O operations
+    {
+      auto countingStream = std::make_unique<IOCountingInputStream>(
+          std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()));
+      auto* countingPtr = countingStream.get();
+
+      ReaderOptions readerOptions;
+      std::unique_ptr<Reader> reader = createReader(std::move(countingStream), readerOptions);
+
+      RowReaderOptions rowReaderOptions;
+      rowReaderOptions.setEnableAsyncPrefetch(false);
+      auto rowReader = reader->createRowReader(rowReaderOptions);
+
+      countingPtr->resetReadCount();
+      uint64_t readRows = readAllRows(*rowReader);
+
+      noPrefetchIOCount = countingPtr->getReadCount();
+      EXPECT_EQ(readRows, totalRows);
+      EXPECT_GT(noPrefetchIOCount, 0UL);
+
+      std::cout << "No async prefetch I/O count: " << noPrefetchIOCount << std::endl;
+    }
+
+    // Test 2: Async prefetch with small look ahead limit
+    {
+      auto countingStream = std::make_unique<IOCountingInputStream>(
+          std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()));
+      auto* countingPtr = countingStream.get();
+
+      ReaderOptions readerOptions;
+      std::unique_ptr<Reader> reader = createReader(std::move(countingStream), readerOptions);
+
+      RowReaderOptions rowReaderOptions;
+      rowReaderOptions.setEnableAsyncPrefetch(true);
+      rowReaderOptions.setSmallStripeLookAheadLimit(1);  // Small limit (only 1 stripe ahead)
+      auto rowReader = reader->createRowReader(rowReaderOptions);
+
+      countingPtr->resetReadCount();
+      uint64_t readRows = readAllRows(*rowReader);
+
+      smallLimitIOCount = countingPtr->getReadCount();
+      EXPECT_EQ(readRows, totalRows);
+      EXPECT_GT(smallLimitIOCount, 0UL);
+
+      std::cout << "Small limit (1) prefetch I/O count: " << smallLimitIOCount << std::endl;
+    }
+
+    // Test 3: Async prefetch with large look ahead limit
+    {
+      auto countingStream = std::make_unique<IOCountingInputStream>(
+          std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()));
+      auto* countingPtr = countingStream.get();
+
+      ReaderOptions readerOptions;
+      std::unique_ptr<Reader> reader = createReader(std::move(countingStream), readerOptions);
+
+      RowReaderOptions rowReaderOptions;
+      rowReaderOptions.setEnableAsyncPrefetch(true);
+      rowReaderOptions.setSmallStripeLookAheadLimit(3);  // Larger limit (up to 3 stripes ahead)
+      auto rowReader = reader->createRowReader(rowReaderOptions);
+
+      countingPtr->resetReadCount();
+      uint64_t readRows = readAllRows(*rowReader);
+
+      largeLimitIOCount = countingPtr->getReadCount();
+      EXPECT_EQ(readRows, totalRows);
+      EXPECT_GT(largeLimitIOCount, 0UL);
+
+      std::cout << "Large limit (3) prefetch I/O count: " << largeLimitIOCount << std::endl;
+    }
+
+    EXPECT_LT(smallLimitIOCount, noPrefetchIOCount);
+    EXPECT_LT(largeLimitIOCount, smallLimitIOCount);
+  }
+
 }  // namespace orc

Reply via email to