wgtmac commented on code in PR #47090:
URL: https://github.com/apache/arrow/pull/47090#discussion_r2473470956


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1150,67 +1150,99 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
-  }
-}
+// DoInBatches for non-repeated columns
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size,
+                                   int64_t max_rows_per_page, Action&& action,
+                                   GetBufferedRows&& curr_page_buffered_rows) {
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-template <typename Action>
-inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
-                        int64_t num_levels, int64_t batch_size, Action&& 
action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+    // Every record contains only one level.
+    int64_t max_batch_size = std::min(batch_size, num_levels - offset);
+    max_batch_size = std::min(max_batch_size, max_rows_per_page - 
page_buffered_rows);
+    int64_t end_offset = offset + max_batch_size;
+
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(end_offset, num_levels);
+
+    // Always check page limit for non-repeated columns.
+    action(offset, end_offset - offset, /*check_page_limit=*/true);
+
+    offset = end_offset;
   }
+}
 
+// DoInBatches for repeated columns
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* 
rep_levels,
+                                int64_t num_levels, int64_t batch_size,
+                                int64_t max_rows_per_page,
+                                bool pages_change_on_record_boundaries, 
Action&& action,
+                                GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
-
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
-    }
-
-    if (end_offset < num_levels) {
-      // This is not the last chunk of batch and end_offset is a record 
boundary.
-      // It is a good chance to check the page size.
-      action(offset, end_offset - offset, /*check_page_size=*/true);
-    } else {
-      DCHECK_EQ(end_offset, num_levels);
-      // This is the last chunk of batch, and we do not know whether 
end_offset is a
-      // record boundary. Find the offset to beginning of last record in this 
chunk,
-      // so we can check page size.
-      int64_t last_record_begin_offset = num_levels - 1;
-      while (last_record_begin_offset >= offset &&
-             rep_levels[last_record_begin_offset] != 0) {
-        last_record_begin_offset--;
+    int64_t max_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;           // end offset of the current 
batch
+    int64_t check_page_limit_end_offset = -1;  // offset to check page limit 
(if not -1)
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
+
+    // Iterate rep_levels to find the shortest sequence that ends before a 
record
+    // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size
+    for (int64_t i = offset; i < num_levels; ++i) {

Review Comment:
   Then all levels must be checked; otherwise, we can't tell how many records are in 
this batch from the beginning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to