binmahone commented on code in PR #38867:
URL: https://github.com/apache/arrow/pull/38867#discussion_r1451680864
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
/// An offset paired with a 64-bit bitmap.
///
/// Both members default to zero so that a default-constructed BitmapRange never
/// carries indeterminate values. The type is still an aggregate, so
/// `BitmapRange{offset, bitmap}` keeps working.
struct BitmapRange {
  /// Position the bitmap is anchored at.
  /// NOTE(review): presumably the index of the element described by the first
  /// bit -- confirm once the iterator API that produces this type exists.
  int64_t offset = 0;
  /// Zero-padded when fewer than 64 elements are left in the column.
  uint64_t bitmap = 0;
};

/// Empty tag type. It appears next to IntervalRange and BitmapRange in the
/// std::variant returned by the still commented-out Iterator::NextRange() API;
/// presumably it signals that no further ranges remain.
struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const IntervalRange& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<IntervalRange>& ranges) { this->ranges = ranges;
}
+
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges Intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const IntervalRange& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const IntervalRange& r = right.ranges[i];
+ if (l.IsBefore(r)) {
+ break;
+ } else if (l.IsAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.Add(IntervalRange::Intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ void Add(const IntervalRange& range) {
+ const IntervalRange rangeToAdd = range;
+ if (ranges.size() > 1 && rangeToAdd.start <= ranges.back().end) {
+ throw ParquetException("Ranges must be added in order");
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t RowCount() const {
+ size_t cnt = 0;
+ for (const IntervalRange& range : ranges) {
+ cnt += range.Count();
+ }
+ return cnt;
+ }
+
+ bool IsValid() const {
+ if (ranges.size() == 0) return true;
+ if (ranges[0].start < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].start <= ranges[i - 1].end) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsOverlapping(int64_t start, int64_t end) const {
+ const IntervalRange searchRange(start, end);
+ return IsOverlapping(searchRange);
+ }
+
+ bool IsOverlapping(const IntervalRange& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const IntervalRange& r1, const IntervalRange& r2) { return
r1.IsBefore(r2); });
+ return it != ranges.end() && !(*it).IsAfter(searchRange);
+ }
+
+ std::vector<IntervalRange>& GetRanges() { return ranges; }
+
+ const std::vector<IntervalRange>& GetRanges() const { return ranges; }
+
+ // Split the ranges into N+1 parts at the given split point, where N =
+ // split_points.size() The RowRows object itself is not modified
+ std::vector<RowRanges> SplitAt(const std::vector<int64_t>& split_points)
const {
+ if (split_points.size() == 0) {
+ return {*this};
+ }
+
+ std::vector<RowRanges> result;
+ int64_t last_split_point = -1;
+ for (const int64_t split_point : split_points) {
+ if (split_point <= 0) {
+ throw ParquetException("Invalid split point " +
std::to_string(split_point));
+ }
+ if (split_point <= last_split_point) {
+ throw ParquetException("Split points must be in ascending order");
+ }
+ last_split_point = split_point;
+ }
+
+ RowRanges spaces;
+ for (size_t i = 0; i < split_points.size(); ++i) {
+ auto start = i == 0 ? 0 : split_points[i - 1];
+ auto end = split_points[i] - 1;
+ spaces.Add({start, end});
+ }
+ spaces.Add(
+ {split_points[split_points.size() - 1],
std::numeric_limits<int64_t>::max()});
+
+ for (IntervalRange space : spaces.GetRanges()) {
+ RowRanges intersection = RowRanges::Intersection(RowRanges(space),
*this);
+ result.push_back(intersection);
+ }
+
+ return result;
+ }
+
+ const IntervalRange& operator[](size_t index) const {
+ // check index
+ if (index >= ranges.size() || index < 0) {
+ throw ParquetException("Index out of range");
+ }
+ return ranges[index];
+ }
+
+ RowRanges shift(const int64_t offset) const {
+ RowRanges result;
+ for (const IntervalRange& range : ranges) {
+ result.Add({range.start + offset, range.end + offset});
+ }
+ return result;
+ }
+
+ std::string ToString() const {
+ std::string result = "[";
+ for (const IntervalRange& range : ranges) {
+ result +=
+ "(" + std::to_string(range.start) + ", " + std::to_string(range.end)
+ "), ";
+ }
+ if (!ranges.empty()) {
+ result = result.substr(0, result.size() - 2);
+ }
+ result += "]";
+ return result;
+ }
+
+ /// The following APIs are to be implemented
+ /// Comment out for now to pass compile
+
+ // // Returns a vector of PageLocations that must be read all to get
values for
+ // all included in this range virtual std::vector<PageLocation>
+ // PageIndexesToInclude(const std::vector<PageLocation>& all_pages)
= 0; class
+ // Iterator {
+ // virtual std::variant<IntervalRange, BitmapRange, End>
NextRange() = 0;
+ // };
+ // virtual std::unique_ptr<Iterator> NewIterator() = 0;
+
+ private:
+ std::vector<IntervalRange> ranges;
+};
+
namespace internal {
+class PARQUET_EXPORT RecordSkipper {
+ public:
+ RecordSkipper(RowRanges& pages, const RowRanges& row_ranges_)
+ : row_ranges(row_ranges_) {
+ // copy row_ranges
+ RowRanges will_process_pages, skip_pages;
+ for (auto& page : pages.GetRanges()) {
+ if (!row_ranges.IsOverlapping(page)) {
+ skip_pages.Add(page);
+ }
+ }
+
+ /// Since the skipped pages will be silently skipped without updating
+ /// current_rg_processed_records or records_read_, we need to pre-process
the row
+ /// ranges as if these skipped pages never existed
+ adjust_ranges(skip_pages, row_ranges);
+
+ total_rows_to_process = pages.RowCount() - skip_pages.RowCount();
+ }
+
+ /// \brief Return the number of records to read or to skip
+ /// if return values is positive, it means to read N records
+ /// if return values is negative, it means to skip N records
+ /// if return values is 0, it means end of RG
+ int64_t advise_next(const int64_t current_rg_processed) {
+ if (row_ranges.GetRanges().size() == row_range_idx) {
+ return 0;
+ }
+
+ if (row_ranges[row_range_idx].end < current_rg_processed) {
+ row_range_idx++;
+ if (row_ranges.GetRanges().size() == row_range_idx) {
+ // negative, skip the ramaining rows
+ return current_rg_processed - total_rows_to_process;
+ }
+ }
+
+ if (row_ranges[row_range_idx].start > current_rg_processed) {
+ // negative, skip
+ return current_rg_processed - row_ranges[row_range_idx].start;
+ }
+
+ const auto ret = row_ranges[row_range_idx].end - current_rg_processed + 1;
+ return ret;
+ }
+
+ private:
+ void adjust_ranges(RowRanges& skip_pages, RowRanges& to_adjust) {
+ size_t skipped_rows = 0;
+ auto iter = to_adjust.GetRanges().begin();
+ auto skip_iter = skip_pages.GetRanges().begin();
+ while (iter != to_adjust.GetRanges().end()) {
+ while (skip_iter != skip_pages.GetRanges().end() &&
skip_iter->IsBefore(*iter)) {
+ skipped_rows += skip_iter->Count();
+ ++skip_iter;
+ }
+ iter->start -= skipped_rows;
+ iter->end -= skipped_rows;
+ ++iter;
+ }
+ }
+
+ /// Keep copy of ranges, because adjust_ranges() will modify them
+ RowRanges row_ranges;
+
+ size_t row_range_idx = 0; // index into row_ranges of the range advise_next() is currently working on
+ size_t total_rows_to_process = 0; // set by the constructor: rows in all pages minus rows in pages skipped as a whole
Review Comment:
Got it. The convention does not seem to be enforced everywhere, though; for example:
ReaderContext::reader
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]