This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b2190db54c GH-47030: [C++][Parquet] Add setting to limit the number of rows written per page (#47090)
b2190db54c is described below
commit b2190db54c7ee31a6ce3c3990a66020345471c99
Author: Gang Wu <[email protected]>
AuthorDate: Wed Oct 29 23:38:22 2025 +0800
GH-47030: [C++][Parquet] Add setting to limit the number of rows written per page (#47090)
### Rationale for this change
Currently, only the page size in bytes is limited. We also need to limit the number of rows per page.
### What changes are included in this PR?
Add `parquet::WriterProperties::max_rows_per_page(int64_t max_rows)` to limit the number of rows per data page.
### Are these changes tested?
Yes
### Are there any user-facing changes?
Yes, users can now set this configuration value.
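For reference, a minimal usage sketch of the new setting (builder chaining as in the tests below; the `10'000` value is only an illustration, not a recommendation):
```cpp
#include "parquet/properties.h"

std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  return parquet::WriterProperties::Builder()
      .max_rows_per_page(10'000)    // new: at most 10K rows per data page
      ->data_pagesize(1024 * 1024)  // the byte-based page size limit still applies
      ->build();
}
```
A data page is finalized as soon as either limit is reached (see `CommitWriteAndCheckPageLimit` below).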
* GitHub Issue: #47030
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/parquet/column_writer.cc | 153 +++++++++++++++---------
cpp/src/parquet/column_writer_test.cc | 200 +++++++++++++++++++++++++++++++-
cpp/src/parquet/properties.h | 23 +++-
cpp/src/parquet/size_statistics_test.cc | 1 +
4 files changed, 314 insertions(+), 63 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 1f3d64f622..22c36531cd 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1150,67 +1150,99 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
// ----------------------------------------------------------------------
// TypedColumnWriter
-template <typename Action>
-inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
- int64_t num_batches = static_cast<int>(total / batch_size);
- for (int round = 0; round < num_batches; round++) {
- action(round * batch_size, batch_size, /*check_page_size=*/true);
- }
- // Write the remaining values
- if (total % batch_size > 0) {
- action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
- }
-}
+// DoInBatches for non-repeated columns
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size,
+ int64_t max_rows_per_page, Action&& action,
+ GetBufferedRows&& curr_page_buffered_rows) {
+ int64_t offset = 0;
+ while (offset < num_levels) {
+ int64_t page_buffered_rows = curr_page_buffered_rows();
+ ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
-template <typename Action>
-inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
- int64_t num_levels, int64_t batch_size, Action&& action,
- bool pages_change_on_record_boundaries) {
- if (!pages_change_on_record_boundaries || !rep_levels) {
- // If rep_levels is null, then we are writing a non-repeated column.
- // In this case, every record contains only one level.
- return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+ // Every record contains only one level.
+ int64_t max_batch_size = std::min(batch_size, num_levels - offset);
+ max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows);
+ int64_t end_offset = offset + max_batch_size;
+
+ ARROW_DCHECK_LE(offset, end_offset);
+ ARROW_DCHECK_LE(end_offset, num_levels);
+
+ // Always check page limit for non-repeated columns.
+ action(offset, end_offset - offset, /*check_page_limit=*/true);
+
+ offset = end_offset;
}
+}
+// DoInBatches for repeated columns
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels,
+ int64_t num_levels, int64_t batch_size,
+ int64_t max_rows_per_page,
+ bool pages_change_on_record_boundaries, Action&& action,
+ GetBufferedRows&& curr_page_buffered_rows) {
int64_t offset = 0;
while (offset < num_levels) {
- int64_t end_offset = std::min(offset + batch_size, num_levels);
-
- // Find next record boundary (i.e. rep_level = 0)
- while (end_offset < num_levels && rep_levels[end_offset] != 0) {
- end_offset++;
- }
-
- if (end_offset < num_levels) {
- // This is not the last chunk of batch and end_offset is a record boundary.
- // It is a good chance to check the page size.
- action(offset, end_offset - offset, /*check_page_size=*/true);
- } else {
- DCHECK_EQ(end_offset, num_levels);
- // This is the last chunk of batch, and we do not know whether end_offset is a
- // record boundary. Find the offset to beginning of last record in this chunk,
- // so we can check page size.
- int64_t last_record_begin_offset = num_levels - 1;
- while (last_record_begin_offset >= offset &&
- rep_levels[last_record_begin_offset] != 0) {
- last_record_begin_offset--;
+ int64_t max_batch_size = std::min(batch_size, num_levels - offset);
+ int64_t end_offset = num_levels;           // end offset of the current batch
+ int64_t check_page_limit_end_offset = -1;  // offset to check page limit (if not -1)
+
+ int64_t page_buffered_rows = curr_page_buffered_rows();
+ ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
+
+ // Iterate rep_levels to find the shortest sequence that ends before a record
+ // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size
+ for (int64_t i = offset; i < num_levels; ++i) {
+ if (rep_levels[i] == 0) {
+ // Use the beginning of last record to check page limit.
+ check_page_limit_end_offset = i;
+ if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
+ end_offset = i;
+ break;
+ }
+ page_buffered_rows += 1;
}
+ }
- if (offset <= last_record_begin_offset) {
- // We have found the beginning of last record and can check page size.
- action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
- offset = last_record_begin_offset;
- }
+ ARROW_DCHECK_LE(offset, end_offset);
+ ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
- // Write remaining data after the record boundary,
- // or all data if no boundary was found.
- action(offset, end_offset - offset, /*check_page_size=*/false);
+ if (check_page_limit_end_offset >= 0) {
+ // At least one record boundary is included in this batch.
+ // It is a good chance to check the page limit.
+ action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true);
+ offset = check_page_limit_end_offset;
+ }
+ if (end_offset > offset) {
+ // This is the last chunk of the batch, and we do not know whether end_offset is a
+ // record boundary, so we cannot check the page limit if pages can only change on
+ // record boundaries.
+ ARROW_DCHECK_EQ(end_offset, num_levels);
+ action(offset, end_offset - offset,
+ /*check_page_limit=*/!pages_change_on_record_boundaries);
}
offset = end_offset;
}
}
+template <typename Action, typename GetBufferedRows>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+ int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
+ bool pages_change_on_record_boundaries, Action&& action,
+ GetBufferedRows&& curr_page_buffered_rows) {
+ if (!rep_levels) {
+ DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page,
+ std::forward<Action>(action),
+ std::forward<GetBufferedRows>(curr_page_buffered_rows));
+ } else {
+ DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size, max_rows_per_page,
+ pages_change_on_record_boundaries, std::forward<Action>(action),
+ std::forward<GetBufferedRows>(curr_page_buffered_rows));
+ }
+}
+
namespace {
bool DictionaryDirectWriteSupported(const ::arrow::Array& array) {
@@ -1318,7 +1350,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
CheckDictionarySizeLimit();
};
DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
- WriteChunk, pages_change_on_record_boundaries());
+ properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+ WriteChunk, [this]() { return num_buffered_rows_; });
return value_offset;
}
@@ -1368,7 +1401,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
CheckDictionarySizeLimit();
};
DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
- WriteChunk, pages_change_on_record_boundaries());
+ properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+ WriteChunk, [this]() { return num_buffered_rows_; });
}
Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
@@ -1769,13 +1803,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
}
void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
- int64_t num_nulls, bool check_page_size) {
+ int64_t num_nulls, bool check_page_limit) {
num_buffered_values_ += num_levels;
num_buffered_encoded_values_ += num_values;
num_buffered_nulls_ += num_nulls;
- if (check_page_size &&
- current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+ if (check_page_limit &&
+ (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize() ||
+ num_buffered_rows_ >= properties_->max_rows_per_page())) {
AddDataPage();
}
}
@@ -1996,9 +2031,10 @@ Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
return WriteDense();
}
- PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
- properties_->write_batch_size(), WriteIndicesChunk,
- pages_change_on_record_boundaries()));
+ PARQUET_CATCH_NOT_OK(
+ DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
+ properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+ WriteIndicesChunk, [this]() { return num_buffered_rows_; }));
return Status::OK();
}
@@ -2441,9 +2477,10 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
value_offset += batch_num_spaced_values;
};
- PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
- properties_->write_batch_size(), WriteChunk,
- pages_change_on_record_boundaries()));
+ PARQUET_CATCH_NOT_OK(
+ DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
+ properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
+ WriteChunk, [this]() { return num_buffered_rows_; }));
return Status::OK();
}
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 990125df4e..48cac04f07 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -37,6 +37,7 @@
#include "parquet/file_writer.h"
#include "parquet/geospatial/statistics.h"
#include "parquet/metadata.h"
+#include "parquet/page_index.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
@@ -108,7 +109,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
const ColumnProperties& column_properties = ColumnProperties(),
const ParquetVersion::type version = ParquetVersion::PARQUET_1_0,
const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1,
- bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize) {
+ bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize,
+ int64_t max_rows_per_page = kDefaultMaxRowsPerPage) {
sink_ = CreateOutputStream();
WriterProperties::Builder wp_builder;
wp_builder.version(version)->data_page_version(data_page_version);
@@ -125,6 +127,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
}
wp_builder.max_statistics_size(column_properties.max_statistics_size());
wp_builder.data_pagesize(page_size);
+ wp_builder.max_rows_per_page(max_rows_per_page);
writer_properties_ = wp_builder.build();
metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_);
@@ -506,6 +509,44 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
this->SyncValuesOut();
}
+template <>
+void TestPrimitiveWriter<ByteArrayType>::ReadColumnFully(Compression::type compression,
+ bool page_checksum_verify) {
+ int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+ BuildReader(total_values, compression, page_checksum_verify);
+ this->data_buffer_.clear();
+
+ values_read_ = 0;
+ while (values_read_ < total_values) {
+ int64_t values_read_recently = 0;
+ reader_->ReadBatch(
+ static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
+ definition_levels_out_.data() + values_read_,
+ repetition_levels_out_.data() + values_read_,
+ this->values_out_ptr_ + values_read_, &values_read_recently);
+
+ // Compute the total length of the data
+ int64_t total_length = 0;
+ for (int64_t i = 0; i < values_read_recently; i++) {
+ total_length += this->values_out_[i + values_read_].len;
+ }
+
+ // Copy contents of the pointers
+ std::vector<uint8_t> data(total_length);
+ uint8_t* data_ptr = data.data();
+ for (int64_t i = 0; i < values_read_recently; i++) {
+ const ByteArray& value = this->values_out_ptr_[i + values_read_];
+ memcpy(data_ptr, value.ptr, value.len);
+ this->values_out_[i + values_read_].ptr = data_ptr;
+ data_ptr += value.len;
+ }
+ data_buffer_.emplace_back(std::move(data));
+
+ values_read_ += values_read_recently;
+ }
+ this->SyncValuesOut();
+}
+
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
BooleanType, ByteArrayType, FLBAType>
TestTypes;
@@ -2075,5 +2116,162 @@ TEST_F(TestGeometryValuesWriter, TestWriteAndReadAllNull) {
EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt);
}
+template <typename TestType>
+class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter<TestType> {
+ public:
+ TypedColumnWriter<TestType>* BuildWriter(
+ int64_t max_rows_per_page = kDefaultMaxRowsPerPage,
+ int64_t page_size = kDefaultDataPageSize) {
+ this->sink_ = CreateOutputStream();
+ this->writer_properties_ = WriterProperties::Builder()
+ .max_rows_per_page(max_rows_per_page)
+ ->data_pagesize(page_size)
+ ->enable_write_page_index()
+ ->build();
+ file_writer_ = ParquetFileWriter::Open(
+ this->sink_, std::static_pointer_cast<GroupNode>(this->schema_.schema_root()),
+ this->writer_properties_);
+ return static_cast<TypedColumnWriter<TestType>*>(
+ file_writer_->AppendRowGroup()->NextColumn());
+ }
+
+ void CloseWriter() const { file_writer_->Close(); }
+
+ void BuildReader() {
+ ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish());
+ file_reader_ = ParquetFileReader::Open(
+ std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+ this->reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
+ file_reader_->RowGroup(0)->Column(0));
+ }
+
+ void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const {
+ auto file_meta = file_reader_->metadata();
+ int64_t num_row_groups = file_meta->num_row_groups();
+ ASSERT_EQ(num_row_groups, 1);
+
+ auto page_index_reader = file_reader_->GetPageIndexReader();
+ ASSERT_NE(page_index_reader, nullptr);
+
+ auto row_group_page_index_reader = page_index_reader->RowGroup(0);
+ ASSERT_NE(row_group_page_index_reader, nullptr);
+
+ auto offset_index = row_group_page_index_reader->GetOffsetIndex(0);
+ ASSERT_NE(offset_index, nullptr);
+ size_t num_pages = offset_index->page_locations().size();
+ int64_t num_rows = 0;
+ for (size_t j = 1; j < num_pages; ++j) {
+ int64_t page_rows = offset_index->page_locations()[j].first_row_index -
+ offset_index->page_locations()[j - 1].first_row_index;
+ EXPECT_LE(page_rows, max_rows_per_page);
+ num_rows += page_rows;
+ }
+ if (num_pages != 0) {
+ int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() -
+ offset_index->page_locations().back().first_row_index;
+ EXPECT_LE(last_page_rows, max_rows_per_page);
+ num_rows += last_page_rows;
+ }
+
+ EXPECT_EQ(num_rows, file_meta->RowGroup(0)->num_rows());
+ }
+
+ private:
+ std::shared_ptr<ParquetFileWriter> file_writer_;
+ std::shared_ptr<ParquetFileReader> file_reader_;
+};
+
+TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes);
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) {
+ for (int64_t max_rows_per_page : {1, 10, 100}) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+ this->GenerateData(SMALL_SIZE);
+ std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+ definition_levels[1] = 0;
+
+ auto writer = this->BuildWriter(max_rows_per_page);
+ writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
+ this->values_ptr_);
+ this->CloseWriter();
+
+ this->BuildReader();
+ ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+ }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) {
+ for (int64_t max_rows_per_page : {1, 10, 100}) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+
+ this->GenerateData(SMALL_SIZE);
+ std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+ std::vector<uint8_t> valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255);
+
+ definition_levels[SMALL_SIZE - 1] = 0;
+ ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
+ definition_levels[1] = 0;
+ ::arrow::bit_util::ClearBit(valid_bits.data(), 1);
+
+ auto writer = this->BuildWriter(max_rows_per_page);
+ writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
+ valid_bits.data(), 0, this->values_ptr_);
+ this->CloseWriter();
+
+ this->BuildReader();
+ ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+ }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) {
+ for (int64_t max_rows_per_page : {1, 10, 100}) {
+ this->SetUpSchema(Repetition::REPEATED);
+
+ this->GenerateData(SMALL_SIZE);
+ std::vector<int16_t> definition_levels(SMALL_SIZE);
+ std::vector<int16_t> repetition_levels(SMALL_SIZE);
+
+ // Generate levels to include variable-sized lists and empty lists
+ for (int i = 0; i < SMALL_SIZE; i++) {
+ int list_length = (i % 5) + 1;
+ if (i % 13 == 0 || i % 17 == 0) {
+ list_length = 0;
+ }
+
+ if (list_length == 0) {
+ definition_levels[i] = 0;
+ repetition_levels[i] = 0;
+ } else {
+ for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) {
+ definition_levels[i + j] = 1;
+ repetition_levels[i + j] = (j == 0) ? 0 : 1;
+ }
+ i += list_length - 1;
+ }
+ }
+
+ auto writer = this->BuildWriter(max_rows_per_page);
+ writer->WriteBatch(this->values_.size(), definition_levels.data(),
+ repetition_levels.data(), this->values_ptr_);
+ this->CloseWriter();
+
+ this->BuildReader();
+ ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+ }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) {
+ for (int64_t max_rows_per_page : {10, 100, 10000}) {
+ this->GenerateData(LARGE_SIZE);
+
+ auto writer = this->BuildWriter(max_rows_per_page);
+ writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+ this->CloseWriter();
+
+ this->BuildReader();
+ ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+ }
+}
+
} // namespace test
} // namespace parquet
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5a1799c39d..51b549df22 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -155,6 +155,7 @@ class PARQUET_EXPORT ReaderProperties {
ReaderProperties PARQUET_EXPORT default_reader_properties();
static constexpr int64_t kDefaultDataPageSize = 1024 * 1024;
+static constexpr int64_t kDefaultMaxRowsPerPage = 20'000;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
@@ -293,6 +294,7 @@ class PARQUET_EXPORT WriterProperties {
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
pagesize_(kDefaultDataPageSize),
+ max_rows_per_page_(kDefaultMaxRowsPerPage),
version_(ParquetVersion::PARQUET_2_6),
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY),
@@ -308,6 +310,7 @@ class PARQUET_EXPORT WriterProperties {
write_batch_size_(properties.write_batch_size()),
max_row_group_length_(properties.max_row_group_length()),
pagesize_(properties.data_pagesize()),
+ max_rows_per_page_(properties.max_rows_per_page()),
version_(properties.version()),
data_page_version_(properties.data_page_version()),
created_by_(properties.created_by()),
@@ -422,6 +425,13 @@ class PARQUET_EXPORT WriterProperties {
return this;
}
+ /// Specify the maximum number of rows per data page.
+ /// Default 20K rows.
+ Builder* max_rows_per_page(int64_t max_rows) {
+ max_rows_per_page_ = max_rows;
+ return this;
+ }
+
/// Specify the data page version.
/// Default V1.
Builder* data_page_version(ParquetDataPageVersion data_page_version) {
@@ -768,7 +778,7 @@ class PARQUET_EXPORT WriterProperties {
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
- pagesize_, version_, created_by_, page_checksum_enabled_,
+ pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_,
size_statistics_level_, std::move(file_encryption_properties_),
default_column_properties_, column_properties, data_page_version_,
store_decimal_as_integer_, std::move(sorting_columns_),
@@ -781,6 +791,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t write_batch_size_;
int64_t max_row_group_length_;
int64_t pagesize_;
+ int64_t max_rows_per_page_;
ParquetVersion::type version_;
ParquetDataPageVersion data_page_version_;
std::string created_by_;
@@ -816,6 +827,8 @@ class PARQUET_EXPORT WriterProperties {
inline int64_t data_pagesize() const { return pagesize_; }
+ inline int64_t max_rows_per_page() const { return max_rows_per_page_; }
+
inline ParquetDataPageVersion data_page_version() const {
return parquet_data_page_version_;
}
@@ -930,9 +943,9 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(
MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
- int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
- const std::string& created_by, bool page_write_checksum_enabled,
- SizeStatisticsLevel size_statistics_level,
+ int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page,
+ ParquetVersion::type version, const std::string& created_by,
+ bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
@@ -944,6 +957,7 @@ class PARQUET_EXPORT WriterProperties {
write_batch_size_(write_batch_size),
max_row_group_length_(max_row_group_length),
pagesize_(pagesize),
+ max_rows_per_page_(max_rows_per_page),
parquet_data_page_version_(data_page_version),
parquet_version_(version),
parquet_created_by_(created_by),
@@ -962,6 +976,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t write_batch_size_;
int64_t max_row_group_length_;
int64_t pagesize_;
+ int64_t max_rows_per_page_;
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;
diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc
index 90d6df57e7..6e8cec9a13 100644
--- a/cpp/src/parquet/size_statistics_test.cc
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -140,6 +140,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
auto writer_properties = WriterProperties::Builder()
.max_row_group_length(max_row_group_length)
->data_pagesize(page_size)
+ ->max_rows_per_page(std::numeric_limits<int64_t>::max())
->write_batch_size(write_batch_size)
->enable_write_page_index()
->enable_statistics()