lgbo-ustc commented on code in PR #4634: URL: https://github.com/apache/incubator-gluten/pull/4634#discussion_r1522353767
########## cpp-ch/local-engine/Storages/Parquet/ColumnIndexFilter.cpp: ########## @@ -0,0 +1,982 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ColumnIndexFilter.h" + +#if USE_PARQUET +#include <ranges> +#include <Interpreters/misc.h> +#include <Storages/MergeTree/KeyCondition.h> +#include <Storages/MergeTree/RPNBuilder.h> +#include <Storages/Parquet/ParquetConverter.h> +#include <parquet/schema.h> +#include <parquet/statistics.h> + +namespace local_engine +{ +struct BoundaryOrder; +struct UNORDERED; +struct ASCENDING; +struct DESCENDING; + +template <typename DType> +class TypedComparator; + +struct NoneNullPageIndexsBuilder +{ + explicit NoneNullPageIndexsBuilder(const std::vector<int32_t> & non_null_page_indices) : non_null_page_indices_(non_null_page_indices) + { + } + const std::vector<int32_t> & non_null_page_indices_; + + PageIndexs all() const { return non_null_page_indices_; } + PageIndexs range(const int32_t from, const int32_t to) const + { + const auto begin = non_null_page_indices_.begin() + from; + const auto end = non_null_page_indices_.begin() + to; + return PageIndexs{begin, end}; + } + + template <typename Predict> + PageIndexs filter(int32_t from, const int32_t to, Predict predict) 
const + { + PageIndexs pages; + for (; from != to; ++from) + if (predict(from)) + pages.emplace_back(non_null_page_indices_[from]); + return pages; + } + + template <typename Predict> + PageIndexs filter(Predict predict) const + { + return filter(0, non_null_page_indices_.size(), predict); + } +}; + +class EmptyColumnIndex final : public ColumnIndex +{ +public: + const parquet::OffsetIndex & offsetIndex() const override { return *offset_index_; } + bool hasParquetColumnIndex() const override { return false; } + + PageIndexs notEq(const DB::Field &) const override { abort(); } + PageIndexs eq(const DB::Field &) const override { abort(); } + PageIndexs gt(const DB::Field &) const override { abort(); } + PageIndexs gtEg(const DB::Field &) const override { abort(); } + PageIndexs lt(const DB::Field &) const override { abort(); } + PageIndexs ltEg(const DB::Field &) const override { abort(); } + PageIndexs in(const DB::ColumnPtr &) const override { abort(); } + + explicit EmptyColumnIndex(const std::shared_ptr<parquet::OffsetIndex> & offset_index) : offset_index_(offset_index) { } + +private: + std::shared_ptr<parquet::OffsetIndex> offset_index_; +}; + +template <typename T, typename Base> +concept Derived = std::is_base_of_v<Base, T>; + +template <typename DType, Derived<BoundaryOrder> ORDER> +class TypedColumnIndexImpl final : public TypedColumnIndex<DType> +{ + const parquet::ColumnDescriptor * descr_; + std::shared_ptr<parquet::TypedColumnIndex<DType>> column_index_; + std::shared_ptr<parquet::OffsetIndex> offset_index_; + std::shared_ptr<parquet::TypedComparator<DType>> comparator_; + +public: + using T = typename DType::c_type; + + TypedColumnIndexImpl( + const parquet::ColumnDescriptor * descr, + const std::shared_ptr<parquet::TypedColumnIndex<DType>> & column_index, + const std::shared_ptr<parquet::OffsetIndex> & offset_index) + : descr_(descr), column_index_(column_index), offset_index_(offset_index), comparator_(parquet::MakeComparator<DType>(descr)) + { + } + 
PageIndexs notEq(const DB::Field & value) const override; + PageIndexs eq(const DB::Field & value) const override; + PageIndexs gt(const DB::Field & value) const override; + PageIndexs gtEg(const DB::Field & value) const override; + PageIndexs lt(const DB::Field & value) const override; + PageIndexs ltEg(const DB::Field & value) const override; + PageIndexs in(const DB::ColumnPtr & column) const override; + + const parquet::OffsetIndex & offsetIndex() const override { return *offset_index_; } + bool hasParquetColumnIndex() const override { return true; } +}; + +/* + * A class containing the value to be compared to the min/max values. This way we only need to do the deboxing once + * per predicate execution instead for every comparison. + */ +template <typename DType> +class TypedComparator +{ + using T = typename DType::c_type; + const T & value_; + const std::vector<T> & min_; + const std::vector<T> & max_; + const std::vector<int32_t> & non_null_page_indices_; + parquet::TypedComparator<DType> & comparator_; + friend UNORDERED; + friend ASCENDING; + friend DESCENDING; + +public: + TypedComparator(const T & value, const parquet::TypedColumnIndex<DType> & index, parquet::TypedComparator<DType> & comparator) + : value_(value) + , min_(index.min_values()) + , max_(index.max_values()) + , non_null_page_indices_(index.non_null_page_indices()) + , comparator_(comparator) + { + } + size_t size() const { return non_null_page_indices_.size(); } + int32_t compareValueToMin(size_t non_null_page_index) const + { + const T & min = min_[non_null_page_indices_[non_null_page_index]]; + return comparator_.Compare(value_, min) ? -1 : comparator_.Compare(min, value_) ? 1 : 0; + } + int32_t compareValueToMax(size_t non_null_page_index) const + { + const T & max = max_[non_null_page_indices_[non_null_page_index]]; + return comparator_.Compare(value_, max) ? -1 : comparator_.Compare(max, value_) ? 
1 : 0; + } +}; + +struct Bounds +{ + const int32_t lower; + const int32_t upper; + + Bounds(const int32_t l, const int32_t u) : lower(l), upper(u) { } +}; + +struct BoundaryOrder +{ + /// Avoid the possible overflow might happen in case of (left + right) / 2 + static int32_t floorMid(const int32_t lower, const int32_t upper) { return lower + (upper - lower) / 2; } + + /// Avoid the possible overflow might happen in case of (left + right + 1) / 2 + static int32_t ceilingMid(const int32_t lower, const int32_t upper) { return lower + (upper - lower + 1) / 2; } +}; + +struct UNORDERED : BoundaryOrder +{ + template <typename DType> + static PageIndexs eq(const TypedComparator<DType> & comparator) + { + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.filter( + [&](const int32_t i) { return comparator.compareValueToMin(i) >= 0 && comparator.compareValueToMax(i) <= 0; }); + } + + template <typename DType> + static PageIndexs gt(const TypedComparator<DType> & comparator) + { + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.filter([&](const int32_t i) + { return comparator.compareValueToMax(i) < 0; }); + } + + template <typename DType> + static PageIndexs gtEq(const TypedComparator<DType> & comparator) + { + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.filter([&](const int32_t i) + { return comparator.compareValueToMax(i) <= 0; }); + } + + template <typename DType> + static PageIndexs lt(const TypedComparator<DType> & comparator) + { + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.filter([&](const int32_t i) + { return comparator.compareValueToMin(i) > 0; }); + } + + template <typename DType> + static PageIndexs ltEq(const TypedComparator<DType> & comparator) + { + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.filter([&](const int32_t i) + { return comparator.compareValueToMin(i) >= 0; }); + } + + template <typename DType> + static PageIndexs notEq(const 
TypedComparator<DType> & comparator) + { + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.filter( + [&](const int32_t i) { return comparator.compareValueToMin(i) != 0 || comparator.compareValueToMax(i) != 0; }); + } +}; + +struct ASCENDING : BoundaryOrder +{ + template <typename DType> + static std::optional<Bounds> findBounds(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + int32_t lowerLeft = 0; + int32_t upperLeft = 0; + int32_t lowerRight = length - 1; + int32_t upperRight = length - 1; + + do + { + if (lowerLeft > lowerRight) + return std::nullopt; + + const int32_t i = floorMid(lowerLeft, lowerRight); + if (comparator.compareValueToMin(i) < 0) + lowerRight = upperRight = i - 1; + else if (comparator.compareValueToMax(i) > 0) + lowerLeft = upperLeft = i + 1; + else + lowerRight = upperLeft = i; + } while (lowerLeft != lowerRight); + + do + { + if (upperLeft > upperRight) + return std::nullopt; + + const int32_t i = ceilingMid(upperLeft, upperRight); + if (comparator.compareValueToMin(i) < 0) + upperRight = i - 1; + else if (comparator.compareValueToMax(i) > 0) + upperLeft = i + 1; + else + upperLeft = i; + } while (upperLeft != upperRight); + + return Bounds(lowerLeft, upperRight); + } + + template <typename DType> + static PageIndexs eq(const TypedComparator<DType> & comparator) + { + const NoneNullPageIndexsBuilder builder(comparator.non_null_page_indices_); + return findBounds(comparator) + .transform([&](const Bounds & b) { return builder.range(b.lower, b.upper + 1); }) + .value_or(PageIndexs{}); + } + + template <typename DType> + static PageIndexs gt(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return {}; + } + int32_t left = 0; + int32_t right = length; + do + { + int32_t i = floorMid(left, right); + if (comparator.compareValueToMax(i) >= 0) + left = i + 
1; + else + right = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(right, length); + } + + template <typename DType> + static PageIndexs gtEq(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return {}; + } + int32_t left = 0; + int32_t right = length; + do + { + int32_t i = floorMid(left, right); + if (comparator.compareValueToMax(i) > 0) + left = i + 1; + else + right = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(right, length); + } + + template <typename DType> + static PageIndexs lt(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return {}; + } + int32_t left = -1; + int32_t right = length - 1; + do + { + int32_t i = ceilingMid(left, right); + if (comparator.compareValueToMin(i) <= 0) + right = i - 1; + else + left = i; + } while (left < right); + + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(0, left + 1); + } + + template <typename DType> + static PageIndexs ltEq(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return {}; + } + int32_t left = -1; + int32_t right = length - 1; + do + { + int32_t i = ceilingMid(left, right); + if (comparator.compareValueToMin(i) < 0) + right = i - 1; + else + left = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(0, left + 1); + } + + template <typename DType> + static PageIndexs notEq(const TypedComparator<DType> & comparator) + { + NoneNullPageIndexsBuilder builder{comparator.non_null_page_indices_}; + return 
findBounds(comparator) + .transform( + [&](const Bounds & b) + { + return builder.filter( + [&](const int32_t i) { + return i < b.lower || i > b.upper || comparator.compareValueToMin(i) != 0 + || comparator.compareValueToMax(i) != 0; + }); + }) + .value_or(builder.all()); + } +}; + +struct DESCENDING : BoundaryOrder +{ + template <typename DType> + static std::optional<Bounds> findBounds(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + int32_t lowerLeft = 0; + int32_t upperLeft = 0; + int32_t lowerRight = length - 1; + int32_t upperRight = length - 1; + + do + { + if (lowerLeft > lowerRight) + return std::nullopt; + int32_t i = std::floor((lowerLeft + lowerRight) / 2); + if (comparator.compareValueToMax(i) > 0) + lowerRight = upperRight = i - 1; + else if (comparator.compareValueToMin(i) < 0) + lowerLeft = upperLeft = i + 1; + else + lowerRight = upperLeft = i; + } while (lowerLeft != lowerRight); + + do + { + if (upperLeft > upperRight) + return std::nullopt; + int32_t i = std::ceil((upperLeft + upperRight) / 2); + if (comparator.compareValueToMax(i) > 0) + upperRight = i - 1; + else if (comparator.compareValueToMin(i) < 0) + upperLeft = i + 1; + else + upperLeft = i; + } while (upperLeft != upperRight); + + return Bounds(lowerLeft, upperRight); + } + + template <typename DType> + static PageIndexs eq(const TypedComparator<DType> & comparator) + { + const NoneNullPageIndexsBuilder builder{comparator.non_null_page_indices_}; + return findBounds(comparator) + .transform([&](const Bounds & b) { return builder.range(b.lower, b.upper + 1); }) + .value_or(PageIndexs{}); + } + template <typename DType> + static PageIndexs gt(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return {}; + } + int32_t left = -1; + int32_t right = length - 1; + do + { + int32_t i = ceilingMid(left, right); + if 
(comparator.compareValueToMax(i) >= 0) + right = i - 1; + else + left = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(0, left + 1); + } + + template <typename DType> + static PageIndexs gtEq(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return {}; + } + int32_t left = -1; + int32_t right = length - 1; + do + { + int32_t i = ceilingMid(left, right); + if (comparator.compareValueToMax(i) > 0) + right = i - 1; + else + left = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(0, left + 1); + } + + template <typename DType> + static PageIndexs lt(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return PageIndexs{}; + } + int32_t left = 0; + int32_t right = length; + do + { + int32_t i = floorMid(left, right); + if (comparator.compareValueToMin(i) <= 0) + left = i + 1; + else + right = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(right, length); + } + + template <typename DType> + static PageIndexs ltEq(const TypedComparator<DType> & comparator) + { + const int32_t length = comparator.size(); + if (length == 0) + { + // No matching rows if the column index contains null pages only + return PageIndexs{}; + } + int32_t left = 0; + int32_t right = length; + do + { + int32_t i = floorMid(left, right); + if (comparator.compareValueToMin(i) < 0) + left = i + 1; + else + right = i; + } while (left < right); + return NoneNullPageIndexsBuilder{comparator.non_null_page_indices_}.range(right, length); + } + template <typename DType> + static PageIndexs notEq(const TypedComparator<DType> & comparator) + { + const NoneNullPageIndexsBuilder 
builder{comparator.non_null_page_indices_}; + return findBounds(comparator) + .transform( + [&](const Bounds & b) + { + return builder.filter( + [&](const int32_t i) { + return i < b.lower || i > b.upper || comparator.compareValueToMin(i) != 0 + || comparator.compareValueToMin(i) != 0; + }); + }) + .value_or(builder.all()); + } +}; + +/// TODO: bencnmark +template <typename DType, Derived<BoundaryOrder> ORDER> +PageIndexs TypedColumnIndexImpl<DType, ORDER>::notEq(const DB::Field & value) const +{ + if (value.isNull()) + { + return PageIndexsBuilder::filter( + column_index_->null_pages().size(), [&](const int32_t i) { return !column_index_->null_pages()[i]; }); + } + if (!column_index_->has_null_counts()) + { + // Nulls match so if we don't have null related statistics we have to return all pages + return {PageIndexsBuilder::ALL_PAGES}; + } + + // Merging value filtering with pages containing nulls + auto real_value{parquet_cast<DType>(value)}; + TypedComparator<DType> typed_comparator{real_value, *column_index_, *comparator_}; + auto pages = ORDER::notEq(typed_comparator); + const std::set<size_t> matchingIndexes(pages.begin(), pages.end()); + + return PageIndexsBuilder::filter( + column_index_->null_counts().size(), + [&](const int32_t i) { return column_index_->null_counts()[i] > 0 || matchingIndexes.contains(i); }); +} + +template <typename DType, Derived<BoundaryOrder> ORDER> +PageIndexs TypedColumnIndexImpl<DType, ORDER>::eq(const DB::Field & value) const +{ + if (value.isNull()) + { + if (column_index_->has_null_counts()) + { + return PageIndexsBuilder::filter( + column_index_->null_counts().size(), [&](const int32_t i) { return column_index_->null_counts()[i] > 0; }); + } + else + { + // Searching for nulls so if we don't have null related statistics we have to return all pages + return {PageIndexsBuilder::ALL_PAGES}; + } + } + auto real_value = parquet_cast<DType>(value); + TypedComparator<DType> typed_comparator{real_value, *column_index_, *comparator_}; + 
return ORDER::eq(typed_comparator); +} + +template <typename DType, Derived<BoundaryOrder> ORDER> +PageIndexs TypedColumnIndexImpl<DType, ORDER>::gt(const DB::Field & value) const +{ + auto real_value{parquet_cast<DType>(value)}; + TypedComparator<DType> typed_comparator{real_value, *column_index_, *comparator_}; + return ORDER::gt(typed_comparator); +} + +template <typename DType, Derived<BoundaryOrder> ORDER> +PageIndexs TypedColumnIndexImpl<DType, ORDER>::gtEg(const DB::Field & value) const +{ + auto real_value{parquet_cast<DType>(value)}; + TypedComparator<DType> typed_comparator{real_value, *column_index_, *comparator_}; + return ORDER::gtEq(typed_comparator); Review Comment: Is this wrong? It calls `gtEq` inside `gtEg`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
