This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2190db54c GH-47030: [C++][Parquet] Add setting to limit the number of rows written per page (#47090)
b2190db54c is described below

commit b2190db54c7ee31a6ce3c3990a66020345471c99
Author: Gang Wu <[email protected]>
AuthorDate: Wed Oct 29 23:38:22 2025 +0800

    GH-47030: [C++][Parquet] Add setting to limit the number of rows written per page (#47090)
    
    ### Rationale for this change
    
    Currently only the page size is limited. We also need to limit the number of rows per page.
    
    ### What changes are included in this PR?
    
    Add `parquet::WriterProperties::max_rows_per_page(int64_t max_rows)` to limit the number of rows per data page.
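    
    For illustration, here is a minimal sketch of opting into the new limit (the builder chaining mirrors the diff below; the 1,000-row value is an arbitrary example). A data page is finalized when either the byte-size limit or the row limit is reached:
    
    ```cpp
    #include "parquet/properties.h"
    
    // Flush a data page once it buffers 1,000 rows, in addition to the
    // existing byte-size trigger from data_pagesize().
    auto props = parquet::WriterProperties::Builder()
                     .max_rows_per_page(1000)
                     ->data_pagesize(1024 * 1024)
                     ->build();
    ```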
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    Yes, users can now set this configuration value.
    * GitHub Issue: #47030
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/column_writer.cc        | 153 +++++++++++++++---------
 cpp/src/parquet/column_writer_test.cc   | 200 +++++++++++++++++++++++++++++++-
 cpp/src/parquet/properties.h            |  23 +++-
 cpp/src/parquet/size_statistics_test.cc |   1 +
 4 files changed, 314 insertions(+), 63 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 1f3d64f622..22c36531cd 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1150,67 +1150,99 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
-  int64_t num_batches = static_cast<int>(total / batch_size);
-  for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size, /*check_page_size=*/true);
-  }
-  // Write the remaining values
-  if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
-  }
-}
+// DoInBatches for non-repeated columns
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size,
+                                   int64_t max_rows_per_page, Action&& action,
+                                   GetBufferedRows&& curr_page_buffered_rows) {
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
 
-template <typename Action>
-inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
-                        int64_t num_levels, int64_t batch_size, Action&& action,
-                        bool pages_change_on_record_boundaries) {
-  if (!pages_change_on_record_boundaries || !rep_levels) {
-    // If rep_levels is null, then we are writing a non-repeated column.
-    // In this case, every record contains only one level.
-    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+    // Every record contains only one level.
+    int64_t max_batch_size = std::min(batch_size, num_levels - offset);
+    max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows);
+    int64_t end_offset = offset + max_batch_size;
+
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(end_offset, num_levels);
+
+    // Always check page limit for non-repeated columns.
+    action(offset, end_offset - offset, /*check_page_limit=*/true);
+
+    offset = end_offset;
   }
+}
 
+// DoInBatches for repeated columns
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels,
+                                int64_t num_levels, int64_t batch_size,
+                                int64_t max_rows_per_page,
+                                bool pages_change_on_record_boundaries, Action&& action,
+                                GetBufferedRows&& curr_page_buffered_rows) {
   int64_t offset = 0;
   while (offset < num_levels) {
-    int64_t end_offset = std::min(offset + batch_size, num_levels);
-
-    // Find next record boundary (i.e. rep_level = 0)
-    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
-      end_offset++;
-    }
-
-    if (end_offset < num_levels) {
-      // This is not the last chunk of batch and end_offset is a record boundary.
-      // It is a good chance to check the page size.
-      action(offset, end_offset - offset, /*check_page_size=*/true);
-    } else {
-      DCHECK_EQ(end_offset, num_levels);
-      // This is the last chunk of batch, and we do not know whether end_offset is a
-      // record boundary. Find the offset to beginning of last record in this chunk,
-      // so we can check page size.
-      int64_t last_record_begin_offset = num_levels - 1;
-      while (last_record_begin_offset >= offset &&
-             rep_levels[last_record_begin_offset] != 0) {
-        last_record_begin_offset--;
+    int64_t max_batch_size = std::min(batch_size, num_levels - offset);
+    int64_t end_offset = num_levels;           // end offset of the current batch
+    int64_t check_page_limit_end_offset = -1;  // offset to check page limit (if not -1)
+
+    int64_t page_buffered_rows = curr_page_buffered_rows();
+    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
+
+    // Iterate rep_levels to find the shortest sequence that ends before a record
+    // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size
+    for (int64_t i = offset; i < num_levels; ++i) {
+      if (rep_levels[i] == 0) {
+        // Use the beginning of last record to check page limit.
+        check_page_limit_end_offset = i;
+        if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
+          end_offset = i;
+          break;
+        }
+        page_buffered_rows += 1;
       }
+    }
 
-      if (offset <= last_record_begin_offset) {
-        // We have found the beginning of last record and can check page size.
-        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
-        offset = last_record_begin_offset;
-      }
+    ARROW_DCHECK_LE(offset, end_offset);
+    ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
 
-      // Write remaining data after the record boundary,
-      // or all data if no boundary was found.
-      action(offset, end_offset - offset, /*check_page_size=*/false);
+    if (check_page_limit_end_offset >= 0) {
+      // At least one record boundary is included in this batch.
+      // It is a good chance to check the page limit.
+      action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true);
+      offset = check_page_limit_end_offset;
+    }
+    if (end_offset > offset) {
+      // This is the last chunk of the batch, and we do not know whether end_offset is
+      // a record boundary, so we cannot check the page limit if pages must change on
+      // record boundaries.
+      ARROW_DCHECK_EQ(end_offset, num_levels);
+      action(offset, end_offset - offset,
+             /*check_page_limit=*/!pages_change_on_record_boundaries);
     }
 
     offset = end_offset;
   }
 }
 
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
+                        bool pages_change_on_record_boundaries, Action&& action,
+                        GetBufferedRows&& curr_page_buffered_rows) {
+  if (!rep_levels) {
+    DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page,
+                           std::forward<Action>(action),
+                           std::forward<GetBufferedRows>(curr_page_buffered_rows));
+  } else {
+    DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size, max_rows_per_page,
+                        pages_change_on_record_boundaries, std::forward<Action>(action),
+                        std::forward<GetBufferedRows>(curr_page_buffered_rows));
+  }
+}
+
 namespace {
 
 bool DictionaryDirectWriteSupported(const ::arrow::Array& array) {
@@ -1318,7 +1350,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
       CheckDictionarySizeLimit();
     };
     DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
-                WriteChunk, pages_change_on_record_boundaries());
+                properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+                WriteChunk, [this]() { return num_buffered_rows_; });
     return value_offset;
   }
 
@@ -1368,7 +1401,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
       CheckDictionarySizeLimit();
     };
     DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
-                WriteChunk, pages_change_on_record_boundaries());
+                properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+                WriteChunk, [this]() { return num_buffered_rows_; });
   }
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
@@ -1769,13 +1803,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
   }
 
   void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
-                                    int64_t num_nulls, bool check_page_size) {
+                                    int64_t num_nulls, bool check_page_limit) {
     num_buffered_values_ += num_levels;
     num_buffered_encoded_values_ += num_values;
     num_buffered_nulls_ += num_nulls;
 
-    if (check_page_size &&
-        current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    if (check_page_limit &&
+        (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize() ||
+         num_buffered_rows_ >= properties_->max_rows_per_page())) {
       AddDataPage();
     }
   }
@@ -1996,9 +2031,10 @@ Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
     return WriteDense();
   }
 
-  PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
-                                   properties_->write_batch_size(), WriteIndicesChunk,
-                                   pages_change_on_record_boundaries()));
+  PARQUET_CATCH_NOT_OK(
+      DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
+                  properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+                  WriteIndicesChunk, [this]() { return num_buffered_rows_; }));
   return Status::OK();
 }
 
@@ -2441,9 +2477,10 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
     value_offset += batch_num_spaced_values;
   };
 
-  PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
-                                   properties_->write_batch_size(), WriteChunk,
-                                   pages_change_on_record_boundaries()));
+  PARQUET_CATCH_NOT_OK(
+      DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
+                  properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+                  WriteChunk, [this]() { return num_buffered_rows_; }));
   return Status::OK();
 }
 
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 990125df4e..48cac04f07 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -37,6 +37,7 @@
 #include "parquet/file_writer.h"
 #include "parquet/geospatial/statistics.h"
 #include "parquet/metadata.h"
+#include "parquet/page_index.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/statistics.h"
@@ -108,7 +109,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
       const ColumnProperties& column_properties = ColumnProperties(),
       const ParquetVersion::type version = ParquetVersion::PARQUET_1_0,
       const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1,
-      bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize) {
+      bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize,
+      int64_t max_rows_per_page = kDefaultMaxRowsPerPage) {
     sink_ = CreateOutputStream();
     WriterProperties::Builder wp_builder;
     wp_builder.version(version)->data_page_version(data_page_version);
@@ -125,6 +127,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     }
     wp_builder.max_statistics_size(column_properties.max_statistics_size());
     wp_builder.data_pagesize(page_size);
+    wp_builder.max_rows_per_page(max_rows_per_page);
     writer_properties_ = wp_builder.build();
 
     metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_);
@@ -506,6 +509,44 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
   this->SyncValuesOut();
 }
 
+template <>
+void TestPrimitiveWriter<ByteArrayType>::ReadColumnFully(Compression::type compression,
+                                                         bool page_checksum_verify) {
+  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression, page_checksum_verify);
+  this->data_buffer_.clear();
+
+  values_read_ = 0;
+  while (values_read_ < total_values) {
+    int64_t values_read_recently = 0;
+    reader_->ReadBatch(
+        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+
+    // Compute the total length of the data
+    int64_t total_length = 0;
+    for (int64_t i = 0; i < values_read_recently; i++) {
+      total_length += this->values_out_[i + values_read_].len;
+    }
+
+    // Copy contents of the pointers
+    std::vector<uint8_t> data(total_length);
+    uint8_t* data_ptr = data.data();
+    for (int64_t i = 0; i < values_read_recently; i++) {
+      const ByteArray& value = this->values_out_ptr_[i + values_read_];
+      memcpy(data_ptr, value.ptr, value.len);
+      this->values_out_[i + values_read_].ptr = data_ptr;
+      data_ptr += value.len;
+    }
+    data_buffer_.emplace_back(std::move(data));
+
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
                          BooleanType, ByteArrayType, FLBAType>
     TestTypes;
@@ -2075,5 +2116,162 @@ TEST_F(TestGeometryValuesWriter, TestWriteAndReadAllNull) {
   EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt);
 }
 
+template <typename TestType>
+class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter<TestType> {
+ public:
+  TypedColumnWriter<TestType>* BuildWriter(
+      int64_t max_rows_per_page = kDefaultMaxRowsPerPage,
+      int64_t page_size = kDefaultDataPageSize) {
+    this->sink_ = CreateOutputStream();
+    this->writer_properties_ = WriterProperties::Builder()
+                                   .max_rows_per_page(max_rows_per_page)
+                                   ->data_pagesize(page_size)
+                                   ->enable_write_page_index()
+                                   ->build();
+    file_writer_ = ParquetFileWriter::Open(
+        this->sink_, std::static_pointer_cast<GroupNode>(this->schema_.schema_root()),
+        this->writer_properties_);
+    return static_cast<TypedColumnWriter<TestType>*>(
+        file_writer_->AppendRowGroup()->NextColumn());
+  }
+
+  void CloseWriter() const { file_writer_->Close(); }
+
+  void BuildReader() {
+    ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish());
+    file_reader_ = ParquetFileReader::Open(
+        std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+    this->reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
+        file_reader_->RowGroup(0)->Column(0));
+  }
+
+  void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const {
+    auto file_meta = file_reader_->metadata();
+    int64_t num_row_groups = file_meta->num_row_groups();
+    ASSERT_EQ(num_row_groups, 1);
+
+    auto page_index_reader = file_reader_->GetPageIndexReader();
+    ASSERT_NE(page_index_reader, nullptr);
+
+    auto row_group_page_index_reader = page_index_reader->RowGroup(0);
+    ASSERT_NE(row_group_page_index_reader, nullptr);
+
+    auto offset_index = row_group_page_index_reader->GetOffsetIndex(0);
+    ASSERT_NE(offset_index, nullptr);
+    size_t num_pages = offset_index->page_locations().size();
+    int64_t num_rows = 0;
+    for (size_t j = 1; j < num_pages; ++j) {
+      int64_t page_rows = offset_index->page_locations()[j].first_row_index -
+                          offset_index->page_locations()[j - 1].first_row_index;
+      EXPECT_LE(page_rows, max_rows_per_page);
+      num_rows += page_rows;
+    }
+    if (num_pages != 0) {
+      int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() -
+                               offset_index->page_locations().back().first_row_index;
+      EXPECT_LE(last_page_rows, max_rows_per_page);
+      num_rows += last_page_rows;
+    }
+
+    EXPECT_EQ(num_rows, file_meta->RowGroup(0)->num_rows());
+  }
+
+ private:
+  std::shared_ptr<ParquetFileWriter> file_writer_;
+  std::shared_ptr<ParquetFileReader> file_reader_;
+};
+
+TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes);
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) {
+  for (int64_t max_rows_per_page : {1, 10, 100}) {
+    this->SetUpSchema(Repetition::OPTIONAL);
+    this->GenerateData(SMALL_SIZE);
+    std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+    definition_levels[1] = 0;
+
+    auto writer = this->BuildWriter(max_rows_per_page);
+    writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
+                       this->values_ptr_);
+    this->CloseWriter();
+
+    this->BuildReader();
+    ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+  }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) {
+  for (int64_t max_rows_per_page : {1, 10, 100}) {
+    this->SetUpSchema(Repetition::OPTIONAL);
+
+    this->GenerateData(SMALL_SIZE);
+    std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+    std::vector<uint8_t> valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255);
+
+    definition_levels[SMALL_SIZE - 1] = 0;
+    ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
+    definition_levels[1] = 0;
+    ::arrow::bit_util::ClearBit(valid_bits.data(), 1);
+
+    auto writer = this->BuildWriter(max_rows_per_page);
+    writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
+                             valid_bits.data(), 0, this->values_ptr_);
+    this->CloseWriter();
+
+    this->BuildReader();
+    ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+  }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) {
+  for (int64_t max_rows_per_page : {1, 10, 100}) {
+    this->SetUpSchema(Repetition::REPEATED);
+
+    this->GenerateData(SMALL_SIZE);
+    std::vector<int16_t> definition_levels(SMALL_SIZE);
+    std::vector<int16_t> repetition_levels(SMALL_SIZE);
+
+    // Generate levels to include variable-sized lists and empty lists
+    for (int i = 0; i < SMALL_SIZE; i++) {
+      int list_length = (i % 5) + 1;
+      if (i % 13 == 0 || i % 17 == 0) {
+        list_length = 0;
+      }
+
+      if (list_length == 0) {
+        definition_levels[i] = 0;
+        repetition_levels[i] = 0;
+      } else {
+        for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) {
+          definition_levels[i + j] = 1;
+          repetition_levels[i + j] = (j == 0) ? 0 : 1;
+        }
+        i += list_length - 1;
+      }
+    }
+
+    auto writer = this->BuildWriter(max_rows_per_page);
+    writer->WriteBatch(this->values_.size(), definition_levels.data(),
+                       repetition_levels.data(), this->values_ptr_);
+    this->CloseWriter();
+
+    this->BuildReader();
+    ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+  }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) {
+  for (int64_t max_rows_per_page : {10, 100, 10000}) {
+    this->GenerateData(LARGE_SIZE);
+
+    auto writer = this->BuildWriter(max_rows_per_page);
+    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+    this->CloseWriter();
+
+    this->BuildReader();
+    ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+  }
+}
+
 }  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5a1799c39d..51b549df22 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -155,6 +155,7 @@ class PARQUET_EXPORT ReaderProperties {
 ReaderProperties PARQUET_EXPORT default_reader_properties();
 
 static constexpr int64_t kDefaultDataPageSize = 1024 * 1024;
+static constexpr int64_t kDefaultMaxRowsPerPage = 20'000;
 static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
 static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
@@ -293,6 +294,7 @@ class PARQUET_EXPORT WriterProperties {
           write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
           pagesize_(kDefaultDataPageSize),
+          max_rows_per_page_(kDefaultMaxRowsPerPage),
           version_(ParquetVersion::PARQUET_2_6),
           data_page_version_(ParquetDataPageVersion::V1),
           created_by_(DEFAULT_CREATED_BY),
@@ -308,6 +310,7 @@ class PARQUET_EXPORT WriterProperties {
           write_batch_size_(properties.write_batch_size()),
           max_row_group_length_(properties.max_row_group_length()),
           pagesize_(properties.data_pagesize()),
+          max_rows_per_page_(properties.max_rows_per_page()),
           version_(properties.version()),
           data_page_version_(properties.data_page_version()),
           created_by_(properties.created_by()),
@@ -422,6 +425,13 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    /// Specify the maximum number of rows per data page.
+    /// Default 20K rows.
+    Builder* max_rows_per_page(int64_t max_rows) {
+      max_rows_per_page_ = max_rows;
+      return this;
+    }
+
     /// Specify the data page version.
     /// Default V1.
     Builder* data_page_version(ParquetDataPageVersion data_page_version) {
@@ -768,7 +778,7 @@ class PARQUET_EXPORT WriterProperties {
 
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
-          pagesize_, version_, created_by_, page_checksum_enabled_,
+          pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_,
           size_statistics_level_, std::move(file_encryption_properties_),
           default_column_properties_, column_properties, data_page_version_,
           store_decimal_as_integer_, std::move(sorting_columns_),
@@ -781,6 +791,7 @@ class PARQUET_EXPORT WriterProperties {
     int64_t write_batch_size_;
     int64_t max_row_group_length_;
     int64_t pagesize_;
+    int64_t max_rows_per_page_;
     ParquetVersion::type version_;
     ParquetDataPageVersion data_page_version_;
     std::string created_by_;
@@ -816,6 +827,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t data_pagesize() const { return pagesize_; }
 
+  inline int64_t max_rows_per_page() const { return max_rows_per_page_; }
+
   inline ParquetDataPageVersion data_page_version() const {
     return parquet_data_page_version_;
   }
@@ -930,9 +943,9 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(
      MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
-      int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
-      const std::string& created_by, bool page_write_checksum_enabled,
-      SizeStatisticsLevel size_statistics_level,
+      int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page,
+      ParquetVersion::type version, const std::string& created_by,
+      bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
      const std::unordered_map<std::string, ColumnProperties>& column_properties,
@@ -944,6 +957,7 @@ class PARQUET_EXPORT WriterProperties {
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
         pagesize_(pagesize),
+        max_rows_per_page_(max_rows_per_page),
         parquet_data_page_version_(data_page_version),
         parquet_version_(version),
         parquet_created_by_(created_by),
@@ -962,6 +976,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
   int64_t pagesize_;
+  int64_t max_rows_per_page_;
   ParquetDataPageVersion parquet_data_page_version_;
   ParquetVersion::type parquet_version_;
   std::string parquet_created_by_;
diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc
index 90d6df57e7..6e8cec9a13 100644
--- a/cpp/src/parquet/size_statistics_test.cc
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -140,6 +140,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     auto writer_properties = WriterProperties::Builder()
                                  .max_row_group_length(max_row_group_length)
                                  ->data_pagesize(page_size)
+                                 ->max_rows_per_page(std::numeric_limits<int64_t>::max())
                                  ->write_batch_size(write_batch_size)
                                  ->enable_write_page_index()
                                  ->enable_statistics()
