wgtmac commented on code in PR #38867:
URL: https://github.com/apache/arrow/pull/38867#discussion_r1443997791
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
+struct BitmapRange {
+ int64_t offset;
+ // zero added to, if there are less than 64 elements left in the column.
+ uint64_t bitmap;
+};
+
+struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
Review Comment:
It would be good to move the `RowRanges`-related classes into a separate
source file.
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
+struct BitmapRange {
+ int64_t offset;
+ // zero added to, if there are less than 64 elements left in the column.
+ uint64_t bitmap;
+};
+
+struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const IntervalRange& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<IntervalRange>& ranges) { this->ranges = ranges;
}
+
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges Intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const IntervalRange& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const IntervalRange& r = right.ranges[i];
+ if (l.IsBefore(r)) {
+ break;
+ } else if (l.IsAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.Add(IntervalRange::Intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ void Add(const IntervalRange& range) {
+ const IntervalRange rangeToAdd = range;
+ if (ranges.size() > 1 && rangeToAdd.start <= ranges.back().end) {
+ throw ParquetException("Ranges must be added in order");
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t RowCount() const {
+ size_t cnt = 0;
+ for (const IntervalRange& range : ranges) {
+ cnt += range.Count();
+ }
+ return cnt;
+ }
+
+ bool IsValid() const {
+ if (ranges.size() == 0) return true;
+ if (ranges[0].start < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].start <= ranges[i - 1].end) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsOverlapping(int64_t start, int64_t end) const {
+ const IntervalRange searchRange(start, end);
+ return IsOverlapping(searchRange);
+ }
+
+ bool IsOverlapping(const IntervalRange& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const IntervalRange& r1, const IntervalRange& r2) { return
r1.IsBefore(r2); });
+ return it != ranges.end() && !(*it).IsAfter(searchRange);
+ }
+
+ std::vector<IntervalRange>& GetRanges() { return ranges; }
+
+ const std::vector<IntervalRange>& GetRanges() const { return ranges; }
+
+ // Split the ranges into N+1 parts at the given split point, where N =
+ // split_points.size() The RowRows object itself is not modified
+ std::vector<RowRanges> SplitAt(const std::vector<int64_t>& split_points)
const {
+ if (split_points.size() == 0) {
+ return {*this};
+ }
+
+ std::vector<RowRanges> result;
+ int64_t last_split_point = -1;
+ for (const int64_t split_point : split_points) {
+ if (split_point <= 0) {
+ throw ParquetException("Invalid split point " +
std::to_string(split_point));
+ }
+ if (split_point <= last_split_point) {
+ throw ParquetException("Split points must be in ascending order");
+ }
+ last_split_point = split_point;
+ }
+
+ RowRanges spaces;
+ for (size_t i = 0; i < split_points.size(); ++i) {
+ auto start = i == 0 ? 0 : split_points[i - 1];
+ auto end = split_points[i] - 1;
+ spaces.Add({start, end});
+ }
+ spaces.Add(
+ {split_points[split_points.size() - 1],
std::numeric_limits<int64_t>::max()});
+
+ for (IntervalRange space : spaces.GetRanges()) {
+ RowRanges intersection = RowRanges::Intersection(RowRanges(space),
*this);
+ result.push_back(intersection);
+ }
+
+ return result;
+ }
+
+ const IntervalRange& operator[](size_t index) const {
+ // check index
+ if (index >= ranges.size() || index < 0) {
+ throw ParquetException("Index out of range");
+ }
+ return ranges[index];
+ }
+
+ RowRanges shift(const int64_t offset) const {
+ RowRanges result;
+ for (const IntervalRange& range : ranges) {
+ result.Add({range.start + offset, range.end + offset});
+ }
+ return result;
+ }
+
+ std::string ToString() const {
+ std::string result = "[";
+ for (const IntervalRange& range : ranges) {
+ result +=
+ "(" + std::to_string(range.start) + ", " + std::to_string(range.end)
+ "), ";
+ }
+ if (!ranges.empty()) {
+ result = result.substr(0, result.size() - 2);
+ }
+ result += "]";
+ return result;
+ }
+
+ /// The following APIs are to be implemented
+ /// Comment out for now to pass compile
+
+ // // Returns a vector of PageLocations that must be read all to get
values for
+ // all included in this range virtual std::vector<PageLocation>
+ // PageIndexesToInclude(const std::vector<PageLocation>& all_pages)
= 0; class
+ // Iterator {
+ // virtual std::variant<IntervalRange, BitmapRange, End>
NextRange() = 0;
+ // };
+ // virtual std::unique_ptr<Iterator> NewIterator() = 0;
+
+ private:
+ std::vector<IntervalRange> ranges;
+};
+
namespace internal {
+class PARQUET_EXPORT RecordSkipper {
+ public:
+ RecordSkipper(RowRanges& pages, const RowRanges& row_ranges_)
+ : row_ranges(row_ranges_) {
+ // copy row_ranges
+ RowRanges will_process_pages, skip_pages;
+ for (auto& page : pages.GetRanges()) {
+ if (!row_ranges.IsOverlapping(page)) {
+ skip_pages.Add(page);
+ }
+ }
+
+ /// Since the skipped pages will be silently skipped without updating
+ /// current_rg_processed_records or records_read_, we need to pre-process
the row
+ /// ranges as if these skipped pages never existed
+ adjust_ranges(skip_pages, row_ranges);
+
+ total_rows_to_process = pages.RowCount() - skip_pages.RowCount();
+ }
+
+ /// \brief Return the number of records to read or to skip
+ /// if return values is positive, it means to read N records
+ /// if return values is negative, it means to skip N records
+ /// if return values is 0, it means end of RG
+ int64_t advise_next(const int64_t current_rg_processed) {
+ if (row_ranges.GetRanges().size() == row_range_idx) {
+ return 0;
+ }
+
+ if (row_ranges[row_range_idx].end < current_rg_processed) {
+ row_range_idx++;
+ if (row_ranges.GetRanges().size() == row_range_idx) {
+ // negative, skip the ramaining rows
+ return current_rg_processed - total_rows_to_process;
+ }
+ }
+
+ if (row_ranges[row_range_idx].start > current_rg_processed) {
+ // negative, skip
+ return current_rg_processed - row_ranges[row_range_idx].start;
+ }
+
+ const auto ret = row_ranges[row_range_idx].end - current_rg_processed + 1;
+ return ret;
+ }
+
+ private:
+ void adjust_ranges(RowRanges& skip_pages, RowRanges& to_adjust) {
+ size_t skipped_rows = 0;
+ auto iter = to_adjust.GetRanges().begin();
+ auto skip_iter = skip_pages.GetRanges().begin();
+ while (iter != to_adjust.GetRanges().end()) {
+ while (skip_iter != skip_pages.GetRanges().end() &&
skip_iter->IsBefore(*iter)) {
+ skipped_rows += skip_iter->Count();
+ ++skip_iter;
+ }
+ iter->start -= skipped_rows;
+ iter->end -= skipped_rows;
+ ++iter;
+ }
+ }
+
+ /// Keep copy of ranges, because adjust_ranges() will modify them
+ RowRanges row_ranges;
+
+ size_t row_range_idx = 0;
+ size_t total_rows_to_process = 0;
Review Comment:
```suggestion
size_t row_range_idx_ = 0;
size_t total_rows_to_process_ = 0;
```
This is the style convention for class member variables.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -512,8 +599,79 @@ class LeafReader : public ColumnReaderImpl {
private:
std::shared_ptr<ChunkedArray> out_;
+
+ void checkAndGetPageRanges(const RowRanges & row_ranges,
+ std::shared_ptr<RowRanges>& page_ranges) const {
Review Comment:
```suggestion
void CheckAndGetPageRanges(const RowRanges& row_ranges,
std::shared_ptr<RowRanges>* page_ranges) const {
```
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
+struct BitmapRange {
+ int64_t offset;
+ // zero added to, if there are less than 64 elements left in the column.
+ uint64_t bitmap;
+};
+
+struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const IntervalRange& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<IntervalRange>& ranges) { this->ranges = ranges;
}
+
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges Intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const IntervalRange& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const IntervalRange& r = right.ranges[i];
+ if (l.IsBefore(r)) {
+ break;
+ } else if (l.IsAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.Add(IntervalRange::Intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ void Add(const IntervalRange& range) {
+ const IntervalRange rangeToAdd = range;
+ if (ranges.size() > 1 && rangeToAdd.start <= ranges.back().end) {
+ throw ParquetException("Ranges must be added in order");
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t RowCount() const {
+ size_t cnt = 0;
+ for (const IntervalRange& range : ranges) {
+ cnt += range.Count();
+ }
+ return cnt;
+ }
+
+ bool IsValid() const {
+ if (ranges.size() == 0) return true;
+ if (ranges[0].start < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].start <= ranges[i - 1].end) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsOverlapping(int64_t start, int64_t end) const {
+ const IntervalRange searchRange(start, end);
+ return IsOverlapping(searchRange);
+ }
+
+ bool IsOverlapping(const IntervalRange& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const IntervalRange& r1, const IntervalRange& r2) { return
r1.IsBefore(r2); });
+ return it != ranges.end() && !(*it).IsAfter(searchRange);
+ }
+
+ std::vector<IntervalRange>& GetRanges() { return ranges; }
+
+ const std::vector<IntervalRange>& GetRanges() const { return ranges; }
+
+ // Split the ranges into N+1 parts at the given split point, where N =
+ // split_points.size() The RowRows object itself is not modified
+ std::vector<RowRanges> SplitAt(const std::vector<int64_t>& split_points)
const {
+ if (split_points.size() == 0) {
+ return {*this};
+ }
+
+ std::vector<RowRanges> result;
+ int64_t last_split_point = -1;
+ for (const int64_t split_point : split_points) {
+ if (split_point <= 0) {
+ throw ParquetException("Invalid split point " +
std::to_string(split_point));
+ }
+ if (split_point <= last_split_point) {
+ throw ParquetException("Split points must be in ascending order");
+ }
+ last_split_point = split_point;
+ }
+
+ RowRanges spaces;
+ for (size_t i = 0; i < split_points.size(); ++i) {
+ auto start = i == 0 ? 0 : split_points[i - 1];
+ auto end = split_points[i] - 1;
+ spaces.Add({start, end});
+ }
+ spaces.Add(
+ {split_points[split_points.size() - 1],
std::numeric_limits<int64_t>::max()});
+
+ for (IntervalRange space : spaces.GetRanges()) {
+ RowRanges intersection = RowRanges::Intersection(RowRanges(space),
*this);
+ result.push_back(intersection);
+ }
+
+ return result;
+ }
+
+ const IntervalRange& operator[](size_t index) const {
+ // check index
+ if (index >= ranges.size() || index < 0) {
+ throw ParquetException("Index out of range");
+ }
+ return ranges[index];
+ }
+
+ RowRanges shift(const int64_t offset) const {
+ RowRanges result;
+ for (const IntervalRange& range : ranges) {
+ result.Add({range.start + offset, range.end + offset});
+ }
+ return result;
+ }
+
+ std::string ToString() const {
+ std::string result = "[";
+ for (const IntervalRange& range : ranges) {
+ result +=
+ "(" + std::to_string(range.start) + ", " + std::to_string(range.end)
+ "), ";
+ }
+ if (!ranges.empty()) {
+ result = result.substr(0, result.size() - 2);
+ }
+ result += "]";
+ return result;
+ }
+
+ /// The following APIs are to be implemented
+ /// Comment out for now to pass compile
+
+ // // Returns a vector of PageLocations that must be read all to get
values for
+ // all included in this range virtual std::vector<PageLocation>
+ // PageIndexesToInclude(const std::vector<PageLocation>& all_pages)
= 0; class
+ // Iterator {
+ // virtual std::variant<IntervalRange, BitmapRange, End>
NextRange() = 0;
+ // };
+ // virtual std::unique_ptr<Iterator> NewIterator() = 0;
+
+ private:
+ std::vector<IntervalRange> ranges;
Review Comment:
IMHO, the `RowRanges` class should hide its implementation details, so the private
member `std::vector<IntervalRange> ranges` should not appear here. If we don't
want to pay the cost of a virtual class, we can use the PIMPL idiom, following the
example in `parquet/metadata.h`.
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
Review Comment:
Should we simply define `IntervalRange` as a plain struct, as below, and move all
of its functions into a separate utility class?
```
struct IntervalRange {
// inclusive
int64_t start;
// inclusive
int64_t end;
};
struct IntervalRangeUtils {
static IntervalRange Intersection(const IntervalRange& lhs,
const IntervalRange& rhs);
static bool IsBefore(const IntervalRange& lhs, const IntervalRange& rhs);
...
};
```
##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -206,6 +219,7 @@ class PARQUET_EXPORT FileReader {
std::shared_ptr<::arrow::RecordBatchReader>* out);
::arrow::Status
GetRecordBatchReader(std::shared_ptr<::arrow::RecordBatchReader>* out);
+
Review Comment:
remove the blank line.
##########
cpp/src/parquet/arrow/reader_internal.h:
##########
@@ -109,6 +113,7 @@ struct ReaderContext {
FileColumnIteratorFactory iterator_factory;
bool filter_leaves;
std::shared_ptr<std::unordered_set<int>> included_leaves;
+ std::shared_ptr<std::vector<RowRanges>> row_ranges_map;
Review Comment:
```suggestion
std::shared_ptr<std::vector<RowRanges>> row_ranges;
```
This does not look like a map.
##########
cpp/src/parquet/arrow/reader_internal.h:
##########
@@ -88,11 +89,14 @@ class FileColumnIterator {
int column_index() const { return column_index_; }
+ int current_row_group() const { return current_rg; }
+
protected:
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
std::deque<int> row_groups_;
+ int current_rg = 0;
Review Comment:
```suggestion
int current_row_group_ = 0;
```
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -72,6 +74,7 @@ using arrow::internal::Iota;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
+using parquet::IntervalRange;
Review Comment:
This `using` declaration seems unnecessary.
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
+struct BitmapRange {
+ int64_t offset;
+ // zero added to, if there are less than 64 elements left in the column.
+ uint64_t bitmap;
+};
+
+struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const IntervalRange& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<IntervalRange>& ranges) { this->ranges = ranges;
}
+
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges Intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const IntervalRange& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const IntervalRange& r = right.ranges[i];
+ if (l.IsBefore(r)) {
+ break;
+ } else if (l.IsAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.Add(IntervalRange::Intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ void Add(const IntervalRange& range) {
+ const IntervalRange rangeToAdd = range;
+ if (ranges.size() > 1 && rangeToAdd.start <= ranges.back().end) {
+ throw ParquetException("Ranges must be added in order");
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t RowCount() const {
+ size_t cnt = 0;
+ for (const IntervalRange& range : ranges) {
+ cnt += range.Count();
+ }
+ return cnt;
+ }
+
+ bool IsValid() const {
+ if (ranges.size() == 0) return true;
+ if (ranges[0].start < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].start <= ranges[i - 1].end) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsOverlapping(int64_t start, int64_t end) const {
+ const IntervalRange searchRange(start, end);
+ return IsOverlapping(searchRange);
+ }
+
+ bool IsOverlapping(const IntervalRange& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const IntervalRange& r1, const IntervalRange& r2) { return
r1.IsBefore(r2); });
+ return it != ranges.end() && !(*it).IsAfter(searchRange);
+ }
+
+ std::vector<IntervalRange>& GetRanges() { return ranges; }
+
+ const std::vector<IntervalRange>& GetRanges() const { return ranges; }
+
+ // Split the ranges into N+1 parts at the given split point, where N =
+ // split_points.size() The RowRows object itself is not modified
+ std::vector<RowRanges> SplitAt(const std::vector<int64_t>& split_points)
const {
+ if (split_points.size() == 0) {
+ return {*this};
+ }
+
+ std::vector<RowRanges> result;
+ int64_t last_split_point = -1;
+ for (const int64_t split_point : split_points) {
+ if (split_point <= 0) {
+ throw ParquetException("Invalid split point " +
std::to_string(split_point));
+ }
+ if (split_point <= last_split_point) {
+ throw ParquetException("Split points must be in ascending order");
+ }
+ last_split_point = split_point;
+ }
+
+ RowRanges spaces;
+ for (size_t i = 0; i < split_points.size(); ++i) {
+ auto start = i == 0 ? 0 : split_points[i - 1];
+ auto end = split_points[i] - 1;
+ spaces.Add({start, end});
+ }
+ spaces.Add(
+ {split_points[split_points.size() - 1],
std::numeric_limits<int64_t>::max()});
+
+ for (IntervalRange space : spaces.GetRanges()) {
+ RowRanges intersection = RowRanges::Intersection(RowRanges(space),
*this);
+ result.push_back(intersection);
+ }
+
+ return result;
+ }
+
+ const IntervalRange& operator[](size_t index) const {
+ // check index
+ if (index >= ranges.size() || index < 0) {
+ throw ParquetException("Index out of range");
+ }
+ return ranges[index];
+ }
+
+ RowRanges shift(const int64_t offset) const {
+ RowRanges result;
+ for (const IntervalRange& range : ranges) {
+ result.Add({range.start + offset, range.end + offset});
+ }
+ return result;
+ }
+
+ std::string ToString() const {
+ std::string result = "[";
+ for (const IntervalRange& range : ranges) {
+ result +=
+ "(" + std::to_string(range.start) + ", " + std::to_string(range.end)
+ "), ";
+ }
+ if (!ranges.empty()) {
+ result = result.substr(0, result.size() - 2);
+ }
+ result += "]";
+ return result;
+ }
+
+ /// The following APIs are to be implemented
+ /// Comment out for now to pass compile
+
+ // // Returns a vector of PageLocations that must be read all to get
values for
+ // all included in this range virtual std::vector<PageLocation>
+ // PageIndexesToInclude(const std::vector<PageLocation>& all_pages)
= 0; class
+ // Iterator {
+ // virtual std::variant<IntervalRange, BitmapRange, End>
NextRange() = 0;
+ // };
+ // virtual std::unique_ptr<Iterator> NewIterator() = 0;
+
+ private:
+ std::vector<IntervalRange> ranges;
+};
+
namespace internal {
+class PARQUET_EXPORT RecordSkipper {
+ public:
+ RecordSkipper(RowRanges& pages, const RowRanges& row_ranges_)
+ : row_ranges(row_ranges_) {
+ // copy row_ranges
+ RowRanges will_process_pages, skip_pages;
+ for (auto& page : pages.GetRanges()) {
+ if (!row_ranges.IsOverlapping(page)) {
+ skip_pages.Add(page);
+ }
+ }
+
+ /// Since the skipped pages will be silently skipped without updating
+ /// current_rg_processed_records or records_read_, we need to pre-process
the row
+ /// ranges as if these skipped pages never existed
+ adjust_ranges(skip_pages, row_ranges);
+
+ total_rows_to_process = pages.RowCount() - skip_pages.RowCount();
+ }
+
+ /// \brief Return the number of records to read or to skip
+ /// if return values is positive, it means to read N records
+ /// if return values is negative, it means to skip N records
+ /// if return values is 0, it means end of RG
+ int64_t advise_next(const int64_t current_rg_processed) {
Review Comment:
```suggestion
int64_t AdviseNext(const int64_t current_rg_processed) {
```
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
Review Comment:
IMO, the separation above would make the public `RowRanges` API look much
simpler to external users, since those utility functions are only needed in
the implementation.
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
+struct BitmapRange {
+ int64_t offset;
+ // zero added to, if there are less than 64 elements left in the column.
+ uint64_t bitmap;
+};
+
+struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const IntervalRange& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<IntervalRange>& ranges) { this->ranges = ranges;
}
+
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges Intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const IntervalRange& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const IntervalRange& r = right.ranges[i];
+ if (l.IsBefore(r)) {
+ break;
+ } else if (l.IsAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.Add(IntervalRange::Intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ void Add(const IntervalRange& range) {
+ const IntervalRange rangeToAdd = range;
+ if (ranges.size() > 1 && rangeToAdd.start <= ranges.back().end) {
+ throw ParquetException("Ranges must be added in order");
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t RowCount() const {
+ size_t cnt = 0;
+ for (const IntervalRange& range : ranges) {
+ cnt += range.Count();
+ }
+ return cnt;
+ }
+
+ bool IsValid() const {
+ if (ranges.size() == 0) return true;
+ if (ranges[0].start < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].start <= ranges[i - 1].end) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsOverlapping(int64_t start, int64_t end) const {
+ const IntervalRange searchRange(start, end);
+ return IsOverlapping(searchRange);
+ }
+
+ bool IsOverlapping(const IntervalRange& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const IntervalRange& r1, const IntervalRange& r2) { return
r1.IsBefore(r2); });
+ return it != ranges.end() && !(*it).IsAfter(searchRange);
+ }
+
+ std::vector<IntervalRange>& GetRanges() { return ranges; }
+
+ const std::vector<IntervalRange>& GetRanges() const { return ranges; }
+
+ // Split the ranges into N+1 parts at the given split point, where N =
+ // split_points.size() The RowRows object itself is not modified
+ std::vector<RowRanges> SplitAt(const std::vector<int64_t>& split_points)
const {
+ if (split_points.size() == 0) {
+ return {*this};
+ }
+
+ std::vector<RowRanges> result;
+ int64_t last_split_point = -1;
+ for (const int64_t split_point : split_points) {
+ if (split_point <= 0) {
+ throw ParquetException("Invalid split point " +
std::to_string(split_point));
+ }
+ if (split_point <= last_split_point) {
+ throw ParquetException("Split points must be in ascending order");
+ }
+ last_split_point = split_point;
+ }
+
+ RowRanges spaces;
+ for (size_t i = 0; i < split_points.size(); ++i) {
+ auto start = i == 0 ? 0 : split_points[i - 1];
+ auto end = split_points[i] - 1;
+ spaces.Add({start, end});
+ }
+ spaces.Add(
+ {split_points[split_points.size() - 1],
std::numeric_limits<int64_t>::max()});
+
+ for (IntervalRange space : spaces.GetRanges()) {
+ RowRanges intersection = RowRanges::Intersection(RowRanges(space),
*this);
+ result.push_back(intersection);
+ }
+
+ return result;
+ }
+
+ const IntervalRange& operator[](size_t index) const {
Review Comment:
ditto
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -512,8 +599,79 @@ class LeafReader : public ColumnReaderImpl {
private:
std::shared_ptr<ChunkedArray> out_;
+
+ void checkAndGetPageRanges(const RowRanges & row_ranges,
+ std::shared_ptr<RowRanges>& page_ranges) const {
+ // check offset exists
+ const auto rg_pg_index_reader =
+
ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group());
+
+ if (!rg_pg_index_reader) {
+ throw ParquetException(
+ "Attempting to read with Ranges but Page Index is not found for Row "
+ "Group: " +
+ std::to_string(input_->current_row_group()));
+ }
+ const auto offset_index =
rg_pg_index_reader->GetOffsetIndex(input_->column_index());
+
+ if (!offset_index) {
+ throw ParquetException(
+ "Attempting to read with Ranges but Offset index is not found for "
+ "column: " +
+ field_->name());
+ }
+
+ const auto page_locations = offset_index->page_locations();
+ page_ranges = std::make_shared<RowRanges>();
+ for (size_t i = 0; i < page_locations.size() - 1; i++) {
+ page_ranges->Add(
+ {page_locations[i].first_row_index, page_locations[i +
1].first_row_index - 1});
+ }
+ if (page_locations.size() >= 1) {
+ page_ranges->Add(
+ {page_locations[page_locations.size() - 1].first_row_index,
+
ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() -
Review Comment:
The cost of this call (`RowGroup(input_->current_row_group())`) may not be negligible
when it is repeated for many columns.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -17,6 +17,8 @@
#include "parquet/arrow/reader.h"
+#include <parquet/page_index.h>
Review Comment:
```suggestion
#include "parquet/page_index.h"
```
##########
cpp/src/parquet/column_reader.h:
##########
@@ -302,7 +300,284 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};
+// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange {
+ static IntervalRange Intersection(const IntervalRange& left,
+ const IntervalRange& right) {
+ if (left.start <= right.start) {
+ if (left.end >= right.start) {
+ return {right.start, std::min(left.end, right.end)};
+ }
+ } else if (right.end >= left.start) {
+ return {left.start, std::min(left.end, right.end)};
+ }
+ return {-1, -1}; // Return a default Range object if no intersection
range found
+ }
+
+ IntervalRange(const int64_t start_, const int64_t end_) : start(start_),
end(end_) {
+ if (start > end) {
+ throw ParquetException("Invalid range with start: " +
std::to_string(start) +
+ " and end: " + std::to_string(end));
+ }
+ }
+
+ size_t Count() const { return end - start + 1; }
+
+ bool IsBefore(const IntervalRange& other) const { return end < other.start; }
+
+ bool IsAfter(const IntervalRange& other) const { return start > other.end; }
+
+ bool IsOverlap(const IntervalRange& other) const {
+ return !IsBefore(other) && !IsAfter(other);
+ }
+
+ bool IsValid() const { return start >= 0 && end >= 0 && end >= start; }
+
+ std::string ToString() const {
+ return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
+ }
+
+ // inclusive
+ int64_t start;
+ // inclusive
+ int64_t end;
+};
+
+struct BitmapRange {
+ int64_t offset;
+ // zero added to, if there are less than 64 elements left in the column.
+ uint64_t bitmap;
+};
+
+struct End {};
+
+// Represent a set of ranges to read. The ranges are sorted and
non-overlapping.
+class RowRanges {
+ public:
+ RowRanges() = default;
+
+ explicit RowRanges(const IntervalRange& range) { ranges.push_back(range); }
+
+ RowRanges(const std::vector<IntervalRange>& ranges) { this->ranges = ranges;
}
+
+ RowRanges(const RowRanges& other) { ranges = other.ranges; }
+
+ RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
+
+ static RowRanges Intersection(const RowRanges& left, const RowRanges& right)
{
+ RowRanges result;
+
+ size_t rightIndex = 0;
+ for (const IntervalRange& l : left.ranges) {
+ for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ const IntervalRange& r = right.ranges[i];
+ if (l.IsBefore(r)) {
+ break;
+ } else if (l.IsAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.Add(IntervalRange::Intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ void Add(const IntervalRange& range) {
+ const IntervalRange rangeToAdd = range;
+ if (ranges.size() > 1 && rangeToAdd.start <= ranges.back().end) {
+ throw ParquetException("Ranges must be added in order");
+ }
+ ranges.push_back(rangeToAdd);
+ }
+
+ size_t RowCount() const {
+ size_t cnt = 0;
+ for (const IntervalRange& range : ranges) {
+ cnt += range.Count();
+ }
+ return cnt;
+ }
+
+ bool IsValid() const {
+ if (ranges.size() == 0) return true;
+ if (ranges[0].start < 0) {
+ return false;
+ }
+ for (size_t i = 1; i < ranges.size(); i++) {
+ if (ranges[i].start <= ranges[i - 1].end) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsOverlapping(int64_t start, int64_t end) const {
+ const IntervalRange searchRange(start, end);
+ return IsOverlapping(searchRange);
+ }
+
+ bool IsOverlapping(const IntervalRange& searchRange) const {
+ auto it = std::lower_bound(
+ ranges.begin(), ranges.end(), searchRange,
+ [](const IntervalRange& r1, const IntervalRange& r2) { return
r1.IsBefore(r2); });
+ return it != ranges.end() && !(*it).IsAfter(searchRange);
+ }
+
+ std::vector<IntervalRange>& GetRanges() { return ranges; }
Review Comment:
This method and the ones below expose all ranges as IntervalRange, which does not look
right: they are public APIs, but we do not promise that all ranges
are IntervalRanges.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -451,6 +504,40 @@ class RowGroupReaderImpl : public RowGroupReader {
// ----------------------------------------------------------------------
// Column reader implementations
+struct RowRangesPageFilter {
Review Comment:
Please add a comment explaining what it does.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -336,19 +342,66 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}
+ // This is a internal API owned by FileReaderImpl, not exposed in FileReader
+ Status GetRecordBatchReaderWithRowRanges(const std::vector<int>&
row_group_indices,
+ const std::vector<int>&
column_indices,
+ const
std::shared_ptr<std::vector<RowRanges>> & row_ranges_map,
+ std::unique_ptr<RecordBatchReader>*
out);
+
+ Status GetRecordBatchReader(const RowRanges& rows_to_return,
+ const std::vector<int>& column_indices,
+ std::unique_ptr<RecordBatchReader>* out)
override {
+ const auto metadata = reader_->metadata();
+ // check if the row ranges are valid
+ if (!rows_to_return.IsValid()) {
+ return Status::Invalid("The provided row range is invalid, keep it
monotone and non-interleaving: " +
+ rows_to_return.ToString());
+ }
+ // check if the row ranges are within the row group boundaries
+ if (rows_to_return.RowCount() != 0 &&
rows_to_return.GetRanges().back().end >= metadata->num_rows()) {
+ return Status::Invalid("The provided row range " +
rows_to_return.ToString() +
+ " exceeds the number of rows in the file: " +
+ std::to_string(metadata->num_rows()));
+ }
+
+ std::vector<int64_t> split_points;
+ int64_t rows_so_far = 0;
+ for (int i = 0 ; i < metadata->num_row_groups() - 1; i++) {
+ rows_so_far += metadata->RowGroup(i)->num_rows();
+ split_points.push_back(rows_so_far);
+ }
+ // We'll assign a RowRanges for each RG, even if it's not required to
return any rows
+ const std::vector<RowRanges> splits = rows_to_return.SplitAt(split_points);
+ // Call row_ranges_map because array index is the row group index
+ const std::shared_ptr<std::vector<RowRanges>> row_ranges_map =
+ std::make_shared<std::vector<RowRanges>>();
+ rows_so_far = 0;
+ std::vector<int> row_group_indices;
+ for (int i = 0 ; i < metadata->num_row_groups(); i++) {
+ row_ranges_map->push_back(splits[i].shift(-rows_so_far));
+ rows_so_far += metadata->RowGroup(i)->num_rows();
+ if (row_ranges_map->at(i).RowCount() > 0)
+ row_group_indices.push_back(i);
+ }
+
+ return GetRecordBatchReaderWithRowRanges(row_group_indices,
column_indices, row_ranges_map, out);
Review Comment:
Why not merge row_group_indices and row_ranges_map into a single map? That way we can
drop the empty row ranges of row groups that contribute no rows.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -451,6 +504,40 @@ class RowGroupReaderImpl : public RowGroupReader {
// ----------------------------------------------------------------------
// Column reader implementations
+struct RowRangesPageFilter {
+ explicit RowRangesPageFilter(const RowRanges& row_ranges_,
Review Comment:
Please remove the trailing underscore from the input parameter names.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -451,6 +504,40 @@ class RowGroupReaderImpl : public RowGroupReader {
// ----------------------------------------------------------------------
// Column reader implementations
+struct RowRangesPageFilter {
+ explicit RowRangesPageFilter(const RowRanges& row_ranges_,
+ const std::shared_ptr<RowRanges>& page_ranges_)
+ : row_ranges(row_ranges_), page_ranges(page_ranges_) {
+
+ if (page_ranges == nullptr || page_ranges->GetRanges().size() == 0) {
+ throw ParquetException("Page ranges is empty");
+ }
+ }
+
+ bool operator()(const DataPageStats& stats) {
+ ++page_range_idx;
+
+ IntervalRange current_page_range = (*page_ranges)[page_range_idx];
Review Comment:
Could we avoid directly assuming that all ranges are IntervalRanges?
We could probably use the RowRanges iterator and, for now, throw when a BitmapRange is
encountered. Otherwise future refactoring will be difficult.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]