wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1133370962


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, 
&parquet_schema));
+  auto schema_node = 
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]

Review Comment:
   Added a list column to verify the page index of a repeated column.



##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but not yet write any data.
+  kCreated,
+  /// Some data are written but not yet finished.
+  kStarted,
+  /// All data are written and no more write is allowed.
+  kFinished,
+  /// The builder has corrupted data or empty data and therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : 
descr_(descr) {
+    /// Initialize the null_counts vector as set. Invalid null_counts vector 
from
+    /// any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished 
ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The offset index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks of meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), 
&min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), 
&max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, 
column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) 
const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);
+    } catch (const ParquetException&) {
+      /// Simply return unordered for unsupported comparator.
+      return BoundaryOrder::Unordered;
+    }
+
+    /// Check if both min_values and max_values are in ascending order.
+    bool is_ascending = true;
+    for (size_t i = 1; i < min_values.size(); ++i) {
+      if (comparator->Compare(min_values[i], min_values[i - 1]) ||

Review Comment:
   The `Compare` function tests whether the 1st value is strictly less than the
2nd value, as shown below.
   ```cpp
     /// \brief Scalar comparison of two elements, return true if first
     /// is strictly less than the second
     virtual bool Compare(const T& a, const T& b) = 0;
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, 
&parquet_schema));
+  auto schema_node = 
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), 
encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), 
encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};

Review Comment:
   Added `TEST(TestArrowReadWrite, 
WriteReadPageIndexRoundTripWithLargeStatsDropped)` to cover this.



##########
cpp/src/parquet/properties.h:
##########
@@ -501,6 +502,28 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    /// Enable writing page index.
+    ///
+    /// Page index contains statistics for data pages and can be used to skip 
pages
+    /// when scanning data in ordered and unordered columns.
+    ///
+    /// Please check the link below for more details:
+    /// https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    ///
+    /// Default disabled.
+    Builder* enable_write_page_index() {

Review Comment:
   Agreed. I will work on it once this PR gets merged.



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, 
&parquet_schema));
+  auto schema_node = 
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), 
encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), 
encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;

Review Comment:
   Added `TEST(TestArrowReadWrite, 
WriteReadPageIndexRoundTripWithMultiplePages)` to cover this.



##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, 
DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 
40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize 
round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                
static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& 
page_stats,
+                               BoundaryOrder::type boundary_order, bool 
has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, 
/*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize 
round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                
static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, 
BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, 
BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  
page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+  
page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+  
page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+  TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, 
BoundaryOrder::Unordered,

Review Comment:
   There is already a comment at line 528 saying that the data has no specific
order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to