This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new d7d6924de ORC-262: [C++] Support async I/O prefetch
d7d6924de is described below
commit d7d6924de13fcbf938d953935b81949593969f12
Author: taiyang-li <[email protected]>
AuthorDate: Mon Dec 2 15:25:43 2024 -0800
ORC-262: [C++] Support async I/O prefetch
### What changes were proposed in this pull request?
Support async I/O prefetch for the ORC C++ library. Closes
https://issues.apache.org/jira/browse/ORC-262
Changes:
- Added new interface `InputStream::readAsync` (with a default implementation
based on `std::async`). It reads data asynchronously within the specified range.
- Added IO Cache implementation `ReadRangeCache` to cache async io results.
This borrows from a similar design of Parquet Reader in
https://github.com/apache/arrow
- Added interface `Reader::preBuffer` to trigger I/O prefetch. In
`ReaderImpl::preBuffer`, the I/O ranges are computed from the selected
stripes and columns and passed to `ReadRangeCache::cache`, which sorts and
coalesces them and starts the asynchronous reads in the background, so that
the data is already available when the upper layer reads the corresponding
streams
- Added the interface `Reader::releaseBuffer`, which is used to release all
cached io ranges before an offset
### Why are the changes needed?
Async I/O prefetch can hide I/O latency while reading ORC files, which
improves performance of scan operators in ClickHouse.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #2048 from taiyang-li/upstream_orc_prefetch.
Lead-authored-by: taiyang-li <[email protected]>
Co-authored-by: 李扬 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
c++/include/orc/OrcFile.hh | 13 +++
c++/include/orc/Reader.hh | 40 +++++++
c++/src/CMakeLists.txt | 1 +
c++/src/Options.hh | 12 +++
c++/src/Reader.cc | 88 +++++++++++++++
c++/src/Reader.hh | 16 +++
c++/src/StripeStream.cc | 25 ++++-
c++/src/StripeStream.hh | 2 +
c++/src/io/Cache.cc | 171 +++++++++++++++++++++++++++++
c++/src/io/Cache.hh | 122 +++++++++++++++++++++
c++/test/CMakeLists.txt | 1 +
c++/test/MemoryInputStream.hh | 7 +-
c++/test/TestCache.cc | 142 ++++++++++++++++++++++++
c++/test/TestReader.cc | 246 +++++++++++++++++++++++++++++++++++-------
14 files changed, 843 insertions(+), 43 deletions(-)
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index a9ad692d4..ea71567c5 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -19,6 +19,7 @@
#ifndef ORC_FILE_HH
#define ORC_FILE_HH
+#include <future>
#include <string>
#include "orc/Reader.hh"
@@ -58,6 +59,18 @@ namespace orc {
*/
virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;
+ /**
+ * Read data asynchronously into the buffer. The buffer is allocated by
the caller.
+ * @param buf the buffer to read into
+ * @param length the number of bytes to read.
+ * @param offset the position in the stream to read from.
+ * @return a future that will be set when the read is complete.
+ */
+ virtual std::future<void> readAsync(void* buf, uint64_t length, uint64_t
offset) {
+ return std::async(std::launch::async,
+ [this, buf, length, offset] { this->read(buf, length,
offset); });
+ }
+
/**
* Get the name of the stream for error messages.
*/
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 4ddce64ad..b015b6491 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -40,6 +40,17 @@ namespace orc {
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;
+ struct CacheOptions {
+ // The maximum distance in bytes between two consecutive
+ // ranges; beyond this value, ranges are not combined
+ uint64_t holeSizeLimit = 8192;
+
+ // The maximum size in bytes of a combined range; if
+ // combining two consecutive ranges would produce a range of a
+ // size greater than this, they are not combined
+ uint64_t rangeSizeLimit = 32 * 1024 * 1024;
+ };
+
/**
* Expose the reader metrics including the latency and
* number of calls of the decompression/decoding/IO modules.
@@ -59,6 +70,8 @@ namespace orc {
std::atomic<uint64_t> IOBlockingLatencyUs{0};
std::atomic<uint64_t> SelectedRowGroupCount{0};
std::atomic<uint64_t> EvaluatedRowGroupCount{0};
+ std::atomic<uint64_t> ReadRangeCacheHits{0};
+ std::atomic<uint64_t> ReadRangeCacheMisses{0};
};
ReaderMetrics* getDefaultReaderMetrics();
@@ -116,6 +129,11 @@ namespace orc {
*/
ReaderOptions& setReaderMetrics(ReaderMetrics* metrics);
+ /**
+ * Set the cache options.
+ */
+ ReaderOptions& setCacheOptions(const CacheOptions& cacheOptions);
+
/**
* Set the location of the tail as defined by the logical length of the
* file.
@@ -147,6 +165,11 @@ namespace orc {
* Get the reader metrics.
*/
ReaderMetrics* getReaderMetrics() const;
+
+ /**
+ * Get the cache options.
+ */
+ const CacheOptions& getCacheOptions() const;
};
/**
@@ -624,6 +647,23 @@ namespace orc {
*/
virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const =
0;
+
+ /**
+ * Trigger IO prefetch and cache the prefetched contents asynchronously.
+ * It is thread safe. Users should make sure requested stripes and columns
+ * do not overlap; otherwise the overlapping part will be prefetched
multiple times,
+ * which doesn't affect correctness but wastes IO and memory resources.
+ * @param stripes the stripes to prefetch
+ * @param includeTypes the types to prefetch
+ */
+ virtual void preBuffer(const std::vector<uint32_t>& stripes,
+ const std::list<uint64_t>& includeTypes) = 0;
+
+ /**
+ * Release cached entries whose right boundary is less than or equal to
the given boundary.
+ * @param boundary the boundary value to release cache entries
+ */
+ virtual void releaseBuffer(uint64_t boundary) = 0;
};
/**
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 5b7840301..694667c06 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -150,6 +150,7 @@ set(SOURCE_FILES
orc_proto.pb.h
io/InputStream.cc
io/OutputStream.cc
+ io/Cache.cc
sargs/ExpressionTree.cc
sargs/Literal.cc
sargs/PredicateLeaf.cc
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index daf9d52e1..0a4bd56d8 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -23,6 +23,8 @@
#include "orc/OrcFile.hh"
#include "orc/Reader.hh"
+#include "io/Cache.hh"
+
#include <limits>
namespace orc {
@@ -43,6 +45,7 @@ namespace orc {
MemoryPool* memoryPool;
std::string serializedTail;
ReaderMetrics* metrics;
+ CacheOptions cacheOptions;
ReaderOptionsPrivate() {
tailLocation = std::numeric_limits<uint64_t>::max();
@@ -122,6 +125,15 @@ namespace orc {
return privateBits_->errorStream;
}
+ ReaderOptions& ReaderOptions::setCacheOptions(const CacheOptions&
cacheOptions) {
+ privateBits_->cacheOptions = cacheOptions;
+ return *this;
+ }
+
+ const CacheOptions& ReaderOptions::getCacheOptions() const {
+ return privateBits_->cacheOptions;
+ }
+
/**
* RowReaderOptions Implementation
*/
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 034ea04ee..c93c62f6c 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1523,6 +1523,94 @@ namespace orc {
return ret;
}
+ void ReaderImpl::releaseBuffer(uint64_t boundary) {
+ std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+
+ if (contents_->readCache) {
+ contents_->readCache->evictEntriesBefore(boundary);
+ }
+ }
+
+ void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
+ const std::list<uint64_t>& includeTypes) {
+ std::vector<uint32_t> newStripes;
+ for (auto stripe : stripes) {
+ if (stripe < static_cast<uint32_t>(footer_->stripes_size()))
newStripes.push_back(stripe);
+ }
+
+ std::list<uint64_t> newIncludeTypes;
+ for (auto type : includeTypes) {
+ if (type < static_cast<uint64_t>(footer_->types_size()))
newIncludeTypes.push_back(type);
+ }
+
+ if (newStripes.empty() || newIncludeTypes.empty()) {
+ return;
+ }
+
+ orc::RowReaderOptions rowReaderOptions;
+ rowReaderOptions.includeTypes(newIncludeTypes);
+ ColumnSelector columnSelector(contents_.get());
+ std::vector<bool> selectedColumns;
+ columnSelector.updateSelected(selectedColumns, rowReaderOptions);
+
+ std::vector<ReadRange> ranges;
+ ranges.reserve(newIncludeTypes.size());
+ for (auto stripe : newStripes) {
+ // get stripe information
+ const auto& stripeInfo = footer_->stripes(stripe);
+ uint64_t stripeFooterStart =
+ stripeInfo.offset() + stripeInfo.index_length() +
stripeInfo.data_length();
+ uint64_t stripeFooterLength = stripeInfo.footer_length();
+
+ // get stripe footer
+ std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
+ contents_->compression,
+ std::make_unique<SeekableFileInputStream>(contents_->stream.get(),
stripeFooterStart,
+ stripeFooterLength,
*contents_->pool),
+ contents_->blockSize, *contents_->pool, contents_->readerMetrics);
+ proto::StripeFooter stripeFooter;
+ if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) {
+ throw ParseError(std::string("bad StripeFooter from ") +
pbStream->getName());
+ }
+
+ // traverse all streams in stripe footer, choose selected streams to
prebuffer
+ uint64_t offset = stripeInfo.offset();
+ for (int i = 0; i < stripeFooter.streams_size(); i++) {
+ const proto::Stream& stream = stripeFooter.streams(i);
+ if (offset + stream.length() > stripeFooterStart) {
+ std::stringstream msg;
+ msg << "Malformed stream meta at stream index " << i << " in stripe
" << stripe
+ << ": streamOffset=" << offset << ", streamLength=" <<
stream.length()
+ << ", stripeOffset=" << stripeInfo.offset()
+ << ", stripeIndexLength=" << stripeInfo.index_length()
+ << ", stripeDataLength=" << stripeInfo.data_length();
+ throw ParseError(msg.str());
+ }
+
+ if (stream.has_kind() && selectedColumns[stream.column()]) {
+ const auto& kind = stream.kind();
+ if (kind == proto::Stream_Kind_DATA || kind ==
proto::Stream_Kind_DICTIONARY_DATA ||
+ kind == proto::Stream_Kind_PRESENT || kind ==
proto::Stream_Kind_LENGTH ||
+ kind == proto::Stream_Kind_SECONDARY) {
+ ranges.emplace_back(offset, stream.length());
+ }
+ }
+
+ offset += stream.length();
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+
+ if (!contents_->readCache) {
+ contents_->readCache = std::make_shared<ReadRangeCache>(
+ getStream(), options_.getCacheOptions(), contents_->pool,
contents_->readerMetrics);
+ }
+ contents_->readCache->cache(std::move(ranges));
+ }
+ }
+ }
+
RowReader::~RowReader() {
// PASS
}
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 89606c331..39ca73967 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -26,6 +26,8 @@
#include "ColumnReader.hh"
#include "RLE.hh"
+#include "io/Cache.hh"
+
#include "SchemaEvolution.hh"
#include "TypeImpl.hh"
#include "sargs/SargsApplier.hh"
@@ -70,6 +72,11 @@ namespace orc {
bool isDecimalAsLong;
std::unique_ptr<proto::Metadata> metadata;
ReaderMetrics* readerMetrics;
+
+ // mutex to protect readCache from concurrent access
+ std::mutex readCacheMutex;
+ // cached io ranges. only valid when preBuffer is invoked.
+ std::shared_ptr<ReadRangeCache> readCache;
};
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -245,6 +252,10 @@ namespace orc {
const SchemaEvolution* getSchemaEvolution() const {
return &schemaEvolution_;
}
+
+ std::shared_ptr<ReadRangeCache> getReadCache() const {
+ return contents_->readCache;
+ }
};
class ReaderImpl : public Reader {
@@ -260,6 +271,7 @@ namespace orc {
// footer
proto::Footer* footer_;
uint64_t numberOfStripes_;
+
uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);
// internal methods
@@ -375,6 +387,10 @@ namespace orc {
std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const
override;
+ void preBuffer(const std::vector<uint32_t>& stripes,
+ const std::list<uint64_t>& includeTypes) override;
+ void releaseBuffer(uint64_t boundary) override;
+
std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included) const
override;
};
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index f4345c087..a5609f762 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -19,6 +19,7 @@
#include "StripeStream.hh"
#include "RLE.hh"
#include "Reader.hh"
+#include "io/Cache.hh"
#include "orc/Exceptions.hh"
#include "wrap/coded-stream-wrapper.h"
@@ -37,7 +38,8 @@ namespace orc {
stripeStart_(stripeStart),
input_(input),
writerTimezone_(writerTimezone),
- readerTimezone_(readerTimezone) {
+ readerTimezone_(readerTimezone),
+ readCache_(reader.getReadCache()) {
// PASS
}
@@ -89,7 +91,6 @@ namespace orc {
if (stream.has_kind() && stream.kind() == kind &&
stream.column() == static_cast<uint64_t>(columnId)) {
uint64_t streamLength = stream.length();
- uint64_t myBlock = shouldStream ? input_.getNaturalReadSize() :
streamLength;
if (offset + streamLength > dataEnd) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe
" << stripeIndex_
@@ -99,9 +100,23 @@ namespace orc {
<< ", stripeDataLength=" << stripeInfo_.data_length();
throw ParseError(msg.str());
}
- return createDecompressor(reader_.getCompression(),
- std::make_unique<SeekableFileInputStream>(
- &input_, offset, stream.length(), *pool,
myBlock),
+
+ BufferSlice slice;
+ if (readCache_) {
+ ReadRange range{offset, streamLength};
+ slice = readCache_->read(range);
+ }
+
+ uint64_t myBlock = shouldStream ? input_.getNaturalReadSize() :
streamLength;
+ std::unique_ptr<SeekableInputStream> seekableInput;
+ if (slice.buffer) {
+ seekableInput = std::make_unique<SeekableArrayInputStream>(
+ slice.buffer->data() + slice.offset, slice.length);
+ } else {
+ seekableInput = std::make_unique<SeekableFileInputStream>(&input_,
offset, streamLength,
+ *pool,
myBlock);
+ }
+ return createDecompressor(reader_.getCompression(),
std::move(seekableInput),
reader_.getCompressionSize(), *pool,
reader_.getFileContents().readerMetrics);
}
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index ad82d472c..2d26f8575 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -30,6 +30,7 @@
namespace orc {
class RowReaderImpl;
+ class ReadRangeCache;
/**
* StripeStream Implementation
@@ -45,6 +46,7 @@ namespace orc {
InputStream& input_;
const Timezone& writerTimezone_;
const Timezone& readerTimezone_;
+ std::shared_ptr<ReadRangeCache> readCache_;
public:
StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
diff --git a/c++/src/io/Cache.cc b/c++/src/io/Cache.cc
new file mode 100644
index 000000000..39f63fdd2
--- /dev/null
+++ b/c++/src/io/Cache.cc
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cassert>
+
+#include "Cache.hh"
+
+namespace orc {
+
+ std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange>
ranges) const {
+ if (ranges.empty()) {
+ return ranges;
+ }
+
+ // Remove zero-sized ranges
+ auto end = std::remove_if(ranges.begin(), ranges.end(),
+ [](const ReadRange& range) { return range.length
== 0; });
+ // Sort in position order
+ std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) {
+ return a.offset != b.offset ? a.offset < b.offset : a.length > b.length;
+ });
+
+ // Remove ranges that overlap 100%
+ std::vector<ReadRange> uniqueRanges;
+ uniqueRanges.reserve(ranges.size());
+ for (auto it = ranges.begin(); it != end; ++it) {
+ if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) {
+ uniqueRanges.push_back(*it);
+ }
+ }
+ ranges = std::move(uniqueRanges);
+
+ // Skip further processing if ranges is empty after removing zero-sized
ranges.
+ if (ranges.empty()) {
+ return ranges;
+ }
+
+#ifndef NDEBUG
+ for (size_t i = 0; i < ranges.size() - 1; ++i) {
+ const auto& left = ranges[i];
+ const auto& right = ranges[i + 1];
+ assert(left.offset < right.offset);
+ assert(!left.contains(right));
+ }
+#endif
+
+ std::vector<ReadRange> coalesced;
+ auto itr = ranges.begin();
+
+ // Start of the current coalesced range and end (exclusive) of previous
range.
+ // They are initialized with the start and end (exclusive) of the first
range.
+ uint64_t coalescedStart = itr->offset;
+ uint64_t coalescedEnd = coalescedStart + itr->length;
+
+ for (++itr; itr < ranges.end(); ++itr) {
+ const uint64_t currentRangeStart = itr->offset;
+ const uint64_t currentRangeEnd = currentRangeStart + itr->length;
+
+ assert(coalescedStart < coalescedEnd);
+ assert(currentRangeStart < currentRangeEnd);
+
+ // At this point, the coalesced range is [coalescedStart,
coalescedEnd).
+ // Stop coalescing if:
+ // - coalesced range is too large, or
+ // - distance (hole/gap) between consecutive ranges is too large.
+ if ((currentRangeEnd - coalescedStart > rangeSizeLimit) ||
+ (currentRangeStart > coalescedEnd + holeSizeLimit)) {
+ coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart});
+ coalescedStart = currentRangeStart;
+ }
+
+ // Update coalescedEnd with the end of the current range.
+ coalescedEnd = currentRangeEnd;
+ }
+ coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart});
+
+ assert(coalesced.front().offset == ranges.front().offset);
+ assert(coalesced.back().offset + coalesced.back().length ==
+ ranges.back().offset + ranges.back().length);
+ return coalesced;
+ }
+
+ std::vector<ReadRange>
ReadRangeCombiner::coalesceReadRanges(std::vector<ReadRange> ranges,
+ uint64_t
holeSizeLimit,
+ uint64_t
rangeSizeLimit) {
+ assert(rangeSizeLimit > holeSizeLimit);
+
+ ReadRangeCombiner combiner{holeSizeLimit, rangeSizeLimit};
+ return combiner.coalesce(std::move(ranges));
+ }
+
+ void ReadRangeCache::cache(std::vector<ReadRange> ranges) {
+ ranges = ReadRangeCombiner::coalesceReadRanges(std::move(ranges),
options_.holeSizeLimit,
+ options_.rangeSizeLimit);
+
+ std::vector<RangeCacheEntry> newEntries = makeCacheEntries(ranges);
+ // Add new entries, themselves ordered by offset
+ if (entries_.size() > 0) {
+ std::vector<RangeCacheEntry> merged(entries_.size() + newEntries.size());
+ std::merge(entries_.begin(), entries_.end(), newEntries.begin(),
newEntries.end(),
+ merged.begin());
+ entries_ = std::move(merged);
+ } else {
+ entries_ = std::move(newEntries);
+ }
+ }
+
+ BufferSlice ReadRangeCache::read(const ReadRange& range) {
+ if (range.length == 0) {
+ return {std::make_shared<Buffer>(*memoryPool_, 0), 0, 0};
+ }
+
+ const auto it = std::lower_bound(entries_.begin(), entries_.end(), range,
+ [](const RangeCacheEntry& entry, const
ReadRange& range) {
+ return entry.range.offset +
entry.range.length <
+ range.offset + range.length;
+ });
+
+ BufferSlice result{};
+ bool hit_cache = false;
+ if (it != entries_.end() && it->range.contains(range)) {
+ hit_cache = it->future.valid();
+ it->future.get();
+ result = BufferSlice{it->buffer, range.offset - it->range.offset,
range.length};
+ }
+
+ if (metrics_) {
+ if (hit_cache)
+ metrics_->ReadRangeCacheHits.fetch_add(1);
+ else
+ metrics_->ReadRangeCacheMisses.fetch_add(1);
+ }
+ return result;
+ }
+
+ void ReadRangeCache::evictEntriesBefore(uint64_t boundary) {
+ auto it = std::lower_bound(entries_.begin(), entries_.end(), boundary,
+ [](const RangeCacheEntry& entry, uint64_t
offset) {
+ return entry.range.offset +
entry.range.length <= offset;
+ });
+ entries_.erase(entries_.begin(), it);
+ }
+
+ std::vector<RangeCacheEntry> ReadRangeCache::makeCacheEntries(
+ const std::vector<ReadRange>& ranges) const {
+ std::vector<RangeCacheEntry> newEntries;
+ newEntries.reserve(ranges.size());
+ for (const auto& range : ranges) {
+ BufferPtr buffer = std::make_shared<Buffer>(*memoryPool_, range.length);
+ std::future<void> future = stream_->readAsync(buffer->data(),
buffer->size(), range.offset);
+ newEntries.emplace_back(range, std::move(buffer), std::move(future));
+ }
+ return newEntries;
+ }
+
+} // namespace orc
diff --git a/c++/src/io/Cache.hh b/c++/src/io/Cache.hh
new file mode 100644
index 000000000..7fc79718a
--- /dev/null
+++ b/c++/src/io/Cache.hh
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "orc/MemoryPool.hh"
+#include "orc/OrcFile.hh"
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <future>
+#include <utility>
+#include <vector>
+
+namespace orc {
+
+ struct ReadRange {
+ uint64_t offset;
+ uint64_t length;
+
+ ReadRange() = default;
+ ReadRange(uint64_t offset, uint64_t length) : offset(offset),
length(length) {}
+
+ friend bool operator==(const ReadRange& left, const ReadRange& right) {
+ return (left.offset == right.offset && left.length == right.length);
+ }
+ friend bool operator!=(const ReadRange& left, const ReadRange& right) {
+ return !(left == right);
+ }
+
+ bool contains(const ReadRange& other) const {
+ return (offset <= other.offset && offset + length >= other.offset +
other.length);
+ }
+ };
+
+ struct ReadRangeCombiner {
+ const uint64_t holeSizeLimit;
+ const uint64_t rangeSizeLimit;
+
+ std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const;
+
+ static std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange>
ranges,
+ uint64_t holeSizeLimit,
+ uint64_t rangeSizeLimit);
+ };
+
+ using Buffer = DataBuffer<char>;
+ using BufferPtr = std::shared_ptr<Buffer>;
+
+ struct RangeCacheEntry {
+ ReadRange range;
+ BufferPtr buffer;
+ std::shared_future<void> future; // use shared_future in case of multiple
get calls
+
+ RangeCacheEntry() = default;
+ RangeCacheEntry(const ReadRange& range, BufferPtr buffer,
std::future<void> future)
+ : range(range), buffer(std::move(buffer)),
future(std::move(future).share()) {}
+
+ friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry&
right) {
+ return left.range.offset < right.range.offset;
+ }
+ };
+
+ struct BufferSlice {
+ BufferPtr buffer = nullptr;
+ uint64_t offset = 0;
+ uint64_t length = 0;
+ };
+
+ /// A read cache designed to hide IO latencies when reading.
+ class ReadRangeCache {
+ public:
+ /// Construct a read cache with given options
+ explicit ReadRangeCache(InputStream* stream, CacheOptions options,
MemoryPool* memoryPool,
+ ReaderMetrics* metrics = nullptr)
+ : stream_(stream),
+ options_(std::move(options)),
+ memoryPool_(memoryPool),
+ metrics_(metrics) {}
+
+ ~ReadRangeCache() = default;
+
+ /// Cache the given ranges in the background.
+ ///
+ /// The caller must ensure that the ranges do not overlap with each other,
+ /// nor with previously cached ranges. Otherwise, behaviour will be
undefined.
+ void cache(std::vector<ReadRange> ranges);
+
+ /// Read a range previously given to Cache().
+ BufferSlice read(const ReadRange& range);
+
+ /// Evict cache entries with its range before given boundary.
+ void evictEntriesBefore(uint64_t boundary);
+
+ private:
+ std::vector<RangeCacheEntry> makeCacheEntries(const
std::vector<ReadRange>& ranges) const;
+
+ InputStream* stream_;
+ CacheOptions options_;
+ // Ordered by offset (so as to find a matching region by binary search)
+ std::vector<RangeCacheEntry> entries_;
+ MemoryPool* memoryPool_;
+ ReaderMetrics* metrics_;
+ };
+
+} // namespace orc
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index e6a1491d4..6c5b26c4f 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -63,6 +63,7 @@ add_executable (orc-test
TestTimezone.cc
TestType.cc
TestWriter.cc
+ TestCache.cc
${SIMD_TEST_SRCS}
)
diff --git a/c++/test/MemoryInputStream.hh b/c++/test/MemoryInputStream.hh
index e6ef55b6d..31333ae43 100644
--- a/c++/test/MemoryInputStream.hh
+++ b/c++/test/MemoryInputStream.hh
@@ -22,8 +22,6 @@
#include "io/InputStream.hh"
#include "orc/OrcFile.hh"
-#include <iostream>
-
namespace orc {
class MemoryInputStream : public InputStream {
public:
@@ -44,6 +42,11 @@ namespace orc {
memcpy(buf, buffer_ + offset, length);
}
+ std::future<void> readAsync(void* buf, uint64_t length, uint64_t offset)
override {
+ return std::async(std::launch::async,
+ [this, buf, length, offset] { this->read(buf, length,
offset); });
+ }
+
virtual const std::string& getName() const override {
return name_;
}
diff --git a/c++/test/TestCache.cc b/c++/test/TestCache.cc
new file mode 100644
index 000000000..496ba3ec9
--- /dev/null
+++ b/c++/test/TestCache.cc
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstring>
+
+#include "MemoryInputStream.hh"
+#include "io/Cache.hh"
+
+#include "wrap/gmock.h"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+ TEST(TestReadRangeCombiner, testBasics) {
+ ReadRangeCombiner combinator{0, 100};
+ /// Ranges with partial overlap and identical offsets
+ std::vector<ReadRange> ranges{{0, 15}, {5, 11}, {5, 15}};
+ std::vector<ReadRange> result = combinator.coalesce(std::move(ranges));
+ std::vector<ReadRange> expect{{0, 20}};
+ ASSERT_EQ(result, expect);
+ }
+
+ TEST(TestCoalesceReadRanges, testBasics) {
+ auto check = [](std::vector<ReadRange> ranges, std::vector<ReadRange>
expected) -> void {
+ const uint64_t holeSizeLimit = 9;
+ const uint64_t rangeSizeLimit = 99;
+ auto coalesced = ReadRangeCombiner::coalesceReadRanges(ranges,
holeSizeLimit, rangeSizeLimit);
+ ASSERT_EQ(coalesced, expected);
+ };
+
+ check({}, {});
+ // Zero sized range that ends up in empty list
+ check({{110, 0}}, {});
+ // Combination on 1 zero sized range and 1 non-zero sized range
+ check({{110, 10}, {120, 0}}, {{110, 10}});
+ // 1 non-zero sized range
+ check({{110, 10}}, {{110, 10}});
+ // No holes + unordered ranges
+ check({{130, 10}, {110, 10}, {120, 10}}, {{110, 30}});
+ // No holes
+ check({{110, 10}, {120, 10}, {130, 10}}, {{110, 30}});
+ // Small holes only
+ check({{110, 11}, {130, 11}, {150, 11}}, {{110, 51}});
+ // Large holes
+ check({{110, 10}, {130, 10}}, {{110, 10}, {130, 10}});
+ check({{110, 11}, {130, 11}, {150, 10}, {170, 11}, {190, 11}}, {{110, 50},
{170, 31}});
+
+ // With zero-sized ranges
+ check({{110, 11}, {130, 0}, {130, 11}, {145, 0}, {150, 11}, {200, 0}},
{{110, 51}});
+
+ // No holes but large ranges
+ check({{110, 100}, {210, 100}}, {{110, 100}, {210, 100}});
+ // Small holes and large range in the middle (*)
+ check({{110, 10}, {120, 11}, {140, 100}, {240, 11}, {260, 11}},
+ {{110, 21}, {140, 100}, {240, 31}});
+ // Mid-size ranges that would turn large after coalescing
+ check({{100, 50}, {150, 50}}, {{100, 50}, {150, 50}});
+ check({{100, 30}, {130, 30}, {160, 30}, {190, 30}, {220, 30}}, {{100, 90},
{190, 60}});
+
+ // Same as (*) but unsorted
+ check({{140, 100}, {120, 11}, {240, 11}, {110, 10}, {260, 11}},
+ {{110, 21}, {140, 100}, {240, 31}});
+
+ // Completely overlapping ranges should be eliminated
+ check({{20, 5}, {20, 5}, {21, 2}}, {{20, 5}});
+ }
+
+ TEST(TestReadRangeCache, testBasics) {
+ std::string data = "abcdefghijklmnopqrstuvwxyz";
+
+ CacheOptions options;
+ options.holeSizeLimit = 2;
+ options.rangeSizeLimit = 10;
+
+ auto file = std::make_shared<MemoryInputStream>(data.data(), data.size());
+ ReadRangeCache cache(file.get(), options, getDefaultPool());
+
+ cache.cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}});
+ cache.cache({{10, 4}, {14, 0}, {15, 4}});
+
+ auto assert_slice_equal = [](const BufferSlice& slice, const std::string&
expected) {
+ ASSERT_TRUE(slice.buffer);
+ ASSERT_EQ(expected, std::string_view(slice.buffer->data() +
slice.offset, slice.length));
+ };
+
+ BufferSlice slice;
+
+ slice = cache.read({20, 2});
+ assert_slice_equal(slice, "uv");
+
+ slice = cache.read({1, 2});
+ assert_slice_equal(slice, "bc");
+
+ slice = cache.read({3, 2});
+ assert_slice_equal(slice, "de");
+
+ slice = cache.read({8, 2});
+ assert_slice_equal(slice, "ij");
+
+ slice = cache.read({10, 4});
+ assert_slice_equal(slice, "klmn");
+
+ slice = cache.read({15, 4});
+ assert_slice_equal(slice, "pqrs");
+
+ // Zero-sized
+ slice = cache.read({14, 0});
+ assert_slice_equal(slice, "");
+ slice = cache.read({25, 0});
+ assert_slice_equal(slice, "");
+
+ // Non-cached ranges
+ ASSERT_FALSE(cache.read({20, 3}).buffer);
+ ASSERT_FALSE(cache.read({19, 3}).buffer);
+ ASSERT_FALSE(cache.read({0, 3}).buffer);
+ ASSERT_FALSE(cache.read({25, 2}).buffer);
+
+ // Release cache entries ending at or before 15. After that cache entries would be:
{10, 9}, {20, 2}
+ cache.evictEntriesBefore(15);
+ ASSERT_FALSE(cache.read({1, 2}).buffer);
+ ASSERT_FALSE(cache.read({8, 2}).buffer);
+ slice = cache.read({10, 4});
+ assert_slice_equal(slice, "klmn");
+ slice = cache.read({20, 2});
+ assert_slice_equal(slice, "uv");
+ }
+} // namespace orc
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 33a0481b6..f9df6edc9 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -155,7 +155,10 @@ namespace orc {
ASSERT_THAT(rowReader->getSelectedColumns(), ElementsAreArray(expected));
}
- std::unique_ptr<Reader> createNestedListMemReader(MemoryOutputStream&
memStream) {
+ std::unique_ptr<Reader> createNestedListMemReader(MemoryOutputStream&
memStream,
+ const
std::vector<uint32_t>& stripesToPrefetch,
+ const std::list<uint64_t>&
columnsToPrefetch,
+ bool prefetchTwice) {
MemoryPool* pool = getDefaultPool();
auto type = std::unique_ptr<Type>(
@@ -218,20 +221,43 @@ namespace orc {
auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength());
ReaderOptions readerOptions;
readerOptions.setMemoryPool(*pool);
- return createReader(std::move(inStream), readerOptions);
+ auto reader = createReader(std::move(inStream), readerOptions);
+
+ reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+ if (prefetchTwice) {
+ reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+ }
+
+ return reader;
}
- TEST(TestReadIntent, testListAll) {
+ class TestReadIntentFromNestedList
+ : public ::testing::TestWithParam<
+ std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {};
+
+ TEST_P(TestReadIntentFromNestedList, testListAll) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedListMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of int_array.
verifySelection(reader, {{1, ReadIntent_ALL}}, {0, 1, 2});
}
- TEST(TestReadIntent, testListOffsets) {
+ TEST_P(TestReadIntentFromNestedList, testListOffsets) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedListMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select only the offsets of int_array.
verifySelection(reader, {{1, ReadIntent_OFFSETS}}, {0, 1});
@@ -244,26 +270,44 @@ namespace orc {
verifySelection(reader, {{3, ReadIntent_OFFSETS}, {5,
ReadIntent_OFFSETS}}, {0, 3, 4, 5});
}
- TEST(TestReadIntent, testListAllAndOffsets) {
+ TEST_P(TestReadIntentFromNestedList, testListAllAndOffsets) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedListMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of int_array and only the outermost offsets of
int_array_array_array.
verifySelection(reader, {{1, ReadIntent_ALL}, {3, ReadIntent_OFFSETS}},
{0, 1, 2, 3});
}
- TEST(TestReadIntent, testListConflictingIntent) {
+ TEST_P(TestReadIntentFromNestedList, testListConflictingIntent) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedListMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// test conflicting ReadIntent on nested list.
verifySelection(reader, {{3, ReadIntent_OFFSETS}, {5, ReadIntent_ALL}},
{0, 3, 4, 5, 6});
verifySelection(reader, {{3, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}},
{0, 3, 4, 5, 6});
}
- TEST(TestReadIntent, testRowBatchContent) {
+ TEST_P(TestReadIntentFromNestedList, testRowBatchContent) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedListMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedListMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of int_array and only the offsets of int_array_array.
RowReaderOptions::IdReadIntentMap idReadIntentMap = {{1, ReadIntent_ALL},
@@ -299,7 +343,24 @@ namespace orc {
EXPECT_EQ(nullptr, intArrayArrayArrayBatch.elements);
}
- std::unique_ptr<Reader> createNestedMapMemReader(MemoryOutputStream&
memStream) {
+ INSTANTIATE_TEST_SUITE_P(
+ TestReadIntentFromNestedListInstance, TestReadIntentFromNestedList,
+ ::testing::Values(
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{},
true),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{},
false),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 3},
true),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 3},
false),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{},
true),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{},
false),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 3},
true),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 3},
false),
+ std::make_tuple(std::vector<uint32_t>{1000},
std::list<uint64_t>{1000}, true),
+ std::make_tuple(std::vector<uint32_t>{1000},
std::list<uint64_t>{1000}, false)));
+
+ std::unique_ptr<Reader> createNestedMapMemReader(MemoryOutputStream&
memStream,
+ const
std::vector<uint32_t>& stripesToPrefetch,
+ const std::list<uint64_t>&
columnsToPrefetch,
+ bool prefetchTwice) {
MemoryPool* pool = getDefaultPool();
auto type = std::unique_ptr<Type>(
@@ -389,20 +450,42 @@ namespace orc {
auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength());
ReaderOptions readerOptions;
readerOptions.setMemoryPool(*pool);
- return createReader(std::move(inStream), readerOptions);
+ auto reader = createReader(std::move(inStream), readerOptions);
+
+ reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+ if (prefetchTwice) {
+ reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+ }
+ return reader;
}
- TEST(TestReadIntent, testMapAll) {
+ class TestReadIntentFromNestedMap
+ : public ::testing::TestWithParam<
+ std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {};
+
+ TEST_P(TestReadIntentFromNestedMap, testMapAll) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedMapMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of single_map.
verifySelection(reader, {{2, ReadIntent_ALL}}, {0, 2, 3, 4});
}
- TEST(TestReadIntent, testMapOffsets) {
+ TEST_P(TestReadIntentFromNestedMap, testMapOffsets) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedMapMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select only the offsets of single_map.
verifySelection(reader, {{2, ReadIntent_OFFSETS}}, {0, 2});
@@ -414,17 +497,29 @@ namespace orc {
verifySelection(reader, {{5, ReadIntent_OFFSETS}, {9,
ReadIntent_OFFSETS}}, {0, 5, 7, 9});
}
- TEST(TestReadIntent, testMapAllAndOffsets) {
+ TEST_P(TestReadIntentFromNestedMap, testMapAllAndOffsets) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedMapMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of single_map and only the outermost offsets of nested_map.
verifySelection(reader, {{2, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}},
{0, 2, 3, 4, 5});
}
- TEST(TestReadIntent, testMapConflictingIntent) {
+ TEST_P(TestReadIntentFromNestedMap, testMapConflictingIntent) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedMapMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// test conflicting ReadIntent on nested_map.
verifySelection(reader, {{5, ReadIntent_OFFSETS}, {9, ReadIntent_ALL}},
{0, 5, 7, 9, 10, 11});
@@ -434,9 +529,15 @@ namespace orc {
{0, 5, 7, 8, 9, 10, 11});
}
- TEST(TestReadIntent, testMapRowBatchContent) {
+ TEST_P(TestReadIntentFromNestedMap, testMapRowBatchContent) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedMapMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedMapMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of single_map and only the offsets of nested_map.
RowReaderOptions::IdReadIntentMap idReadIntentMap = {{2, ReadIntent_ALL},
@@ -482,7 +583,24 @@ namespace orc {
EXPECT_EQ(nullptr, nestedMapBatch.elements);
}
- std::unique_ptr<Reader> createNestedUnionMemReader(MemoryOutputStream&
memStream) {
+ INSTANTIATE_TEST_SUITE_P(
+ TestReadIntentFromNestedMapInstance, TestReadIntentFromNestedMap,
+ ::testing::Values(
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{},
true),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{},
false),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 5},
true),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 5},
false),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{},
true),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{},
false),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 5},
true),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 5},
false),
+ std::make_tuple(std::vector<uint32_t>{1000},
std::list<uint64_t>{1000}, true),
+ std::make_tuple(std::vector<uint32_t>{1000},
std::list<uint64_t>{1000}, false)));
+
+ std::unique_ptr<Reader> createNestedUnionMemReader(MemoryOutputStream&
memStream,
+ const
std::vector<uint32_t>& stripesToPrefetch,
+ const
std::list<uint64_t>& columnsToPrefetch,
+ bool prefetchTwice) {
MemoryPool* pool = getDefaultPool();
auto type = std::unique_ptr<Type>(
@@ -566,20 +684,43 @@ namespace orc {
ReaderOptions readerOptions;
readerOptions.setMemoryPool(*pool);
readerOptions.setReaderMetrics(nullptr);
- return createReader(std::move(inStream), readerOptions);
+ auto reader = createReader(std::move(inStream), readerOptions);
+
+ reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+ if (prefetchTwice) {
+ reader->preBuffer(stripesToPrefetch, columnsToPrefetch);
+ }
+
+ return reader;
}
- TEST(TestReadIntent, testUnionAll) {
+ class TestReadIntentFromNestedUnion
+ : public ::testing::TestWithParam<
+ std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {};
+
+ TEST_P(TestReadIntentFromNestedUnion, testUnionAll) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedUnionMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of single_union.
verifySelection(reader, {{2, ReadIntent_ALL}}, {0, 2, 3, 4});
}
- TEST(TestReadIntent, testUnionOffsets) {
+ TEST_P(TestReadIntentFromNestedUnion, testUnionOffsets) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedUnionMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select only the offsets of single_union.
verifySelection(reader, {{2, ReadIntent_OFFSETS}}, {0, 2});
@@ -592,17 +733,29 @@ namespace orc {
{0, 2, 5, 6, 7, 8, 11});
}
- TEST(TestReadIntent, testUnionAllAndOffsets) {
+ TEST_P(TestReadIntentFromNestedUnion, testUnionAllAndOffsets) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedUnionMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of single_union and only the outermost offsets of
nested_union.
verifySelection(reader, {{2, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}},
{0, 2, 3, 4, 5});
}
- TEST(TestReadIntent, testUnionConflictingIntent) {
+ TEST_P(TestReadIntentFromNestedUnion, testUnionConflictingIntent) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedUnionMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// test conflicting ReadIntent on nested_union.
verifySelection(reader, {{5, ReadIntent_OFFSETS}, {8, ReadIntent_ALL}},
@@ -613,9 +766,15 @@ namespace orc {
{0, 5, 6, 7, 8, 9, 10, 11});
}
- TEST(TestReadIntent, testUnionRowBatchContent) {
+ TEST_P(TestReadIntentFromNestedUnion, testUnionRowBatchContent) {
+ const auto& params = GetParam();
+ const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params);
+ const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params);
+ bool prefetchTwice = std::get<2>(params);
+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- std::unique_ptr<Reader> reader = createNestedUnionMemReader(memStream);
+ std::unique_ptr<Reader> reader =
+ createNestedUnionMemReader(memStream, stripesToPrefetch,
columnsToPrefetch, prefetchTwice);
// select all of single_union and only the offsets of nested_union.
RowReaderOptions::IdReadIntentMap idReadIntentMap = {{2, ReadIntent_ALL},
@@ -665,10 +824,25 @@ namespace orc {
EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]);
}
+ INSTANTIATE_TEST_SUITE_P(
+ TestReadIntentFromNestedUnionInstance, TestReadIntentFromNestedUnion,
+ ::testing::Values(
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{},
true),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{},
false),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 2},
true),
+ std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 2},
false),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{},
true),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{},
false),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 2},
true),
+ std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 2},
false),
+ std::make_tuple(std::vector<uint32_t>{1000},
std::list<uint64_t>{1000}, true),
+ std::make_tuple(std::vector<uint32_t>{1000},
std::list<uint64_t>{1000}, false)));
+
TEST(TestReadIntent, testSeekOverEmptyPresentStream) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool* pool = getDefaultPool();
uint64_t rowCount = 5000;
+
{
auto type = std::unique_ptr<Type>(
Type::buildTypeFromString("struct<col1:struct<col2:int>,col3:struct<col4:int>,"