wgtmac commented on code in PR #39608: URL: https://github.com/apache/arrow/pull/39608#discussion_r1468457012
########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. Review Comment: ```suggestion // Represent an interval row range, which is inclusive on both ends. ``` ########## cpp/src/parquet/CMakeLists.txt: ########## @@ -162,6 +162,7 @@ set(PARQUET_SRCS arrow/writer.cc bloom_filter.cc bloom_filter_reader.cc + row_range.cc Review Comment: Please sort it in alphabetical order. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { Review Comment: ```suggestion IntervalRange(const int64_t startRow, const int64_t endRow) : start(startRow), end(endRow) { ``` Normally in the Arrow repo, variable names with a `_` suffix are non-public class members, so we should avoid that naming for function parameters. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found Review Comment: See my above comment, defining a `kInvalidIntervalRange` looks better. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return 
!IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. Review Comment: ```suggestion // Represent a set of non-overlapping row ranges in ascending order. ``` It does not have to be used solely for reading. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { Review Comment: ```suggestion struct PARQUET_EXPORT BitmapRange { ``` ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { Review Comment: This function should be moved into an anonymous namespace. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero 
added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. +class RowRanges { + public: + virtual ~RowRanges() = default; + /// \brief Total number of rows in the row ranges. + virtual size_t num_rows() const = 0; + /// \brief First row in the ranges + virtual int64_t first_row() const = 0; + /// \brief Last row in the ranges + virtual int64_t last_row() const = 0; + /// \brief Whether the given range from start to end overlaps with the row ranges. + virtual bool IsOverlapping(int64_t start, int64_t end) const = 0; + /// \brief Split the row ranges into sub row ranges according to the + /// specified number of rows per sub row ranges. A typical use case is + /// to convert file based RowRanges to row group based RowRanges. + /// + /// \param num_rows_per_sub_ranges number of rows per sub row range. + virtual std::vector<std::unique_ptr<RowRanges>> SplitByRowRange( + const std::vector<int64_t>& num_rows_per_sub_ranges) const = 0; + /// \brief Readable string representation + virtual std::string ToString() const = 0; + + class Iterator { + public: + virtual std::variant<IntervalRange, BitmapRange, End> NextRange() = 0; + virtual ~Iterator() = default; + }; + /// \brief Create an iterator to iterate over the ranges + virtual std::unique_ptr<Iterator> NewIterator() const = 0; +}; + +class IntervalRanges : public RowRanges { Review Comment: I still think `IntervalRanges` and `IntervalRowRangesIterator` below should be relocated to an internal header file. WDYT? ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { Review Comment: It would be better to directly check the input range. The error message is misleading in this case because we have only one interval range here. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr<RowRanges::Iterator> IntervalRanges::NewIterator() const { + return std::make_unique<IntervalRowRangesIterator>(ranges_); +} + +size_t 
IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, const int64_t end) const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += IntervalRangeUtils::ToString(range) + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector<std::unique_ptr<RowRanges>> IntervalRanges::SplitByRowRange( + const std::vector<int64_t>& num_rows_per_sub_ranges) const { + if (num_rows_per_sub_ranges.size() <= 1) { + std::unique_ptr<RowRanges> single = + std::make_unique<IntervalRanges>(*this); // return a copy of itself + auto ret = std::vector<std::unique_ptr<RowRanges>>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector<std::unique_ptr<RowRanges>> result; + + IntervalRanges spaces; + int64_t rows_so_far = 0; + for (size_t i = 0; i < num_rows_per_sub_ranges.size(); ++i) { + auto start = rows_so_far; + rows_so_far += num_rows_per_sub_ranges[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust 
RowRanges in each space to + // zero based. + for (IntervalRange space : spaces.GetRanges()) { + auto intersection = Intersection(IntervalRanges(space), *this); + + std::unique_ptr<IntervalRanges> zero_based_ranges = + std::make_unique<IntervalRanges>(); + for (const IntervalRange& range : intersection.GetRanges()) { + zero_based_ranges->Add({range.start - space.start, range.end - space.start}); + } + result.push_back(std::move(zero_based_ranges)); + } + + return result; +} + +IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, + const IntervalRanges& right) { + IntervalRanges result; + + size_t rightIndex = 0; + for (const IntervalRange& l : left.ranges_) { + for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { + const IntervalRange& r = right.ranges_[i]; + if (IntervalRangeUtils::IsBefore(l, r)) { + break; + } else if (IntervalRangeUtils::IsAfter(l, r)) { + rightIndex = i + 1; + continue; + } + result.Add(IntervalRangeUtils::Intersection(l, r)); + } + } + + return result; +} + +void IntervalRanges::Add(const IntervalRange& range) { + const IntervalRange rangeToAdd = range; + if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { + throw ParquetException("Ranges must be added in order"); Review Comment: ```suggestion throw ParquetException("Ranges must be added in the ascending order"); ``` ########## cpp/src/parquet/row_range_test.cc: ########## @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> +#include "parquet/column_reader.h" + +using parquet::IntervalRange; +using parquet::IntervalRanges; + +class RowRangesTest : public ::testing::Test { + protected: + IntervalRanges row_ranges; +}; + +TEST_F(RowRangesTest, EmptyRG_ReturnsOriginalRowRanges) { Review Comment: Avoid using `RG` concept in this test? ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. Review Comment: Should we rename it to RowGroupRecordSkipper? At least we need to add a comment to describe this assumption. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; Review Comment: ```suggestion return "[" + std::to_string(range.start) + ", " + std::to_string(range.end) + "]"; ``` Would be better to reflect the inclusion? 
########## cpp/src/parquet/column_reader.cc: ########## @@ -2300,5 +2322,71 @@ std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, return nullptr; } +RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges) { + // copy row_ranges + IntervalRanges skip_pages; + for (auto& page : pages.GetRanges()) { + if (!orig_row_ranges.IsOverlapping(page.start, page.end)) { + skip_pages.Add(page); + } + } + + AdjustRanges(skip_pages, orig_row_ranges, row_ranges_); + range_iter_ = row_ranges_->NewIterator(); + current_range_variant = range_iter_->NextRange(); + + total_rows_to_process_ = pages.num_rows() - skip_pages.num_rows(); +} + +int64_t RecordSkipper::AdviseNext(const int64_t current_rg_processed) { + if (current_range_variant.index() == 2) { + return 0; + } + + auto& current_range = std::get<IntervalRange>(current_range_variant); + + if (current_range.end < current_rg_processed) { + current_range_variant = range_iter_->NextRange(); + if (current_range_variant.index() == 2) { + // negative, skip the ramaining rows + return current_rg_processed - total_rows_to_process_; + } + } + + current_range = std::get<IntervalRange>(current_range_variant); + + if (current_range.start > current_rg_processed) { + // negative, skip + return current_rg_processed - current_range.start; + } + + const auto ret = current_range.end - current_rg_processed + 1; + return ret; Review Comment: ```suggestion return current_range.end - current_rg_processed + 1; ``` ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. 
+class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t AdviseNext(const int64_t current_rg_processed); + + private: + /// Since the skipped pages will be silently skipped without updating + /// current_rg_processed_records or records_read_, we need to pre-process the row + /// ranges as if these skipped pages never existed + static void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges, Review Comment: Why not define it as ``` static std::unique_ptr<RowRanges> AdjustRanges(const IntervalRanges& row_ranges_to_skip, const RowRanges& requested_row_ranges); ``` ########## cpp/src/parquet/arrow/reader.cc: ########## @@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader { return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); } + // This is a internal API owned by FileReaderImpl, not exposed in FileReader Review Comment: Please move it into an anonymous namespace.
########## cpp/src/parquet/arrow/reader.cc: ########## @@ -325,19 +331,61 @@ class FileReaderImpl : public FileReader { return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); } + // This is a internal API owned by FileReaderImpl, not exposed in FileReader + Status GetRecordBatchReaderWithRowRanges( + const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, + const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg, + std::unique_ptr<RecordBatchReader>* out); + + Status GetRecordBatchReader(const RowRanges& rows_to_return, + const std::vector<int>& column_indices, + std::unique_ptr<RecordBatchReader>* out) override { + const auto metadata = reader_->metadata(); + // check if the row ranges are within the row group boundaries + if (rows_to_return.num_rows() != 0 && + rows_to_return.last_row() >= metadata->num_rows()) { + return Status::Invalid("The provided row range " + rows_to_return.ToString() + + " exceeds the number of rows in the file: " + + std::to_string(metadata->num_rows())); + } + if (rows_to_return.num_rows() == 0) { + return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out); + } + + std::vector<int64_t> rows_per_rg; + for (int i = 0; i < metadata->num_row_groups(); i++) { + rows_per_rg.push_back(metadata->RowGroup(i)->num_rows()); + } + // We'll assign a RowRanges for each RG, even if it's not required to return any rows + std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg = + rows_to_return.SplitByRowRange(rows_per_rg); Review Comment: Is it too early to split the RowRanges into row groups? We can probably do this lazily for each row group. For example, we can start with row group 0 and test if it falls into the range. If true, compute its overlapping range and move all remaining ranges for the next round (row group 1, 2, etc.) In this way, we can make `GetFieldReaders` much simpler and less confusing. 
########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. 
+struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { Review Comment: ```suggestion static size_t RowCount(const IntervalRange& range) { ``` ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { Review Comment: ```suggestion struct PARQUET_EXPORT IntervalRange { ``` ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. 
+struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. 
+class RowRanges { + public: + virtual ~RowRanges() = default; + /// \brief Total number of rows in the row ranges. Review Comment: Please leave one blank line between functions. Same for below. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> Review Comment: ```suggestion #pragma once #include <variant> ``` We need to leave a blank line here. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { Review Comment: Same as other comments, this should be moved to a separate `row_range_internal.h` header file and not be exposed to end user. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; Review Comment: Do you mean [-1,-1] is an invalid range? It looks a little bit weird to define an invalid range by default. What about marking an invalid range simply by checking if `start >= end`? Or we can define a special invalid range like `constexpr IntervalRange kInvalidIntervalRange = {-1, -1};` and do not allow creating any other invalid range via the constructor. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" Review Comment: ditto ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. 
+struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { Review Comment: Rename to `IsOverlapped` or `IsOverlapping`. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> Review Comment: This include is redundant because `<variant>` is already included by the header. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; Review Comment: ```suggestion if (ranges.empty()) return true; ``` ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. 
+struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. 
+class RowRanges { + public: + virtual ~RowRanges() = default; + /// \brief Total number of rows in the row ranges. + virtual size_t num_rows() const = 0; + /// \brief First row in the ranges + virtual int64_t first_row() const = 0; + /// \brief Last row in the ranges + virtual int64_t last_row() const = 0; Review Comment: What would a user expect `num_rows()`, `first_row()` and `last_row()` to return if the RowRanges are invalid? It would be good to make this clear in the docstring. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr<RowRanges::Iterator> IntervalRanges::NewIterator() const { + return std::make_unique<IntervalRowRangesIterator>(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); Review Comment: nit: use std::accumulate to simplify this. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations Review Comment: ```suggestion // RowRanges implementations ``` typo ? ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { Review Comment: Why special case ranges[0]? Isn't it handled by line 33 below? ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { Review Comment: Ditto, it would be good to check `ranges` before assigning it to ranges_. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + Review Comment: The exception message may be too verbose. What about making it simpler like `Failed to create row ranges from overlapping ranges or ranges not sorted in the ascending order` ########## cpp/src/parquet/column_reader.h: ########## @@ -473,6 +508,8 @@ class PARQUET_EXPORT RecordReader { // If true, we will not leave any space for the null values in the values_ // vector. bool read_dense_for_nullable_ = false; + + std::unique_ptr<RecordSkipper> skipper_ = NULLPTR; Review Comment: Add some comment to explain when it will be used. 
########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw 
ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr<RowRanges::Iterator> IntervalRanges::NewIterator() const { + return std::make_unique<IntervalRowRangesIterator>(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, const int64_t end) const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += IntervalRangeUtils::ToString(range) + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector<std::unique_ptr<RowRanges>> IntervalRanges::SplitByRowRange( + const std::vector<int64_t>& num_rows_per_sub_ranges) const { + if (num_rows_per_sub_ranges.size() <= 1) { + std::unique_ptr<RowRanges> single = + std::make_unique<IntervalRanges>(*this); // return a copy of itself + auto ret = std::vector<std::unique_ptr<RowRanges>>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector<std::unique_ptr<RowRanges>> result; + + IntervalRanges spaces; + int64_t 
rows_so_far = 0; + for (size_t i = 0; i < num_rows_per_sub_ranges.size(); ++i) { + auto start = rows_so_far; + rows_so_far += num_rows_per_sub_ranges[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust RowRanges in each space to Review Comment: Sorry that I didn't think about this thoroughly. We'd better split this function into two separate ones: - `std::vector<std::unique_ptr<RowRanges>> RowRanges::SplitByRowIndex(const std::vector<int64_t>& row_ids);` The input `row_ids` are the start row indices of the row groups, so we can split the RowRanges into smaller ones that still use the global row ids. - `void RowRanges::OffsetBy(int64_t row_offset);` It simply subtracts row_offset from all row ranges so that they become relative to a row group. WDYT? ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr<RowRanges::Iterator> IntervalRanges::NewIterator() const { + return std::make_unique<IntervalRowRangesIterator>(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, const int64_t end) 
const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; Review Comment: IMO, IntervalRange should be expressed as `[a,b]`. For the IntervalRanges collection, we can express them as `{[a,b], [c,d], ...}` ########## cpp/src/parquet/row_range_test.cc: ########## @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> +#include "parquet/column_reader.h" Review Comment: ```suggestion // under the License. #include "gtest/gtest.h" #include "parquet/column_reader.h" ``` ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. 
+class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip Review Comment: ```suggestion /// Return the number of records to read or skip continuously ``` ########## cpp/src/parquet/row_range_test.cc: ########## @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> +#include "parquet/column_reader.h" + +using parquet::IntervalRange; +using parquet::IntervalRanges; + +class RowRangesTest : public ::testing::Test { Review Comment: It seems unnecessary to explicitly define the `RowRangesTest` class? ########## cpp/src/parquet/row_range_test.cc: ########## @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> +#include "parquet/column_reader.h" + +using parquet::IntervalRange; Review Comment: Lines below should be in the parquet namespace. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr<RowRanges::Iterator> IntervalRanges::NewIterator() const { + return std::make_unique<IntervalRowRangesIterator>(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); Review Comment: Should we expose the name of `IntervalRanges` in the error message? 
########## cpp/src/parquet/column_reader.h: ########## @@ -432,6 +465,8 @@ class PARQUET_EXPORT RecordReader { bool at_record_start_; int64_t records_read_; + int64_t current_rg_processed_records_ = 0; // counting both read and skip records Review Comment: ```suggestion int64_t records_processed_ = 0; // counting both read and skip records ``` AFAIK, RecordReader is bound to a single row group. ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. +class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t AdviseNext(const int64_t current_rg_processed); Review Comment: Should we declare it as below: ```cpp enum class NextRead : int8_t { READ, SKIP, END }; std::pair<NextRead, int64_t> AdviseNext(int64_t num_processed_records); ``` Here I also want to reiterate that we can name the class as `RowGroupRecordSkipper` ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages.
+class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t AdviseNext(const int64_t current_rg_processed); + + private: + /// Since the skipped pages will be silently skipped without updating + /// current_rg_processed_records or records_read_, we need to pre-process the row + /// ranges as if these skipped pages never existed + static void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges, + std::unique_ptr<RowRanges>& ret); + + std::unique_ptr<RowRanges> row_ranges_; Review Comment: ```suggestion std::unique_ptr<RowRanges> requested_row_ranges_; ``` ########## cpp/src/parquet/row_range_test.cc: ########## @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#include <gtest/gtest.h> +#include "parquet/column_reader.h" + +using parquet::IntervalRange; +using parquet::IntervalRanges; + +class RowRangesTest : public ::testing::Test { + protected: + IntervalRanges row_ranges; +}; + +TEST_F(RowRangesTest, EmptyRG_ReturnsOriginalRowRanges) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector<int64_t> rows_per_rg; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 1); + + auto iter = result[0]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + ASSERT_EQ(iter->NextRange().index(), 2); +} + +TEST_F(RowRangesTest, SingleRG_ReturnsOriginalRowRanges2) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector<int64_t> rows_per_rg = {11}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 1); + + auto iter = result[0]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + ASSERT_EQ(iter->NextRange().index(), 2); +} + +TEST_F(RowRangesTest, ReturnsTwoRowRanges) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector<int64_t> rows_per_rg = {5, 6}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 5); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, ReturnsMultipleRowRanges) { + row_ranges.Add(IntervalRange(0, 11)); + std::vector<int64_t> rows_per_rg = {3, 4, 100}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 3); + { + auto iter = 
result[0]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 2); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 3); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[2]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, MultipleInputRange) { + row_ranges.Add(IntervalRange(0, 10)); + row_ranges.Add(IntervalRange(90, 111)); + row_ranges.Add(IntervalRange(191, 210)); + + std::vector<int64_t> rows_per_rg = {100, 100}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + + range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 90); + ASSERT_EQ(range.end, 99); + + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 11); + + range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 91); + ASSERT_EQ(range.end, 99); + + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, MultipleSplitPoints_ReturnWithEmptyRowRanges) { + row_ranges.Add(IntervalRange(11, 18)); + std::vector<int64_t> rows_per_rg = {5, 5, 5, 5, 5}; + + auto result = row_ranges.SplitByRowRange(rows_per_rg); + ASSERT_EQ(result.size(), 5); + { + auto iter = result[0]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 
2); + } + { + auto iter = result[2]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 1); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[3]->NewIterator(); + auto range = std::get<IntervalRange>(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 3); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[4]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, RangeExceedRG) { Review Comment: These tests look insufficient to me. We need to cover at least all the public APIs, but currently they seem to focus only on the splitting and iterator interfaces. We also need to verify that RowRanges handles unexpected inputs correctly. ########## cpp/src/parquet/row_range.cc: ########## @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "parquet/row_range.h" + +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector<IntervalRange>& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector<IntervalRange>& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr<RowRanges::Iterator> IntervalRanges::NewIterator() const { + return std::make_unique<IntervalRowRangesIterator>(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, const int64_t end) 
const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += IntervalRangeUtils::ToString(range) + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector<std::unique_ptr<RowRanges>> IntervalRanges::SplitByRowRange( + const std::vector<int64_t>& num_rows_per_sub_ranges) const { + if (num_rows_per_sub_ranges.size() <= 1) { + std::unique_ptr<RowRanges> single = + std::make_unique<IntervalRanges>(*this); // return a copy of itself + auto ret = std::vector<std::unique_ptr<RowRanges>>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector<std::unique_ptr<RowRanges>> result; + + IntervalRanges spaces; + int64_t rows_so_far = 0; + for (size_t i = 0; i < num_rows_per_sub_ranges.size(); ++i) { + auto start = rows_so_far; + rows_so_far += num_rows_per_sub_ranges[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust RowRanges in each space to + // zero based. 
+ for (IntervalRange space : spaces.GetRanges()) { + auto intersection = Intersection(IntervalRanges(space), *this); + + std::unique_ptr<IntervalRanges> zero_based_ranges = + std::make_unique<IntervalRanges>(); + for (const IntervalRange& range : intersection.GetRanges()) { + zero_based_ranges->Add({range.start - space.start, range.end - space.start}); + } + result.push_back(std::move(zero_based_ranges)); + } + + return result; +} + +IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, + const IntervalRanges& right) { Review Comment: Do we need to validate both inputs? Otherwise, we'd better make this clear in the docstring. ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. +class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); Review Comment: ```suggestion RecordSkipper(const RowRanges& requested_row_ranges, IntervalRanges* page_row_ranges); ``` IMO, it would be good to place the in/out param at the end. Please also add a docstring to state that `page_row_ranges` is an in/out param. ########## cpp/src/parquet/arrow/reader.h: ########## @@ -180,6 +181,19 @@ class PARQUET_EXPORT FileReader { const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a RecordBatchReader of row groups selected from + /// rows_to_return, whose columns are selected by column_indices. + /// + /// Notice that rows_to_return is file based, it not only decides which row groups to + /// read, but also which rows to read in each row group. + /// + /// Review Comment: ```suggestion /// read, but also which rows to read in each row group.
/// ``` ########## cpp/src/parquet/arrow/reader.cc: ########## @@ -199,10 +201,11 @@ class FileReaderImpl : public FileReader { return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); } - Status GetFieldReader(int i, - const std::shared_ptr<std::unordered_set<int>>& included_leaves, - const std::vector<int>& row_groups, - std::unique_ptr<ColumnReaderImpl>* out) { + Status GetFieldReader( Review Comment: Then we can limit the scope of the refactoring work to overloading `SomeRowGroupsFactory` below at line 221: ``` ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ``` ########## cpp/src/parquet/column_reader.h: ########## @@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader { int32_t* dict_len) = 0; }; +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " and end: " + std::to_string(end)); + } + } + + size_t Count() const { + if(!IsValid()) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " and end: " + std::to_string(end)); + } + return end - start + 1; + } + + bool IsBefore(const IntervalRange& other) const { return end < other.start; } + + bool IsAfter(const IntervalRange& other) const { return start > other.end; } + + bool IsOverlap(const IntervalRange& other) const { + return !IsBefore(other) && !IsAfter(other); + } + + bool IsValid() const { return start >= 0
&& end >= 0 && end >= start; } + + std::string ToString() const { + return "(" + std::to_string(start) + ", " + std::to_string(end) + ")"; + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. +class RowRanges { + public: + RowRanges() = default; + virtual ~RowRanges() = default; + virtual size_t RowCount() const = 0; + virtual int64_t LastRow() const = 0; + virtual bool IsValid() const = 0; + virtual bool IsOverlapping(const IntervalRange& searchRange) const = 0; + // Given a RowRanges with rows accross all RGs, split it into N RowRanges, where N = number of RGs + // e.g.: suppose we have 2 RGs: [0-99] and [100-199], and user is interested in RowRanges [90-110], then + // this function will return 2 RowRanges: [90-99] and [0-10] + virtual std::vector<std::unique_ptr<RowRanges>> SplitByRowGroups(const std::vector<int64_t>& rows_per_rg) const = 0; + virtual std::string ToString() const = 0; + + // Returns a vector of PageLocations that must be read all to get values for + // all included in this range virtual std::vector<PageLocation> + // PageIndexesToInclude(const std::vector<PageLocation>& all_pages) = 0; + + class Iterator { + public: + virtual std::variant<IntervalRange, BitmapRange, End> NextRange() = 0; + virtual ~Iterator() = default; + }; + virtual std::unique_ptr<Iterator> NewIterator() const = 0; + +}; + +class IntervalRanges : public RowRanges { Review Comment: Clients are expected to pass RowRanges (not IntervalRanges), and we should support an API like the one below to make creating RowRanges easier: ``` std::unique_ptr<RowRanges> RowRanges::Make(const std::vector<IntervalRange>& ranges); ``` ########## cpp/src/parquet/column_reader.h: ########## @@ -304,6 +305,30 @@ class TypedColumnReader : public
ColumnReader { namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. +class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t AdviseNext(const int64_t current_rg_processed); + + private: + /// Since the skipped pages will be silently skipped without updating + /// current_rg_processed_records or records_read_, we need to pre-process the row + /// ranges as if these skipped pages never existed + static void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges, Review Comment: BTW, could we use the base class RowRanges instead of IntervalRanges here? ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { Review Comment: Could you 
please also add a docstring for it? This should be a user-facing API. ########## cpp/src/parquet/row_range.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include <variant> + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { Review Comment: Public classes should add `PARQUET_EXPORT`.
########## cpp/src/parquet/column_reader.cc: ########## @@ -2300,5 +2322,71 @@ std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, return nullptr; } +RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges) { + // copy row_ranges + IntervalRanges skip_pages; + for (auto& page : pages.GetRanges()) { + if (!orig_row_ranges.IsOverlapping(page.start, page.end)) { + skip_pages.Add(page); + } + } + + AdjustRanges(skip_pages, orig_row_ranges, row_ranges_); + range_iter_ = row_ranges_->NewIterator(); + current_range_variant = range_iter_->NextRange(); Review Comment: ```suggestion current_range_ = range_iter_->NextRange(); ``` ########## cpp/src/parquet/arrow/reader.cc: ########## @@ -199,10 +201,11 @@ class FileReaderImpl : public FileReader { return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); } - Status GetFieldReader(int i, - const std::shared_ptr<std::unordered_set<int>>& included_leaves, - const std::vector<int>& row_groups, - std::unique_ptr<ColumnReaderImpl>* out) { + Status GetFieldReader( Review Comment: This function signature now looks strange to me since it contains conflicting parameters `const std::vector<int>& row_groups` and `const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg`. What about splitting file-based row ranges lazily? 
Then we can re-define this as ``` // RowGroups can be either std::vector<int> or RowRanges template <typename RowGroups> Status GetFieldReader(int i, const std::shared_ptr<std::unordered_set<int>>& included_leaves, const RowGroups& row_groups, std::unique_ptr<ColumnReaderImpl>* out); ``` Or if you still want to split file-based row ranges eagerly, we can do this: ``` // RowGroups can be either std::vector<int> or std::map<int, RowRanges> template <typename RowGroups> Status GetFieldReader(int i, const std::shared_ptr<std::unordered_set<int>>& included_leaves, const RowGroups& row_groups, std::unique_ptr<ColumnReaderImpl>* out); ``` ########## cpp/src/parquet/column_reader.cc: ########## @@ -1971,9 +1975,27 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, this->ConsumeBufferedValues(values_to_read); } + current_rg_processed_records_ += records_read; return records_read; } + int64_t ReadRecordDataWithSkipCheck(const int64_t num_records) { Review Comment: Originally, we could safely use `ReadRecords()` and `SkipRecords()` without any assumption. Now we have mixed the skipping logic (`SkipRecords()`) into `ReadRecords()`, so users can no longer call `SkipRecords()` blindly. Should we throw in `SkipRecords()` if `record_skipper_` has been enabled, and use a custom `SkipRecordsXXX` here instead? @fatemehp Do you have any concerns about this? ########## cpp/src/parquet/column_reader.h: ########## @@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader { int32_t* dict_len) = 0; }; +// Represent a range to read. The range is inclusive on both ends.
+struct IntervalRange { + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " and end: " + std::to_string(end)); + } + } + + size_t Count() const { + if(!IsValid()) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " and end: " + std::to_string(end)); + } + return end - start + 1; + } + + bool IsBefore(const IntervalRange& other) const { return end < other.start; } + + bool IsAfter(const IntervalRange& other) const { return start > other.end; } + + bool IsOverlap(const IntervalRange& other) const { + return !IsBefore(other) && !IsAfter(other); + } + + bool IsValid() const { return start >= 0 && end >= 0 && end >= start; } + + std::string ToString() const { + return "(" + std::to_string(start) + ", " + std::to_string(end) + ")"; + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. 
+class RowRanges { + public: + RowRanges() = default; + virtual ~RowRanges() = default; + virtual size_t RowCount() const = 0; + virtual int64_t LastRow() const = 0; + virtual bool IsValid() const = 0; + virtual bool IsOverlapping(const IntervalRange& searchRange) const = 0; + // Given a RowRanges with rows accross all RGs, split it into N RowRanges, where N = number of RGs + // e.g.: suppose we have 2 RGs: [0-99] and [100-199], and user is interested in RowRanges [90-110], then + // this function will return 2 RowRanges: [90-99] and [0-10] + virtual std::vector<std::unique_ptr<RowRanges>> SplitByRowGroups(const std::vector<int64_t>& rows_per_rg) const = 0; + virtual std::string ToString() const = 0; + + // Returns a vector of PageLocations that must be read all to get values for + // all included in this range virtual std::vector<PageLocation> + // PageIndexesToInclude(const std::vector<PageLocation>& all_pages) = 0; + + class Iterator { + public: + virtual std::variant<IntervalRange, BitmapRange, End> NextRange() = 0; + virtual ~Iterator() = default; + }; + virtual std::unique_ptr<Iterator> NewIterator() const = 0; + +}; + +class IntervalRanges : public RowRanges { + public: + IntervalRanges(); + explicit IntervalRanges(const IntervalRange& range); + explicit IntervalRanges(const std::vector<IntervalRange>& ranges); + std::unique_ptr<Iterator> NewIterator() const override; + size_t RowCount() const override; + int64_t LastRow() const override; + bool IsValid() const override; + bool IsOverlapping(const IntervalRange& searchRange) const override; + std::string ToString() const override; + std::vector<std::unique_ptr<RowRanges>> SplitByRowGroups( + const std::vector<int64_t>& rows_per_rg) const override; + static IntervalRanges Intersection(const IntervalRanges& left, + const IntervalRanges& right); + void Add(const IntervalRange& range); + const std::vector<IntervalRange>& GetRanges() const; + + private: + std::vector<IntervalRange> ranges_; +}; + +class 
IntervalRowRangesIterator : public RowRanges::Iterator { + public: + IntervalRowRangesIterator(const std::vector<IntervalRange>& ranges); + ~IntervalRowRangesIterator() override; + std::variant<IntervalRange, BitmapRange, End> NextRange() override; + + private: + const std::vector<IntervalRange>& ranges_; + size_t current_index_ = 0; +}; + namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. +class PARQUET_EXPORT RecordSkipper { Review Comment: You may want to apply method III by moving the definition of `~RecordReader()` into column_reader.cc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
