emkornfield commented on code in PR #38867:
URL: https://github.com/apache/arrow/pull/38867#discussion_r1405603852
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,8 +303,274 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
/// \brief A closed interval of row indices: both `from` and `to` are included.
///
/// unionRange() and intersection() return the pair {-1, -1} as a sentinel
/// meaning "no result"; it is not a meaningful row range.
struct Range {
  /// \brief Merge two ranges that overlap or are directly adjacent.
  ///
  /// Adjacent ranges have no row between them, e.g. [0, 5] and [6, 9] merge
  /// into [0, 9]. The argument order does not matter.
  ///
  /// \return The merged range, or the sentinel {-1, -1} when at least one row
  ///         separates the two ranges.
  static Range unionRange(const Range& left, const Range& right) {
    if (left.from <= right.from) {
      if (reaches(left, right)) {
        return {left.from, std::max(left.to, right.to)};
      }
    } else if (reaches(right, left)) {
      return {right.from, std::max(left.to, right.to)};
    }
    return {-1, -1};
  }

  /// \brief Compute the rows shared by two ranges.
  ///
  /// \return The overlapping range, or the sentinel {-1, -1} when the ranges
  ///         share no row.
  static Range intersection(const Range& left, const Range& right) {
    if (left.from <= right.from) {
      if (left.to >= right.from) {
        return {right.from, std::min(left.to, right.to)};
      }
    } else if (right.to >= left.from) {
      return {left.from, std::min(left.to, right.to)};
    }
    return {-1, -1};  // No intersection.
  }

  int64_t from;
  int64_t to;

  /// \pre from_ <= to_ (checked with assert(), i.e. only in debug builds).
  Range(const int64_t from_, const int64_t to_) : from(from_), to(to_) {
    assert(from <= to);
  }

  /// Number of rows covered, counting both ends.
  size_t count() const { return to - from + 1; }

  /// True if this range ends strictly before `other` starts.
  bool isBefore(const Range& other) const { return to < other.from; }

  /// True if this range starts strictly after `other` ends.
  bool isAfter(const Range& other) const { return from > other.to; }

  /// True if the two ranges share at least one row.
  bool isOverlap(const Range& other) const {
    return !isBefore(other) && !isAfter(other);
  }

  /// Render as "[from, to]".
  std::string toString() const {
    return "[" + std::to_string(from) + ", " + std::to_string(to) + "]";
  }

 private:
  // True if `second` overlaps `first` or begins on the row right after
  // `first.to`, i.e. first.to + 1 >= second.from. It is written this way so
  // that first.to + 1 is never evaluated when first.to == INT64_MAX: in that
  // case the left operand is already true. Signed overflow is undefined
  // behavior.
  static bool reaches(const Range& first, const Range& second) {
    return first.to >= second.from || first.to + 1 == second.from;
  }
};
+
+class RowRanges {
+ std::vector<Range> ranges;
+
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const Range& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<Range>& ranges) { this->ranges = ranges; }
+
+ // copy cstr
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges unionRanges(const RowRanges& left, const RowRanges& right) {
+ RowRanges result;
+ auto it1 = left.ranges.begin();
+ auto it2 = right.ranges.begin();
+ if (it2 != right.ranges.end()) {
+ Range range2 = *it2;
+ while (it1 != left.ranges.end()) {
+ Range range1 = *it1;
+ if (range1.isAfter(range2)) {
+ result.add(range2);
+ range2 = range1;
+ const auto tmp = it1;
+ it1 = it2;
+ it2 = tmp;
+ } else {
+ result.add(range1);
+ }
+ ++it1;
+ }
+ result.add(range2);
+ } else {
+ it2 = it1;
+ }
+ while (it2 != right.ranges.end()) {
+ result.add(*it2);
+ ++it2;
+ }
+
+ return result;
+ }
+
+ static RowRanges intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const Range& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const Range& r = right.ranges[i];
+ if (l.isBefore(r)) {
+ break;
+ } else if (l.isAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.add(Range::intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ RowRanges slice(const int64_t from, const int64_t to) const {
+ RowRanges result;
+ for (const Range& range : ranges) {
+ if (range.from >= from && range.to <= to) {
+ result.add(range);
+ }
+ }
+ return result;
+ }
+
+ void add(const Range& range, bool merge = true) {
+ Range rangeToAdd = range;
+ if (merge) {
+ for (int i = static_cast<int>(ranges.size()) - 1; i >= 0; --i) {
+ Range last = ranges[i];
+ if (last.isAfter(range)) {
+ throw ParquetException(range.toString() + " cannot be added to " +
+ this->toString());
+ }
+ const Range u = Range::unionRange(last, rangeToAdd);
+ if (u.from == -1 && u.to == -1) {
+ break;
+ }
+ rangeToAdd = u;
+ ranges.erase(ranges.begin() + i);
+ }
+ } else {
+ if (ranges.size() > 1) assert(rangeToAdd.from > ranges.back().to);
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t rowCount() const {
+ size_t cnt = 0;
+ for (const Range& range : ranges) {
+ cnt += range.count();
+ }
+ return cnt;
+ }
+
+ bool isValid() const {
+ if (ranges.size() == 0) {
+ return false;
+ }
+ if (ranges[0].from < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].from <= ranges[i - 1].to) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool isOverlapping(int64_t from, int64_t to) const {
+ const Range searchRange(from, to);
+ return isOverlapping(searchRange);
+ }
+
+ bool isOverlapping(const Range& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const Range& r1, const Range& r2) { return r1.isBefore(r2); });
+ return it != ranges.end() && !(*it).isAfter(searchRange);
+ }
+
+ std::vector<Range>& getRanges() { return ranges; }
+
+ const Range& operator[](size_t index) const { return ranges[index]; }
+
+ std::string toString() const {
+ std::string result = "[";
+ for (const Range& range : ranges) {
+ result +=
+ "(" + std::to_string(range.from) + ", " + std::to_string(range.to) +
"), ";
+ }
+ if (!ranges.empty()) {
+ result = result.substr(0, result.size() - 2);
+ }
+ result += "]";
+ return result;
+ }
+};
+
+using RowRangesPtr = std::shared_ptr<RowRanges>;
+
namespace internal {
+class PARQUET_EXPORT RecordSkipper {
+ public:
+ RecordSkipper(RowRanges& pages, RowRanges& row_ranges_)
+ : row_ranges(row_ranges_) { // copy row_ranges
+ RowRanges will_process_pages, skip_pages;
+ for (auto& page : pages.getRanges()) {
+ if (!row_ranges.isOverlapping(page)) {
+ skip_pages.add(page, false);
+ }
+ }
+
+ /// Since the skipped pages will be slienly skipped without updating
+ /// current_rg_processed_records or records_read_, we need to pre-process
the row
+ /// ranges as if these skipped pages never existed
+ adjust_ranges(skip_pages, row_ranges);
+
+ total_rows_to_process = pages.rowCount() - skip_pages.rowCount();
+ }
+
+ /// \brief Return the number of records to read or to skip
+ /// if return values is positive, it means to read N records
Review Comment:
Is the sign reversal really necessary? Couldn't we have a convention for the reader
where the first element returned is always the number of records to read, and the
values then alternate between inclusion and exclusion?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]