wgtmac commented on code in PR #2048: URL: https://github.com/apache/orc/pull/2048#discussion_r1853290234
########## c++/src/io/Cache.hh: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" + +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <future> +#include <utility> +#include <vector> + +namespace orc { + + struct ReadRange { + uint64_t offset; + uint64_t length; + + ReadRange() = default; + ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {} + + friend bool operator==(const ReadRange& left, const ReadRange& right) { + return (left.offset == right.offset && left.length == right.length); + } + friend bool operator!=(const ReadRange& left, const ReadRange& right) { + return !(left == right); + } + + bool contains(const ReadRange& other) const { + return (offset <= other.offset && offset + length >= other.offset + other.length); + } + }; + + struct ReadRangeCombiner { + const uint64_t holeSizeLimit; + const uint64_t rangeSizeLimit; + + std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const; + }; + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit); + struct RangeCacheEntry { + 
using BufferPtr = InputStream::BufferPtr; Review Comment: We can directly use std::shared_ptr<DataBuffer> now. ########## c++/src/io/Cache.hh: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" + +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <future> +#include <utility> +#include <vector> + +namespace orc { + + struct ReadRange { + uint64_t offset; + uint64_t length; + + ReadRange() = default; + ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {} + + friend bool operator==(const ReadRange& left, const ReadRange& right) { + return (left.offset == right.offset && left.length == right.length); + } + friend bool operator!=(const ReadRange& left, const ReadRange& right) { + return !(left == right); + } + + bool contains(const ReadRange& other) const { + return (offset <= other.offset && offset + length >= other.offset + other.length); + } + }; + + struct ReadRangeCombiner { + const uint64_t holeSizeLimit; + const uint64_t rangeSizeLimit; + + std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const; + }; + + std::vector<ReadRange> 
coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit); + struct RangeCacheEntry { + using BufferPtr = InputStream::BufferPtr; + + ReadRange range; + + // The result may be get multiple times, so we use shared_future instead of std::future + BufferPtr buffer; + std::shared_future<void> future; + + RangeCacheEntry() = default; + RangeCacheEntry(const ReadRange& range, BufferPtr buffer, std::future<void> future) + : range(range), buffer(std::move(buffer)), future(std::move(future).share()) {} + + friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) { + return left.range.offset < right.range.offset; + } + }; + + /// A read cache designed to hide IO latencies when reading. + class ReadRangeCache { + public: + using Buffer = InputStream::Buffer; + using BufferPtr = InputStream::BufferPtr; + + struct BufferSlice { Review Comment: To be consistent, `struct BufferSlice` should not be a nested class either. ########## c++/src/io/Cache.hh: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" + +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <future> +#include <utility> +#include <vector> + +namespace orc { + + struct ReadRange { + uint64_t offset; + uint64_t length; + + ReadRange() = default; + ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {} + + friend bool operator==(const ReadRange& left, const ReadRange& right) { + return (left.offset == right.offset && left.length == right.length); + } + friend bool operator!=(const ReadRange& left, const ReadRange& right) { + return !(left == right); + } + + bool contains(const ReadRange& other) const { + return (offset <= other.offset && offset + length >= other.offset + other.length); + } + }; + + struct ReadRangeCombiner { + const uint64_t holeSizeLimit; + const uint64_t rangeSizeLimit; + + std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const; + }; + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, Review Comment: What about moving `coalesceReadRanges` into `struct ReadRangeCombiner` as a static function? Actually I think a separate coalesceReadRanges function is redundant. ########## c++/src/io/Cache.hh: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" + +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <future> +#include <utility> +#include <vector> + +namespace orc { + + struct ReadRange { + uint64_t offset; + uint64_t length; + + ReadRange() = default; + ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {} + + friend bool operator==(const ReadRange& left, const ReadRange& right) { + return (left.offset == right.offset && left.length == right.length); + } + friend bool operator!=(const ReadRange& left, const ReadRange& right) { + return !(left == right); + } + + bool contains(const ReadRange& other) const { + return (offset <= other.offset && offset + length >= other.offset + other.length); + } + }; + + struct ReadRangeCombiner { + const uint64_t holeSizeLimit; + const uint64_t rangeSizeLimit; + + std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const; + }; + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit); + struct RangeCacheEntry { + using BufferPtr = InputStream::BufferPtr; + + ReadRange range; + + // The result may be get multiple times, so we use shared_future instead of std::future + BufferPtr buffer; + std::shared_future<void> future; + + RangeCacheEntry() = default; Review Comment: Should it be `delete`? ########## c++/src/io/Cache.hh: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" + +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <future> +#include <utility> +#include <vector> + +namespace orc { + + struct ReadRange { + uint64_t offset; + uint64_t length; + + ReadRange() = default; + ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {} + + friend bool operator==(const ReadRange& left, const ReadRange& right) { + return (left.offset == right.offset && left.length == right.length); + } + friend bool operator!=(const ReadRange& left, const ReadRange& right) { + return !(left == right); + } + + bool contains(const ReadRange& other) const { + return (offset <= other.offset && offset + length >= other.offset + other.length); + } + }; + + struct ReadRangeCombiner { + const uint64_t holeSizeLimit; + const uint64_t rangeSizeLimit; + + std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const; + }; + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit); + struct RangeCacheEntry { + using BufferPtr = InputStream::BufferPtr; + + ReadRange range; + + // The result may be get multiple times, so we use shared_future instead of std::future + BufferPtr buffer; 
+ std::shared_future<void> future; + + RangeCacheEntry() = default; + RangeCacheEntry(const ReadRange& range, BufferPtr buffer, std::future<void> future) + : range(range), buffer(std::move(buffer)), future(std::move(future).share()) {} + + friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) { + return left.range.offset < right.range.offset; + } + }; + + /// A read cache designed to hide IO latencies when reading. + class ReadRangeCache { + public: + using Buffer = InputStream::Buffer; + using BufferPtr = InputStream::BufferPtr; + + struct BufferSlice { + BufferSlice() : buffer(nullptr), offset(0), length(0) {} + + BufferSlice(BufferPtr buffer, uint64_t offset, uint64_t length) + : buffer(std::move(buffer)), offset(offset), length(length) {} + + BufferPtr buffer; + uint64_t offset; + uint64_t length; + }; + + /// Construct a read cache with given options + explicit ReadRangeCache(InputStream* stream, CacheOptions options, MemoryPool* memoryPool, + ReaderMetrics* metrics = nullptr) + : stream_(stream), + options_(std::move(options)), + memoryPool_(memoryPool), + metrics_(metrics) {} + + ~ReadRangeCache() = default; + + /// Cache the given ranges in the background. + /// + /// The caller must ensure that the ranges do not overlap with each other, + /// nor with previously cached ranges. Otherwise, behaviour will be undefined. + void cache(std::vector<ReadRange> ranges); + + /// Read a range previously given to Cache(). + BufferSlice read(const ReadRange& range); + + /// Evict cache entries with its range before given boundary. + void evictEntriesBefore(uint64_t boundary); + + private: + std::vector<RangeCacheEntry> makeCacheEntries(const std::vector<ReadRange>& ranges); + + InputStream* stream_; Review Comment: Please either remove blank lines or add a blank line between member variables to keep consistency. 
########## c++/src/io/Cache.hh: ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" + +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <future> +#include <utility> +#include <vector> + +namespace orc { + + struct ReadRange { + uint64_t offset; + uint64_t length; + + ReadRange() = default; + ReadRange(uint64_t offset, uint64_t length) : offset(offset), length(length) {} + + friend bool operator==(const ReadRange& left, const ReadRange& right) { + return (left.offset == right.offset && left.length == right.length); + } + friend bool operator!=(const ReadRange& left, const ReadRange& right) { + return !(left == right); + } + + bool contains(const ReadRange& other) const { + return (offset <= other.offset && offset + length >= other.offset + other.length); + } + }; + + struct ReadRangeCombiner { + const uint64_t holeSizeLimit; + const uint64_t rangeSizeLimit; + + std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const; + }; + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit); + struct RangeCacheEntry { + 
using BufferPtr = InputStream::BufferPtr; + + ReadRange range; + + // The result may be get multiple times, so we use shared_future instead of std::future + BufferPtr buffer; + std::shared_future<void> future; + + RangeCacheEntry() = default; + RangeCacheEntry(const ReadRange& range, BufferPtr buffer, std::future<void> future) + : range(range), buffer(std::move(buffer)), future(std::move(future).share()) {} + + friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) { Review Comment: Why `friend`? ########## c++/src/io/Cache.cc: ########## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cassert> + +#include "Cache.hh" + +namespace orc { + + std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> ranges) const { + if (ranges.empty()) { + return ranges; + } + + // Remove zero-sized ranges + auto end = std::remove_if(ranges.begin(), ranges.end(), + [](const ReadRange& range) { return range.length == 0; }); + // Sort in position order + std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) { + return a.offset != b.offset ? 
a.offset < b.offset : a.length > b.length; + }); + + // Remove ranges that overlap 100% + std::vector<ReadRange> uniqueRanges; + uniqueRanges.reserve(ranges.size()); + for (auto it = ranges.begin(); it != end; ++it) { + if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) { + uniqueRanges.push_back(*it); + } + } + ranges = std::move(uniqueRanges); + + // Skip further processing if ranges is empty after removing zero-sized ranges. + if (ranges.empty()) { + return ranges; + } + +#ifndef NDEBUG + for (size_t i = 0; i < ranges.size() - 1; ++i) { + const auto& left = ranges[i]; + const auto& right = ranges[i + 1]; + assert(left.offset < right.offset); + assert(!left.contains(right)); + } +#endif + + std::vector<ReadRange> coalesced; + + auto itr = ranges.begin(); + // Ensure ranges is not empty. + assert(itr <= ranges.end()); + + // Start of the current coalesced range and end (exclusive) of previous range. + // Both are initialized with the start of first range which is a placeholder value. + uint64_t coalescedStart = itr->offset; + uint64_t coalescedEnd = coalescedStart + itr->length; + + for (++itr; itr < ranges.end(); ++itr) { + const uint64_t currentRangeStart = itr->offset; + const uint64_t currentRangeEnd = currentRangeStart + itr->length; + + assert(coalescedStart < coalescedEnd); + assert(currentRangeStart < currentRangeEnd); + + // At this point, the coalesced range is [coalesced_start, prev_range_end). + // Stop coalescing if: + // - coalesced range is too large, or + // - distance (hole/gap) between consecutive ranges is too large. + if ((currentRangeEnd - coalescedStart > rangeSizeLimit) || + (currentRangeStart > coalescedEnd + holeSizeLimit)) { + coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart}); + coalescedStart = currentRangeStart; + } + + // Update the prev_range_end with the current range. 
+ coalescedEnd = currentRangeEnd; + } + coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart}); + + assert(coalesced.front().offset == ranges.front().offset); + assert(coalesced.back().offset + coalesced.back().length == + ranges.back().offset + ranges.back().length); + return coalesced; + } + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit) { + assert(rangeSizeLimit > holeSizeLimit); + + ReadRangeCombiner combiner{holeSizeLimit, rangeSizeLimit}; + return combiner.coalesce(std::move(ranges)); + } + + void ReadRangeCache::cache(std::vector<ReadRange> ranges) { + ranges = coalesceReadRanges(std::move(ranges), options_.holeSizeLimit, options_.rangeSizeLimit); + + std::vector<RangeCacheEntry> newEntries = makeCacheEntries(ranges); + // Add new entries, themselves ordered by offset + if (entries_.size() > 0) { + std::vector<RangeCacheEntry> merged(entries_.size() + newEntries.size()); + std::merge(entries_.begin(), entries_.end(), newEntries.begin(), newEntries.end(), + merged.begin()); + entries_ = std::move(merged); + } else { + entries_ = std::move(newEntries); + } + } + + ReadRangeCache::BufferSlice ReadRangeCache::read(const ReadRange& range) { + if (range.length == 0) { + return {std::make_shared<Buffer>(*memoryPool_, 0), 0, 0}; + } + + const auto it = std::lower_bound(entries_.begin(), entries_.end(), range, + [](const RangeCacheEntry& entry, const ReadRange& range) { + return entry.range.offset + entry.range.length < + range.offset + range.length; + }); + + BufferSlice result{}; + bool hit_cache = false; + if (it != entries_.end() && it->range.contains(range)) { + hit_cache = it->future.valid(); + it->future.get(); + result = BufferSlice{it->buffer, range.offset - it->range.offset, range.length}; + } + + if (metrics_) { + if (hit_cache) + metrics_->ReadRangeCacheHits.fetch_add(1); + else + metrics_->ReadRangeCacheMisses.fetch_add(1); + } + return result; + } + + void 
ReadRangeCache::evictEntriesBefore(uint64_t boundary) { + auto it = std::lower_bound(entries_.begin(), entries_.end(), boundary, + [](const RangeCacheEntry& entry, uint64_t offset) { + return entry.range.offset + entry.range.length < offset; + }); + entries_.erase(entries_.begin(), it); + } + + std::vector<RangeCacheEntry> ReadRangeCache::makeCacheEntries( Review Comment: nit: make it const or static ########## c++/src/io/Cache.cc: ########## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cassert> + +#include "Cache.hh" + +namespace orc { + + std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> ranges) const { + if (ranges.empty()) { + return ranges; + } + + // Remove zero-sized ranges + auto end = std::remove_if(ranges.begin(), ranges.end(), + [](const ReadRange& range) { return range.length == 0; }); + // Sort in position order + std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) { + return a.offset != b.offset ? 
a.offset < b.offset : a.length > b.length; + }); + + // Remove ranges that overlap 100% + std::vector<ReadRange> uniqueRanges; + uniqueRanges.reserve(ranges.size()); + for (auto it = ranges.begin(); it != end; ++it) { + if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) { + uniqueRanges.push_back(*it); + } + } + ranges = std::move(uniqueRanges); + + // Skip further processing if ranges is empty after removing zero-sized ranges. + if (ranges.empty()) { + return ranges; + } + +#ifndef NDEBUG + for (size_t i = 0; i < ranges.size() - 1; ++i) { + const auto& left = ranges[i]; + const auto& right = ranges[i + 1]; + assert(left.offset < right.offset); + assert(!left.contains(right)); + } +#endif + + std::vector<ReadRange> coalesced; + + auto itr = ranges.begin(); + // Ensure ranges is not empty. + assert(itr <= ranges.end()); Review Comment: this assert is unnecessary ########## c++/src/io/Cache.cc: ########## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <cassert> + +#include "Cache.hh" + +namespace orc { + + std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> ranges) const { + if (ranges.empty()) { + return ranges; + } + + // Remove zero-sized ranges + auto end = std::remove_if(ranges.begin(), ranges.end(), + [](const ReadRange& range) { return range.length == 0; }); + // Sort in position order + std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) { + return a.offset != b.offset ? a.offset < b.offset : a.length > b.length; + }); + + // Remove ranges that overlap 100% + std::vector<ReadRange> uniqueRanges; + uniqueRanges.reserve(ranges.size()); + for (auto it = ranges.begin(); it != end; ++it) { + if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) { + uniqueRanges.push_back(*it); + } + } + ranges = std::move(uniqueRanges); + + // Skip further processing if ranges is empty after removing zero-sized ranges. + if (ranges.empty()) { + return ranges; + } + +#ifndef NDEBUG + for (size_t i = 0; i < ranges.size() - 1; ++i) { + const auto& left = ranges[i]; + const auto& right = ranges[i + 1]; + assert(left.offset < right.offset); + assert(!left.contains(right)); + } +#endif + + std::vector<ReadRange> coalesced; + + auto itr = ranges.begin(); + // Ensure ranges is not empty. + assert(itr <= ranges.end()); + + // Start of the current coalesced range and end (exclusive) of previous range. + // Both are initialized with the start of first range which is a placeholder value. + uint64_t coalescedStart = itr->offset; + uint64_t coalescedEnd = coalescedStart + itr->length; + + for (++itr; itr < ranges.end(); ++itr) { + const uint64_t currentRangeStart = itr->offset; + const uint64_t currentRangeEnd = currentRangeStart + itr->length; + + assert(coalescedStart < coalescedEnd); + assert(currentRangeStart < currentRangeEnd); + + // At this point, the coalesced range is [coalesced_start, prev_range_end). 
+ // Stop coalescing if: + // - coalesced range is too large, or + // - distance (hole/gap) between consecutive ranges is too large. + if ((currentRangeEnd - coalescedStart > rangeSizeLimit) || + (currentRangeStart > coalescedEnd + holeSizeLimit)) { + coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart}); + coalescedStart = currentRangeStart; + } + + // Update the prev_range_end with the current range. + coalescedEnd = currentRangeEnd; + } + coalesced.push_back({coalescedStart, coalescedEnd - coalescedStart}); + + assert(coalesced.front().offset == ranges.front().offset); + assert(coalesced.back().offset + coalesced.back().length == + ranges.back().offset + ranges.back().length); + return coalesced; + } + + std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit, + uint64_t rangeSizeLimit) { + assert(rangeSizeLimit > holeSizeLimit); + + ReadRangeCombiner combiner{holeSizeLimit, rangeSizeLimit}; + return combiner.coalesce(std::move(ranges)); + } + + void ReadRangeCache::cache(std::vector<ReadRange> ranges) { + ranges = coalesceReadRanges(std::move(ranges), options_.holeSizeLimit, options_.rangeSizeLimit); + + std::vector<RangeCacheEntry> newEntries = makeCacheEntries(ranges); + // Add new entries, themselves ordered by offset + if (entries_.size() > 0) { + std::vector<RangeCacheEntry> merged(entries_.size() + newEntries.size()); + std::merge(entries_.begin(), entries_.end(), newEntries.begin(), newEntries.end(), + merged.begin()); + entries_ = std::move(merged); + } else { + entries_ = std::move(newEntries); + } + } + + ReadRangeCache::BufferSlice ReadRangeCache::read(const ReadRange& range) { + if (range.length == 0) { + return {std::make_shared<Buffer>(*memoryPool_, 0), 0, 0}; + } + + const auto it = std::lower_bound(entries_.begin(), entries_.end(), range, + [](const RangeCacheEntry& entry, const ReadRange& range) { + return entry.range.offset + entry.range.length < + range.offset + range.length; + }); + + 
BufferSlice result{}; + bool hit_cache = false; + if (it != entries_.end() && it->range.contains(range)) { + hit_cache = it->future.valid(); + it->future.get(); Review Comment: Should we catch and rethrow an orc::Exception? Should we use a timeout here to fall back to a direct read? ########## c++/src/io/Cache.cc: ########## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cassert> + +#include "Cache.hh" + +namespace orc { + + std::vector<ReadRange> ReadRangeCombiner::coalesce(std::vector<ReadRange> ranges) const { + if (ranges.empty()) { + return ranges; + } + + // Remove zero-sized ranges + auto end = std::remove_if(ranges.begin(), ranges.end(), + [](const ReadRange& range) { return range.length == 0; }); + // Sort in position order + std::sort(ranges.begin(), end, [](const ReadRange& a, const ReadRange& b) { + return a.offset != b.offset ? 
a.offset < b.offset : a.length > b.length; + }); + + // Remove ranges that overlap 100% + std::vector<ReadRange> uniqueRanges; + uniqueRanges.reserve(ranges.size()); + for (auto it = ranges.begin(); it != end; ++it) { + if (uniqueRanges.empty() || !uniqueRanges.back().contains(*it)) { + uniqueRanges.push_back(*it); + } + } + ranges = std::move(uniqueRanges); + + // Skip further processing if ranges is empty after removing zero-sized ranges. + if (ranges.empty()) { + return ranges; + } + +#ifndef NDEBUG + for (size_t i = 0; i < ranges.size() - 1; ++i) { + const auto& left = ranges[i]; + const auto& right = ranges[i + 1]; + assert(left.offset < right.offset); + assert(!left.contains(right)); + } +#endif + + std::vector<ReadRange> coalesced; + + auto itr = ranges.begin(); + // Ensure ranges is not empty. + assert(itr <= ranges.end()); + + // Start of the current coalesced range and end (exclusive) of previous range. + // Both are initialized with the start of first range which is a placeholder value. + uint64_t coalescedStart = itr->offset; + uint64_t coalescedEnd = coalescedStart + itr->length; + + for (++itr; itr < ranges.end(); ++itr) { Review Comment: We iterate over the ranges three times: at line 31, at line 41, and here. This can be done in a single pass after sorting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
