emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157982396


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but not yet write any data.
+  kCreated,
+  /// Some data are written but not yet finished.
+  kStarted,
+  /// All data are written and no more write is allowed.
+  kFinished,
+  /// The builder has corrupted data or empty data and therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : 
descr_(descr) {
+    /// Initialize the null_counts vector as set. Invalid null_counts vector 
from
+    /// any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished 
ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The offset index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks of meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), 
&min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), 
&max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, 
column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) 
const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);

Review Comment:
   doesn't logical type need to be accounted for here for things like Decimal 
type?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to