This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new b9035b426 ORC-2000: [C++] Add support to prefetch small stripes
b9035b426 is described below
commit b9035b426070b1a90a6a1b2f02063d34c0eb7353
Author: Gang Wu <[email protected]>
AuthorDate: Thu Sep 18 12:44:46 2025 -0700
ORC-2000: [C++] Add support to prefetch small stripes
### What changes were proposed in this pull request?
- Allow the reader to prefetch consecutive small stripes and coalesce them into fewer I/O requests.
- Add a row reader option (`setSmallStripeLookAheadLimit`) to limit the number of stripes to prefetch.
### Why are the changes needed?
Some ORC files contain many small stripes, which can significantly hurt read
performance.
### How was this patch tested?
Added a test case verifying that prefetching reduces the number of I/O operations, and that a larger look-ahead limit reduces it further.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #2406 from wgtmac/prefetch_stripe.
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
c++/include/orc/Reader.hh | 10 ++++
c++/src/Options.hh | 11 +++++
c++/src/Reader.cc | 81 ++++++++++++++++++++++++++-----
c++/src/Reader.hh | 10 ++--
c++/test/TestReader.cc | 119 ++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 215 insertions(+), 16 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 46b3c3983..506f088d6 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -398,6 +398,16 @@ namespace orc {
* Whether to enable async I/O prefetch of next stripe.
*/
bool getEnableAsyncPrefetch() const;
+
+ /**
+ * Set the number of stripes to look ahead for small stripe prefetch.
+ */
+ RowReaderOptions& setSmallStripeLookAheadLimit(uint64_t numStripes);
+
+ /**
+ * Get the number of stripes to look ahead for small stripe prefetch.
+ */
+ uint64_t getSmallStripeLookAheadLimit() const;
};
class RowReader;
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 2dca5e7f5..b71edcd42 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -155,6 +155,7 @@ namespace orc {
std::shared_ptr<Type> readType;
bool throwOnSchemaEvolutionOverflow;
bool enableAsyncPrefetch;
+ uint64_t smallStripeLookAheadLimit;
RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
@@ -167,6 +168,7 @@ namespace orc {
useTightNumericVector = false;
throwOnSchemaEvolutionOverflow = false;
enableAsyncPrefetch = false;
+ smallStripeLookAheadLimit = 8;
}
};
@@ -351,6 +353,15 @@ namespace orc {
return privateBits_->enableAsyncPrefetch;
}
+ RowReaderOptions& RowReaderOptions::setSmallStripeLookAheadLimit(uint64_t
numStripes) {
+ privateBits_->smallStripeLookAheadLimit = numStripes;
+ return *this;
+ }
+
+ uint64_t RowReaderOptions::getSmallStripeLookAheadLimit() const {
+ return privateBits_->smallStripeLookAheadLimit;
+ }
+
} // namespace orc
#endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index c571fb5d6..1cf6940c8 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -252,6 +252,7 @@ namespace orc {
throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
forcedScaleOnHive11Decimal_(opts.getForcedScaleOnHive11Decimal()),
enableAsyncPrefetch_(opts.getEnableAsyncPrefetch()),
+ smallStripeLookAheadLimit_(opts.getSmallStripeLookAheadLimit()),
footer_(contents_->footer.get()),
firstRowOfStripe_(*contents_->pool, 0),
enableEncodedBlock_(opts.getEnableLazyDecoding()),
@@ -520,14 +521,38 @@ namespace orc {
}
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
- const FileContents& contents) {
+ FileContents& contents) {
uint64_t stripeFooterStart = info.offset() + info.index_length() +
info.data_length();
uint64_t stripeFooterLength = info.footer_length();
- std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
- contents.compression,
- std::make_unique<SeekableFileInputStream>(contents.stream.get(),
stripeFooterStart,
- stripeFooterLength,
*contents.pool),
- contents.blockSize, *contents.pool, contents.readerMetrics);
+
+ std::unique_ptr<SeekableInputStream> pbStream;
+
+ // Try to read from cache first
+ {
+ std::lock_guard<std::mutex> lock(contents.readCacheMutex);
+ if (contents.readCache) {
+ ReadRange footerRange(stripeFooterStart, stripeFooterLength);
+ BufferSlice cachedData = contents.readCache->read(footerRange);
+ if (cachedData.buffer != nullptr) {
+ // Create a stream from cached data
+ pbStream = createDecompressor(
+ contents.compression,
+ std::make_unique<SeekableArrayInputStream>(
+ cachedData.buffer->data() + cachedData.offset,
cachedData.length),
+ contents.blockSize, *contents.pool, contents.readerMetrics);
+ }
+ }
+ }
+
+ // Fall back to reading from disk if not in cache
+ if (!pbStream) {
+ pbStream = createDecompressor(
+ contents.compression,
+ std::make_unique<SeekableFileInputStream>(contents.stream.get(),
stripeFooterStart,
+ stripeFooterLength,
*contents.pool),
+ contents.blockSize, *contents.pool, contents.readerMetrics);
+ }
+
proto::StripeFooter result;
if (!result.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError(std::string("bad StripeFooter from ") +
pbStream->getName());
@@ -1070,6 +1095,14 @@ namespace orc {
}
}
+ uint64_t getStripeSize(const proto::StripeInformation& stripeInfo) {
+ return stripeInfo.index_length() + stripeInfo.data_length() +
stripeInfo.footer_length();
+ }
+
+ bool isSmallStripe(const proto::StripeInformation& stripeInfo, uint64_t
threshold) {
+ return getStripeSize(stripeInfo) <= threshold;
+ }
+
void RowReaderImpl::startNextStripe() {
reader_.reset(); // ColumnReaders use lots of memory; free old memory
first
rowIndexes_.clear();
@@ -1138,13 +1171,37 @@ namespace orc {
if (currentStripe_ < lastStripe_) {
if (enableAsyncPrefetch_) {
- // FIXME: this is very coarse since I/O ranges of all selected columns
are about to
- // prefetch. We can further evaluate index stream with knowledge of
pruned row groups
- // to issue less I/O ranges.
- auto ranges = extractReadRangesForStripe(currentStripe_,
currentStripeInfo_,
- currentStripeFooter_,
selectedColumns_);
contents_->evictCache(currentStripeInfo_.offset());
- contents_->cacheRanges(std::move(ranges));
+
+ if (fullyCachedStripes_.find(currentStripe_) !=
fullyCachedStripes_.cend()) {
+ // Current stripe has been fully cached, do nothing.
+ } else if (isSmallStripe(currentStripeInfo_,
contents_->cacheOptions.rangeSizeLimit)) {
+ std::vector<ReadRange> ranges;
+ uint64_t maxStripe =
+ std::min(lastStripe_, currentStripe_ +
smallStripeLookAheadLimit_ + 1);
+ for (uint64_t stripe = currentStripe_; stripe < maxStripe; stripe++)
{
+ const auto& stripeInfo =
footer_->stripes(static_cast<int>(stripe));
+ if (!isSmallStripe(stripeInfo,
contents_->cacheOptions.rangeSizeLimit)) {
+ break;
+ }
+ ranges.push_back(ReadRange{stripeInfo.offset(),
getStripeSize(stripeInfo)});
+ fullyCachedStripes_.insert(stripe);
+ }
+ contents_->cacheRanges(std::move(ranges));
+ } else {
+ // This is very coarse since I/O ranges of all selected columns are
about to be prefetched.
+ // We can further evaluate index streams with knowledge of pruned row
groups to issue
+ // fewer I/O ranges.
+ contents_->cacheRanges(extractReadRangesForStripe(
+ currentStripe_, currentStripeInfo_, currentStripeFooter_,
selectedColumns_));
+ // Cache footer of next stripe to avoid blocking I/O.
+ if (currentStripe_ + 1 < lastStripe_) {
+ const auto& nextStripe =
footer_->stripes(static_cast<int>(currentStripe_ + 1));
+ contents_->cacheRanges(std::vector<ReadRange>{ReadRange{
+ nextStripe.offset() + nextStripe.index_length() +
nextStripe.data_length(),
+ nextStripe.footer_length()}});
+ }
+ }
}
// get writer timezone info from stripe footer to help understand
timestamp values.
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 58dcb9062..966281cce 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -25,13 +25,12 @@
#include "orc/Reader.hh"
#include "ColumnReader.hh"
-#include "RLE.hh"
-#include "io/Cache.hh"
-
#include "SchemaEvolution.hh"
-#include "TypeImpl.hh"
+#include "io/Cache.hh"
#include "sargs/SargsApplier.hh"
+#include <unordered_set>
+
namespace orc {
static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
@@ -148,6 +147,7 @@ namespace orc {
const bool throwOnHive11DecimalOverflow_;
const int32_t forcedScaleOnHive11Decimal_;
const bool enableAsyncPrefetch_;
+ const uint64_t smallStripeLookAheadLimit_;
// inputs
std::vector<bool> selectedColumns_;
@@ -171,6 +171,8 @@ namespace orc {
proto::StripeInformation currentStripeInfo_;
proto::StripeFooter currentStripeFooter_;
std::unique_ptr<ColumnReader> reader_;
+ // stripe indices whose entire I/O ranges have been fully cached.
+ std::unordered_set<uint64_t> fullyCachedStripes_;
bool enableEncodedBlock_;
bool useTightNumericVector_;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 348a0ad69..12931a5eb 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -1103,4 +1103,123 @@ namespace orc {
}
}
+ class IOCountingInputStream : public InputStream {
+ private:
+ std::unique_ptr<InputStream> wrapped_;
+ mutable std::atomic<uint64_t> readCount_;
+
+ public:
+ IOCountingInputStream(std::unique_ptr<InputStream> wrapped)
+ : wrapped_(std::move(wrapped)), readCount_(0) {}
+
+ uint64_t getLength() const override {
+ return wrapped_->getLength();
+ }
+
+ uint64_t getNaturalReadSize() const override {
+ return wrapped_->getNaturalReadSize();
+ }
+
+ void read(void* buf, uint64_t length, uint64_t offset) override {
+ readCount_.fetch_add(1, std::memory_order_relaxed);
+ wrapped_->read(buf, length, offset);
+ }
+
+ const std::string& getName() const override {
+ return wrapped_->getName();
+ }
+
+ uint64_t getReadCount() const {
+ return readCount_.load(std::memory_order_relaxed);
+ }
+
+ void resetReadCount() {
+ readCount_.store(0, std::memory_order_relaxed);
+ }
+ };
+
+ TEST(TestSmallStripeLookAhead, testIOCountWithDifferentLimits) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ uint64_t totalRows = writeSampleData(memStream, /*stripeSize*/ 1024,
/*rowsPerStripe*/ 200);
+ EXPECT_GT(totalRows, 0UL);
+
+ uint64_t noPrefetchIOCount = 0;
+ uint64_t smallLimitIOCount = 0;
+ uint64_t largeLimitIOCount = 0;
+
+ // Test 1: No async prefetch - should have most I/O operations
+ {
+ auto countingStream = std::make_unique<IOCountingInputStream>(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()));
+ auto* countingPtr = countingStream.get();
+
+ ReaderOptions readerOptions;
+ std::unique_ptr<Reader> reader = createReader(std::move(countingStream),
readerOptions);
+
+ RowReaderOptions rowReaderOptions;
+ rowReaderOptions.setEnableAsyncPrefetch(false);
+ auto rowReader = reader->createRowReader(rowReaderOptions);
+
+ countingPtr->resetReadCount();
+ uint64_t readRows = readAllRows(*rowReader);
+
+ noPrefetchIOCount = countingPtr->getReadCount();
+ EXPECT_EQ(readRows, totalRows);
+ EXPECT_GT(noPrefetchIOCount, 0UL);
+
+ std::cout << "No async prefetch I/O count: " << noPrefetchIOCount <<
std::endl;
+ }
+
+ // Test 2: Async prefetch with small look ahead limit
+ {
+ auto countingStream = std::make_unique<IOCountingInputStream>(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()));
+ auto* countingPtr = countingStream.get();
+
+ ReaderOptions readerOptions;
+ std::unique_ptr<Reader> reader = createReader(std::move(countingStream),
readerOptions);
+
+ RowReaderOptions rowReaderOptions;
+ rowReaderOptions.setEnableAsyncPrefetch(true);
+ rowReaderOptions.setSmallStripeLookAheadLimit(1); // Small limit (only
1 stripe ahead)
+ auto rowReader = reader->createRowReader(rowReaderOptions);
+
+ countingPtr->resetReadCount();
+ uint64_t readRows = readAllRows(*rowReader);
+
+ smallLimitIOCount = countingPtr->getReadCount();
+ EXPECT_EQ(readRows, totalRows);
+ EXPECT_GT(smallLimitIOCount, 0UL);
+
+ std::cout << "Small limit (1) prefetch I/O count: " << smallLimitIOCount
<< std::endl;
+ }
+
+ // Test 3: Async prefetch with large look ahead limit
+ {
+ auto countingStream = std::make_unique<IOCountingInputStream>(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()));
+ auto* countingPtr = countingStream.get();
+
+ ReaderOptions readerOptions;
+ std::unique_ptr<Reader> reader = createReader(std::move(countingStream),
readerOptions);
+
+ RowReaderOptions rowReaderOptions;
+ rowReaderOptions.setEnableAsyncPrefetch(true);
+ rowReaderOptions.setSmallStripeLookAheadLimit(3); // Large limit (all
remaining stripes)
+ auto rowReader = reader->createRowReader(rowReaderOptions);
+
+ countingPtr->resetReadCount();
+ uint64_t readRows = readAllRows(*rowReader);
+
+ largeLimitIOCount = countingPtr->getReadCount();
+ EXPECT_EQ(readRows, totalRows);
+ EXPECT_GT(largeLimitIOCount, 0UL);
+
+ std::cout << "Large limit (3) prefetch I/O count: " << largeLimitIOCount
<< std::endl;
+ }
+
+ EXPECT_LT(smallLimitIOCount, noPrefetchIOCount);
+ EXPECT_LT(largeLimitIOCount, smallLimitIOCount);
+ }
+
} // namespace orc