emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132725203
##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
};
+/// \brief Internal state of a page index builder.
+enum class BuilderState {
+ /// Created, but no data has been written yet.
+ kCreated,
+ /// Some data has been written, but the builder is not yet finished.
+ kStarted,
+ /// All data has been written and no further writes are allowed.
+ kFinished,
+ /// The builder has corrupted or empty data and has therefore been discarded.
+ kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+ using T = typename DType::c_type;
+
+ explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) :
descr_(descr) {
+ /// Initialize the null_counts vector as set. An invalid null_counts vector
+ /// from any page will invalidate the null_counts vector of the column index.
+ column_index_.__isset.null_counts = true;
+ column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+ }
+
+ void AddPage(const EncodedStatistics& stats) override {  // Records the statistics of one page in the column index.
+ if (state_ == BuilderState::kFinished) {
+ throw ParquetException("Cannot add page to finished
ColumnIndexBuilder.");
+ } else if (state_ == BuilderState::kDiscarded) {
+ /// The column index is discarded. Do nothing.
+ return;
+ }
+
+ state_ = BuilderState::kStarted;
+
+ if (stats.all_null_value) {  // All-null page: store empty placeholders for min/max.
+ column_index_.null_pages.emplace_back(true);
+ column_index_.min_values.emplace_back("");
+ column_index_.max_values.emplace_back("");
+ } else if (stats.has_min && stats.has_max) {
+ const size_t page_ordinal = column_index_.null_pages.size();
+ non_null_page_indices_.emplace_back(page_ordinal);  // Finish() decodes min/max only for these pages.
+ column_index_.min_values.emplace_back(stats.min());
+ column_index_.max_values.emplace_back(stats.max());
+ column_index_.null_pages.emplace_back(false);
+ } else {
+ /// This is a non-null page but it lacks meaningful min/max values.
+ /// Discard the column index.
+ state_ = BuilderState::kDiscarded;
+ return;
+ }
+
+ if (column_index_.__isset.null_counts && stats.has_null_count) {  // Collect null counts only while every page so far has provided one.
+ column_index_.null_counts.emplace_back(stats.null_count);
+ } else {
+ column_index_.__isset.null_counts = false;  // Never set back to true in this method, so later pages skip null counts too.
+ }
+ }
+
+ void Finish() override {  // Finalizes the index; on normal return the state is kFinished or kDiscarded.
+ switch (state_) {
+ case BuilderState::kCreated: {
+ /// No page has been added. Discard the column index.
+ state_ = BuilderState::kDiscarded;
+ return;
+ }
+ case BuilderState::kFinished:
+ throw ParquetException("ColumnIndexBuilder is already finished.");
+ case BuilderState::kDiscarded:
+ // The column index is discarded. Do nothing.
+ return;
+ case BuilderState::kStarted:
+ break;
+ }
+
+ state_ = BuilderState::kFinished;
+
+ /// Clear the null_counts vector when at least one page did not provide a null count.
+ if (!column_index_.__isset.null_counts) {
+ column_index_.null_counts.clear();
+ }
+
+ /// Decode the min/max values of the non-null pages according to the data type.
+ const size_t non_null_page_count = non_null_page_indices_.size();
+ std::vector<T> min_values, max_values;
+ min_values.resize(non_null_page_count);
+ max_values.resize(non_null_page_count);
+ auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);  // The stored min/max strings are decoded as PLAIN.
+ for (size_t i = 0; i < non_null_page_count; ++i) {
+ auto page_ordinal = non_null_page_indices_.at(i);  // Position of the i-th non-null page within the column index.
+ Decode<DType>(decoder, column_index_.min_values.at(page_ordinal),
&min_values, i);
+ Decode<DType>(decoder, column_index_.max_values.at(page_ordinal),
&max_values, i);
+ }
+
+ /// Decide the boundary order from the decoded min/max values.
+ auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+ column_index_.__set_boundary_order(ToThrift(boundary_order));
+ }
+
+ void WriteTo(::arrow::io::OutputStream* sink) const override {  // Serializes the column index to sink.
+ if (state_ == BuilderState::kFinished) {  // In any other state nothing is written.
+ ThriftSerializer{}.Serialize(&column_index_, sink);
+ }
+ }
+
+ std::unique_ptr<ColumnIndex> Build() const override {  // Returns a ColumnIndex made from the collected data.
+ if (state_ == BuilderState::kFinished) {
+ return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_,
column_index_);
+ }
+ return nullptr;  // Not finished (never started, still in progress, or discarded).
+ }
+
+ private:
+ BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+ const std::vector<T>& max_values)
const {
+ DCHECK_EQ(min_values.size(), max_values.size());
+ if (min_values.empty()) {
+ return BoundaryOrder::Unordered;
+ }
+
+ std::shared_ptr<TypedComparator<DType>> comparator;
+ try {
+ comparator = MakeComparator<DType>(descr_);
+ } catch (const ParquetException&) {
+ /// Simply return unordered for unsupported comparator.
+ return BoundaryOrder::Unordered;
+ }
+
+ /// Check if both min_values and max_values are in ascending order.
+ bool is_ascending = true;
+ for (size_t i = 1; i < min_values.size(); ++i) {
+ if (comparator->Compare(min_values[i], min_values[i - 1]) ||
Review Comment:
This doesn't look right; should this have an inequality?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]