pitrou commented on code in PR #47090:
URL: https://github.com/apache/arrow/pull/47090#discussion_r2297557096


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,

Review Comment:
   The signature is slightly weird, can we perhaps group related arguments 
together? For example:
   ```c++
   template <typename Action, typename GetBufferedRows>
   inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                           int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
                           bool pages_change_on_record_boundaries,
                           Action&& action, GetBufferedRows&& curr_page_buffered_rows) {
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;

Review Comment:
   Can you add a comment explaining what this is?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);
+      end_offset = offset + min_batch_size;
+      check_page_limit_end_offset = end_offset;
+    } else {
+      int64_t last_record_begin_offset = -1;
+      // Iterate rep_levels to find the shortest sequence that ends before a 
record
+      // boundary (i.e. rep_levels == 0) with a size no less than 
min_batch_size
+      for (int64_t i = offset; i < num_levels; ++i) {
+        if (rep_levels[i] == 0) {
+          last_record_begin_offset = i;
+          if (i - offset >= min_batch_size || page_buffered_rows >= 
max_rows_per_page) {
+            end_offset = i;
+            break;
+          }
+          page_buffered_rows += 1;
+        }
+      }
+      // Use the beginning of last record to check page limit.
+      check_page_limit_end_offset = last_record_begin_offset;
     }
 
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
+
     if (end_offset < num_levels) {
       // This is not the last chunk of batch and end_offset is a record 
boundary.
-      // It is a good chance to check the page size.
-      action(offset, end_offset - offset, /*check_page_size=*/true);
+      // It is a good chance to check the page limit.
+      action(offset, end_offset - offset, /*check_page_limit=*/true);
     } else {
-      DCHECK_EQ(end_offset, num_levels);
-      // This is the last chunk of batch, and we do not know whether 
end_offset is a
-      // record boundary. Find the offset to beginning of last record in this 
chunk,
-      // so we can check page size.
-      int64_t last_record_begin_offset = num_levels - 1;
-      while (last_record_begin_offset >= offset &&
-             rep_levels[last_record_begin_offset] != 0) {
-        last_record_begin_offset--;
+      ARROW_DCHECK_EQ(end_offset, num_levels);
+      if (offset <= check_page_limit_end_offset) {

Review Comment:
   Should this inequality be strict?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);
+      end_offset = offset + min_batch_size;
+      check_page_limit_end_offset = end_offset;
+    } else {
+      int64_t last_record_begin_offset = -1;
+      // Iterate rep_levels to find the shortest sequence that ends before a 
record
+      // boundary (i.e. rep_levels == 0) with a size no less than 
min_batch_size
+      for (int64_t i = offset; i < num_levels; ++i) {
+        if (rep_levels[i] == 0) {
+          last_record_begin_offset = i;
+          if (i - offset >= min_batch_size || page_buffered_rows >= 
max_rows_per_page) {
+            end_offset = i;
+            break;
+          }
+          page_buffered_rows += 1;
+        }
+      }
+      // Use the beginning of last record to check page limit.
+      check_page_limit_end_offset = last_record_begin_offset;
     }
 
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
+
     if (end_offset < num_levels) {
       // This is not the last chunk of batch and end_offset is a record 
boundary.
-      // It is a good chance to check the page size.
-      action(offset, end_offset - offset, /*check_page_size=*/true);
+      // It is a good chance to check the page limit.
+      action(offset, end_offset - offset, /*check_page_limit=*/true);
     } else {
-      DCHECK_EQ(end_offset, num_levels);
-      // This is the last chunk of batch, and we do not know whether 
end_offset is a
-      // record boundary. Find the offset to beginning of last record in this 
chunk,
-      // so we can check page size.
-      int64_t last_record_begin_offset = num_levels - 1;
-      while (last_record_begin_offset >= offset &&
-             rep_levels[last_record_begin_offset] != 0) {
-        last_record_begin_offset--;
+      ARROW_DCHECK_EQ(end_offset, num_levels);
+      if (offset <= check_page_limit_end_offset) {
+        action(offset, check_page_limit_end_offset - offset, 
/*check_page_limit=*/true);
+        offset = check_page_limit_end_offset;
       }
-
-      if (offset <= last_record_begin_offset) {
-        // We have found the beginning of last record and can check page size.
-        action(offset, last_record_begin_offset - offset, 
/*check_page_size=*/true);
-        offset = last_record_begin_offset;
+      if (offset < end_offset) {
+        // This is the last chunk of batch, and we do not know whether 
end_offset is a
+        // record boundary so we cannot check page limit if pages cannot 
change on
+        // record boundaries.
+        action(offset, end_offset - offset,
+               /*check_page_limit=*/!pages_change_on_record_boundaries);

Review Comment:
   If there are no rep levels, then can we still check page limit?
   



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);
+      end_offset = offset + min_batch_size;
+      check_page_limit_end_offset = end_offset;
+    } else {
+      int64_t last_record_begin_offset = -1;

Review Comment:
   Why are we introducing a new variable instead of simply writing to 
`check_page_limit_end_offset`?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);

Review Comment:
   Can you add a comment similar to the "If rep_levels is null..." comment that 
was deleted above?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -2075,5 +2116,164 @@ TEST_F(TestGeometryValuesWriter, 
TestWriteAndReadAllNull) {
   EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt);
 }
 
+template <typename TestType>
+class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter<TestType> {
+ public:
+  TypedColumnWriter<TestType>* BuildWriter(
+      int64_t max_rows_per_page = kDefaultMaxRowsPerPage,
+      int64_t page_size = kDefaultDataPageSize) {
+    this->sink_ = CreateOutputStream();
+    this->writer_properties_ = WriterProperties::Builder()
+                                   .max_rows_per_page(max_rows_per_page)
+                                   ->data_pagesize(page_size)
+                                   ->enable_write_page_index()
+                                   ->build();
+    file_writer_ = ParquetFileWriter::Open(
+        this->sink_, 
std::static_pointer_cast<GroupNode>(this->schema_.schema_root()),
+        this->writer_properties_);
+    return static_cast<TypedColumnWriter<TestType>*>(
+        file_writer_->AppendRowGroup()->NextColumn());
+  }
+
+  void CloseWriter() const { file_writer_->Close(); }
+
+  void BuildReader() {
+    ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish());
+    file_reader_ = ParquetFileReader::Open(
+        std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties());
+    this->reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
+        file_reader_->RowGroup(0)->Column(0));
+  }
+
+  void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const {
+    auto file_meta = file_reader_->metadata();
+    int64_t num_row_groups = file_meta->num_row_groups();
+    ASSERT_EQ(num_row_groups, 1);
+
+    auto page_index_reader = file_reader_->GetPageIndexReader();
+    ASSERT_NE(page_index_reader, nullptr);
+
+    auto row_group_page_index_reader = page_index_reader->RowGroup(0);
+    ASSERT_NE(row_group_page_index_reader, nullptr);
+
+    auto offset_index = row_group_page_index_reader->GetOffsetIndex(0);
+    ASSERT_NE(offset_index, nullptr);
+    size_t num_pages = offset_index->page_locations().size();
+    for (size_t j = 1; j < num_pages; ++j) {
+      int64_t page_rows = offset_index->page_locations()[j].first_row_index -
+                          offset_index->page_locations()[j - 
1].first_row_index;
+      EXPECT_LE(page_rows, max_rows_per_page);
+    }
+    if (num_pages != 0) {
+      int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() -
+                               
offset_index->page_locations().back().first_row_index;
+      EXPECT_LE(last_page_rows, max_rows_per_page);
+    }
+  }
+
+ private:
+  std::shared_ptr<ParquetFileWriter> file_writer_;
+  std::shared_ptr<ParquetFileReader> file_reader_;
+};
+
+TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes);
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) {
+  for (int64_t max_rows_per_page : {1, 10, 100}) {
+    this->SetUpSchema(Repetition::OPTIONAL);
+    this->GenerateData(SMALL_SIZE);
+    std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+    definition_levels[1] = 0;
+
+    auto writer = this->BuildWriter(max_rows_per_page);
+    writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
+                       this->values_ptr_);
+    this->CloseWriter();
+
+    this->BuildReader();
+    ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+  }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) {
+  for (int64_t max_rows_per_page : {1, 10, 100}) {
+    this->SetUpSchema(Repetition::OPTIONAL);
+
+    this->GenerateData(SMALL_SIZE);
+    std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+    std::vector<uint8_t> 
valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255);
+
+    definition_levels[SMALL_SIZE - 1] = 0;
+    ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
+    definition_levels[1] = 0;
+    ::arrow::bit_util::ClearBit(valid_bits.data(), 1);
+
+    auto writer = this->BuildWriter(max_rows_per_page);
+    writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), 
nullptr,
+                             valid_bits.data(), 0, this->values_ptr_);
+    this->CloseWriter();
+
+    this->BuildReader();
+    ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+  }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) {
+  for (int64_t max_rows_per_page : {1, 10, 100}) {
+    this->SetUpSchema(Repetition::REPEATED);
+
+    this->GenerateData(SMALL_SIZE);
+    std::vector<int16_t> definition_levels(SMALL_SIZE);
+    std::vector<int16_t> repetition_levels(SMALL_SIZE);
+
+    // Generate levels to include variable-sized lists, null lists, and empty 
lists
+    for (int i = 0; i < SMALL_SIZE; i++) {
+      int list_length = (i % 5) + 1;
+      bool is_null = false;
+      if (i % 17 == 0) {
+        is_null = true;
+        list_length = 0;
+      } else if (i % 13 == 0) {
+        list_length = 0;
+      }
+
+      if (is_null) {
+        definition_levels[i] = 0;
+        repetition_levels[i] = 0;
+      } else if (list_length == 0) {
+        definition_levels[i] = 1;
+        repetition_levels[i] = 0;
+      } else {
+        for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) {
+          definition_levels[i + j] = 1;

Review Comment:
   So even non-empty lists are full of nulls, right (otherwise this would be 
2)? Perhaps add a comment about this?
   (or should this be corrected to be 2?)
   



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);
+      end_offset = offset + min_batch_size;
+      check_page_limit_end_offset = end_offset;
+    } else {
+      int64_t last_record_begin_offset = -1;
+      // Iterate rep_levels to find the shortest sequence that ends before a 
record
+      // boundary (i.e. rep_levels == 0) with a size no less than 
min_batch_size
+      for (int64_t i = offset; i < num_levels; ++i) {
+        if (rep_levels[i] == 0) {
+          last_record_begin_offset = i;
+          if (i - offset >= min_batch_size || page_buffered_rows >= 
max_rows_per_page) {
+            end_offset = i;
+            break;
+          }
+          page_buffered_rows += 1;

Review Comment:
   What is this for?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -2075,5 +2116,164 @@ TEST_F(TestGeometryValuesWriter, 
TestWriteAndReadAllNull) {
   EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt);
 }
 
+template <typename TestType>
+class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter<TestType> {
+ public:
+  TypedColumnWriter<TestType>* BuildWriter(
+      int64_t max_rows_per_page = kDefaultMaxRowsPerPage,
+      int64_t page_size = kDefaultDataPageSize) {
+    this->sink_ = CreateOutputStream();
+    this->writer_properties_ = WriterProperties::Builder()
+                                   .max_rows_per_page(max_rows_per_page)
+                                   ->data_pagesize(page_size)
+                                   ->enable_write_page_index()
+                                   ->build();
+    file_writer_ = ParquetFileWriter::Open(
+        this->sink_, 
std::static_pointer_cast<GroupNode>(this->schema_.schema_root()),
+        this->writer_properties_);
+    return static_cast<TypedColumnWriter<TestType>*>(
+        file_writer_->AppendRowGroup()->NextColumn());
+  }
+
+  void CloseWriter() const { file_writer_->Close(); }
+
+  void BuildReader() {
+    ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish());
+    file_reader_ = ParquetFileReader::Open(
+        std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties());
+    this->reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
+        file_reader_->RowGroup(0)->Column(0));
+  }
+
+  void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const {
+    auto file_meta = file_reader_->metadata();
+    int64_t num_row_groups = file_meta->num_row_groups();
+    ASSERT_EQ(num_row_groups, 1);
+
+    auto page_index_reader = file_reader_->GetPageIndexReader();
+    ASSERT_NE(page_index_reader, nullptr);
+
+    auto row_group_page_index_reader = page_index_reader->RowGroup(0);
+    ASSERT_NE(row_group_page_index_reader, nullptr);
+
+    auto offset_index = row_group_page_index_reader->GetOffsetIndex(0);
+    ASSERT_NE(offset_index, nullptr);
+    size_t num_pages = offset_index->page_locations().size();
+    for (size_t j = 1; j < num_pages; ++j) {
+      int64_t page_rows = offset_index->page_locations()[j].first_row_index -
+                          offset_index->page_locations()[j - 
1].first_row_index;
+      EXPECT_LE(page_rows, max_rows_per_page);
+    }
+    if (num_pages != 0) {
+      int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() -
+                               
offset_index->page_locations().back().first_row_index;
+      EXPECT_LE(last_page_rows, max_rows_per_page);
+    }

Review Comment:
   Can we also check that the row group `num_rows` is as expected?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);

Review Comment:
   It seems this is only used when there are no rep levels, right?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);
+      end_offset = offset + min_batch_size;
+      check_page_limit_end_offset = end_offset;
+    } else {
+      int64_t last_record_begin_offset = -1;
+      // Iterate rep_levels to find the shortest sequence that ends before a 
record
+      // boundary (i.e. rep_levels == 0) with a size no less than 
min_batch_size
+      for (int64_t i = offset; i < num_levels; ++i) {
+        if (rep_levels[i] == 0) {
+          last_record_begin_offset = i;
+          if (i - offset >= min_batch_size || page_buffered_rows >= 
max_rows_per_page) {
+            end_offset = i;
+            break;
+          }
+          page_buffered_rows += 1;
+        }
+      }
+      // Use the beginning of last record to check page limit.
+      check_page_limit_end_offset = last_record_begin_offset;
     }
 
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
+
     if (end_offset < num_levels) {
       // This is not the last chunk of batch and end_offset is a record 
boundary.
-      // It is a good chance to check the page size.
-      action(offset, end_offset - offset, /*check_page_size=*/true);
+      // It is a good chance to check the page limit.
+      action(offset, end_offset - offset, /*check_page_limit=*/true);
     } else {
-      DCHECK_EQ(end_offset, num_levels);
-      // This is the last chunk of batch, and we do not know whether 
end_offset is a
-      // record boundary. Find the offset to beginning of last record in this 
chunk,
-      // so we can check page size.
-      int64_t last_record_begin_offset = num_levels - 1;
-      while (last_record_begin_offset >= offset &&
-             rep_levels[last_record_begin_offset] != 0) {
-        last_record_begin_offset--;
+      ARROW_DCHECK_EQ(end_offset, num_levels);
+      if (offset <= check_page_limit_end_offset) {
+        action(offset, check_page_limit_end_offset - offset, 
/*check_page_limit=*/true);
+        offset = check_page_limit_end_offset;
       }
-
-      if (offset <= last_record_begin_offset) {
-        // We have found the beginning of last record and can check page size.
-        action(offset, last_record_begin_offset - offset, 
/*check_page_size=*/true);
-        offset = last_record_begin_offset;
+      if (offset < end_offset) {
+        // This is the last chunk of batch, and we do not know whether 
end_offset is a
+        // record boundary so we cannot check page limit if pages cannot 
change on
+        // record boundaries.
+        action(offset, end_offset - offset,
+               /*check_page_limit=*/!pages_change_on_record_boundaries);

Review Comment:
   Ah, but `check_page_limit_end_offset` is set to `end_offset` if there are no 
rep levels. The logic here is a bit confusing...



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
-
-template <typename Action>
+template <typename Action, typename GetBufferedRows>
 inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
-  }
-
+                        bool pages_change_on_record_boundaries, int64_t 
max_rows_per_page,
+                        GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    int64_t min_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;
+    int64_t check_page_limit_end_offset = -1;
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
+    if (!rep_levels) {
+      min_batch_size = std::min(min_batch_size, max_rows_per_page - 
page_buffered_rows);
+      end_offset = offset + min_batch_size;
+      check_page_limit_end_offset = end_offset;
+    } else {
+      int64_t last_record_begin_offset = -1;
+      // Iterate rep_levels to find the shortest sequence that ends before a 
record
+      // boundary (i.e. rep_levels == 0) with a size no less than 
min_batch_size
+      for (int64_t i = offset; i < num_levels; ++i) {
+        if (rep_levels[i] == 0) {
+          last_record_begin_offset = i;
+          if (i - offset >= min_batch_size || page_buffered_rows >= 
max_rows_per_page) {
+            end_offset = i;
+            break;
+          }
+          page_buffered_rows += 1;
+        }
+      }
+      // Use the beginning of last record to check page limit.
+      check_page_limit_end_offset = last_record_begin_offset;
     }
 
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
+
     if (end_offset < num_levels) {
       // This is not the last chunk of batch and end_offset is a record 
boundary.
-      // It is a good chance to check the page size.
-      action(offset, end_offset - offset, /*check_page_size=*/true);
+      // It is a good chance to check the page limit.
+      action(offset, end_offset - offset, /*check_page_limit=*/true);
     } else {
-      DCHECK_EQ(end_offset, num_levels);
-      // This is the last chunk of batch, and we do not know whether 
end_offset is a
-      // record boundary. Find the offset to beginning of last record in this 
chunk,
-      // so we can check page size.
-      int64_t last_record_begin_offset = num_levels - 1;
-      while (last_record_begin_offset >= offset &&
-             rep_levels[last_record_begin_offset] != 0) {
-        last_record_begin_offset--;
+      ARROW_DCHECK_EQ(end_offset, num_levels);
+      if (offset <= check_page_limit_end_offset) {
+        action(offset, check_page_limit_end_offset - offset, 
/*check_page_limit=*/true);
+        offset = check_page_limit_end_offset;
       }
-
-      if (offset <= last_record_begin_offset) {
-        // We have found the beginning of last record and can check page size.
-        action(offset, last_record_begin_offset - offset, 
/*check_page_size=*/true);
-        offset = last_record_begin_offset;
+      if (offset < end_offset) {
+        // This is the last chunk of batch, and we do not know whether 
end_offset is a
+        // record boundary so we cannot check page limit if pages cannot 
change on
+        // record boundaries.
+        action(offset, end_offset - offset,
+               /*check_page_limit=*/!pages_change_on_record_boundaries);

Review Comment:
   Perhaps we should really separate the two disjoint cases, depending on 
whether rep levels are present or not?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to