This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new fcfe2b4160 [Parquet] Allow setting page size per column (#9353)
fcfe2b4160 is described below
commit fcfe2b4160f6c99107dc89edecdfcd3ff9a4c867
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Tue Feb 10 15:11:24 2026 -0600
[Parquet] Allow setting page size per column (#9353)
Page size was previously a per-file configuration; this PR allows each
column to have its own page size setting.
This enables a few optimizations, for example configuring a smaller page
size for columns that need better random access.
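A rough usage sketch of the new builder API (the column name "point_id" is
illustrative only; the calls mirror the properties test added below):

    use parquet::file::properties::WriterProperties;
    use parquet::schema::types::ColumnPath;

    fn main() {
        // File-wide default page size limit, plus a smaller per-column
        // override for a column that benefits from finer random access.
        let props = WriterProperties::builder()
            .set_data_page_size_limit(1024 * 1024)
            .set_column_data_page_size_limit(ColumnPath::from("point_id"), 8 * 1024)
            .build();

        // The per-column limit takes precedence over the file-wide default.
        assert_eq!(
            props.column_data_page_size_limit(&ColumnPath::from("point_id")),
            8 * 1024
        );
        assert_eq!(props.data_page_size_limit(), 1024 * 1024);
    }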
Related to
- related: https://github.com/apache/arrow-rs/issues/9242
- https://github.com/XiangpengHao/liquid-cache/issues/227
- https://github.com/apache/arrow-rs/issues/8358
cc @alamb @coracuity @JigaoLuo
---
parquet/src/column/writer/mod.rs | 93 +++++++++++++++++++++++++++++++++++-
parquet/src/file/properties.rs | 90 ++++++++++++++++++++++++++--------
parquet/tests/arrow_writer_layout.rs | 45 +++++++++++++++++
3 files changed, 206 insertions(+), 22 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 417c011275..c014397f13 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -755,7 +755,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
- || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
+ || self.encoder.estimated_data_page_size()
+ >= self.props.column_data_page_size_limit(self.descr.path())
}
/// Performs dictionary fallback.
@@ -2502,6 +2503,27 @@ mod tests {
);
}
+ #[test]
+ fn test_column_writer_column_data_page_size_limit() {
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_writer_version(WriterVersion::PARQUET_1_0)
+ .set_dictionary_enabled(false)
+ .set_data_page_size_limit(1000)
+ .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
+ .set_write_batch_size(3)
+ .build(),
+ );
+ let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+
+ let col_values =
+ write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
+ let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
+
+ assert_eq!(col_values, vec![3, 3, 3, 1]);
+ assert_eq!(other_values, vec![10]);
+ }
+
#[test]
fn test_bool_statistics() {
let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
@@ -4119,6 +4141,22 @@ mod tests {
get_typed_column_writer::<T>(column_writer)
}
+ fn get_test_column_writer_with_path<'a, T: DataType>(
+ page_writer: Box<dyn PageWriter + 'a>,
+ max_def_level: i16,
+ max_rep_level: i16,
+ props: WriterPropertiesPtr,
+ path: ColumnPath,
+ ) -> ColumnWriterImpl<'a, T> {
+ let descr = Arc::new(get_test_column_descr_with_path::<T>(
+ max_def_level,
+ max_rep_level,
+ path,
+ ));
+ let column_writer = get_column_writer(descr, props, page_writer);
+ get_typed_column_writer::<T>(column_writer)
+ }
+
/// Returns column reader.
fn get_test_column_reader<T: DataType>(
page_reader: Box<dyn PageReader>,
@@ -4145,6 +4183,59 @@ mod tests {
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}
+ fn get_test_column_descr_with_path<T: DataType>(
+ max_def_level: i16,
+ max_rep_level: i16,
+ path: ColumnPath,
+ ) -> ColumnDescriptor {
+ let name = path.string();
+ let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
+ // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
+ // it should be no-op for other types
+ .with_length(1)
+ .build()
+ .unwrap();
+ ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
+ }
+
+ fn write_and_collect_page_values(
+ path: ColumnPath,
+ props: WriterPropertiesPtr,
+ data: &[i32],
+ ) -> Vec<u32> {
+ let mut file = tempfile::tempfile().unwrap();
+ let mut write = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+ let mut writer =
+ get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
+ writer.write_batch(data, None, None).unwrap();
+ let r = writer.close().unwrap();
+
+ drop(write);
+
+ let props = ReaderProperties::builder()
+ .set_backward_compatible_lz4(false)
+ .build();
+ let mut page_reader = Box::new(
+ SerializedPageReader::new_with_properties(
+ Arc::new(file),
+ &r.metadata,
+ r.rows_written as usize,
+ None,
+ Arc::new(props),
+ )
+ .unwrap(),
+ );
+
+ let mut values_per_page = Vec::new();
+ while let Some(page) = page_reader.get_next_page().unwrap() {
+ assert_eq!(page.page_type(), PageType::DATA_PAGE);
+ values_per_page.push(page.num_values());
+ }
+
+ values_per_page
+ }
+
/// Returns page writer that collects pages without serializing them.
fn get_test_page_writer() -> Box<dyn PageWriter> {
Box::new(TestPageWriter {})
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 38a5a804c0..aa18df0d68 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -153,7 +153,6 @@ pub type WriterPropertiesPtr = Arc<WriterProperties>;
/// ```
#[derive(Debug, Clone)]
pub struct WriterProperties {
- data_page_size_limit: usize,
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
@@ -204,7 +203,22 @@ impl WriterProperties {
///
/// For more details see [`WriterPropertiesBuilder::set_data_page_size_limit`]
pub fn data_page_size_limit(&self) -> usize {
- self.data_page_size_limit
+ self.default_column_properties
+ .data_page_size_limit()
+ .unwrap_or(DEFAULT_PAGE_SIZE)
+ }
+
+ /// Returns data page size limit for a specific column.
+ ///
+ /// Takes precedence over [`Self::data_page_size_limit`].
+ ///
+ /// Note: this is a best effort limit based on the write batch size.
+ pub fn column_data_page_size_limit(&self, col: &ColumnPath) -> usize {
+ self.column_properties
+ .get(col)
+ .and_then(|c| c.data_page_size_limit())
+ .or_else(|| self.default_column_properties.data_page_size_limit())
+ .unwrap_or(DEFAULT_PAGE_SIZE)
}
/// Returns dictionary page size limit.
@@ -442,7 +456,6 @@ impl WriterProperties {
/// See example on [`WriterProperties`]
#[derive(Debug, Clone)]
pub struct WriterPropertiesBuilder {
- data_page_size_limit: usize,
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
@@ -465,7 +478,6 @@ impl Default for WriterPropertiesBuilder {
/// Returns default state of the builder.
fn default() -> Self {
Self {
- data_page_size_limit: DEFAULT_PAGE_SIZE,
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
@@ -490,7 +502,6 @@ impl WriterPropertiesBuilder {
/// Finalizes the configuration and returns immutable writer properties struct.
pub fn build(self) -> WriterProperties {
WriterProperties {
- data_page_size_limit: self.data_page_size_limit,
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
@@ -524,21 +535,6 @@ impl WriterPropertiesBuilder {
self
}
- /// Sets best effort maximum size of a data page in bytes (defaults to `1024 * 1024`
- /// via [`DEFAULT_PAGE_SIZE`]).
- ///
- /// The parquet writer will attempt to limit the sizes of each
- /// `DataPage` to this many bytes. Reducing this value will result
- /// in larger parquet files, but may improve the effectiveness of
- /// page index based predicate pushdown during reading.
- ///
- /// Note: this is a best effort limit based on value of
- /// [`set_write_batch_size`](Self::set_write_batch_size).
- pub fn set_data_page_size_limit(mut self, value: usize) -> Self {
- self.data_page_size_limit = value;
- self
- }
-
/// Sets best effort maximum number of rows in a data page (defaults to `20_000`
/// via [`DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT`]).
///
@@ -769,6 +765,22 @@ impl WriterPropertiesBuilder {
self
}
+ /// Sets best effort maximum size of a data page in bytes (defaults to `1024 * 1024`
+ /// via [`DEFAULT_PAGE_SIZE`]).
+ ///
+ /// The parquet writer will attempt to limit the sizes of each
+ /// `DataPage` to this many bytes. Reducing this value will result
+ /// in larger parquet files, but may improve the effectiveness of
+ /// page index based predicate pushdown during reading.
+ ///
+ /// Note: this is a best effort limit based on value of
+ /// [`set_write_batch_size`](Self::set_write_batch_size).
+ pub fn set_data_page_size_limit(mut self, value: usize) -> Self {
+ self.default_column_properties
+ .set_data_page_size_limit(value);
+ self
+ }
+
/// Sets default [`EnabledStatistics`] level for all columns (defaults to [`Page`] via
/// [`DEFAULT_STATISTICS_ENABLED`]).
///
@@ -898,6 +910,14 @@ impl WriterPropertiesBuilder {
self
}
+ /// Sets data page size limit for a specific column.
+ ///
+ /// Takes precedence over [`Self::set_data_page_size_limit`].
+ pub fn set_column_data_page_size_limit(mut self, col: ColumnPath, value: usize) -> Self {
+ self.get_mut_props(col).set_data_page_size_limit(value);
+ self
+ }
+
/// Sets [`EnabledStatistics`] level for a specific column.
///
/// Takes precedence over [`Self::set_statistics_enabled`].
@@ -949,7 +969,6 @@ impl WriterPropertiesBuilder {
impl From<WriterProperties> for WriterPropertiesBuilder {
fn from(props: WriterProperties) -> Self {
WriterPropertiesBuilder {
- data_page_size_limit: props.data_page_size_limit,
data_page_row_count_limit: props.data_page_row_count_limit,
write_batch_size: props.write_batch_size,
max_row_group_size: props.max_row_group_size,
@@ -1065,6 +1084,7 @@ impl Default for BloomFilterProperties {
struct ColumnProperties {
encoding: Option<Encoding>,
codec: Option<Compression>,
+ data_page_size_limit: Option<usize>,
dictionary_page_size_limit: Option<usize>,
dictionary_enabled: Option<bool>,
statistics_enabled: Option<EnabledStatistics>,
@@ -1095,6 +1115,11 @@ impl ColumnProperties {
self.codec = Some(value);
}
+ /// Sets data page size limit for this column.
+ fn set_data_page_size_limit(&mut self, value: usize) {
+ self.data_page_size_limit = Some(value);
+ }
+
/// Sets whether dictionary encoding is enabled for this column.
fn set_dictionary_enabled(&mut self, enabled: bool) {
self.dictionary_enabled = Some(enabled);
@@ -1173,6 +1198,11 @@ impl ColumnProperties {
self.dictionary_page_size_limit
}
+ /// Returns optional data page size limit for this column.
+ fn data_page_size_limit(&self) -> Option<usize> {
+ self.data_page_size_limit
+ }
+
/// Returns optional statistics level requested for this column. If result is `None`,
/// then no setting has been provided.
fn statistics_enabled(&self) -> Option<EnabledStatistics> {
@@ -1572,6 +1602,24 @@ mod tests {
);
}
+ #[test]
+ fn test_writer_properties_column_data_page_size_limit() {
+ let props = WriterProperties::builder()
+ .set_data_page_size_limit(100)
+ .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
+ .build();
+
+ assert_eq!(props.data_page_size_limit(), 100);
+ assert_eq!(
+ props.column_data_page_size_limit(&ColumnPath::from("col")),
+ 10
+ );
+ assert_eq!(
+ props.column_data_page_size_limit(&ColumnPath::from("other")),
+ 100
+ );
+ }
+
#[test]
fn test_reader_properties_default_settings() {
let props = ReaderProperties::builder().build();
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index ca6f89cab4..b9d997beb2 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -28,6 +28,7 @@ use parquet::file::metadata::PageIndexPolicy;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{ReaderProperties, WriterProperties};
use parquet::file::reader::SerializedPageReader;
+use parquet::schema::types::ColumnPath;
use std::sync::Arc;
struct Layout {
@@ -554,3 +555,47 @@ fn test_list() {
},
});
}
+
+#[test]
+fn test_per_column_data_page_size_limit() {
+ // Test that per-column page size limits work correctly
+ // col_a has a small page size limit (500 bytes), col_b uses the default (10000 bytes)
+ let col_a = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
+ let col_b = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
+ let batch = RecordBatch::try_from_iter([("col_a", col_a), ("col_b", col_b)]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_dictionary_enabled(false)
+ .set_data_page_size_limit(10000)
+ .set_column_data_page_size_limit(ColumnPath::from("col_a"), 500)
+ .set_write_batch_size(10)
+ .build();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let b = Bytes::from(buf);
+ let read_options =
+ ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true));
+ let reader =
+ ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options).unwrap();
+ let metadata = reader.metadata();
+
+ // Verify we have one row group with two columns
+ assert_eq!(metadata.row_groups().len(), 1);
+ let row_group = &metadata.row_groups()[0];
+ assert_eq!(row_group.columns().len(), 2);
+
+ // Get page counts from offset index
+ let offset_index = metadata.offset_index().unwrap();
+ let col_a_page_count = offset_index[0][0].page_locations.len();
+ let col_b_page_count = offset_index[0][1].page_locations.len();
+
+ // col_a should have many more pages than col_b due to smaller page size limit
+ // col_a: 500 byte limit for 8000 bytes of data -> 16 pages
+ // col_b: 10000 byte limit for 8000 bytes of data -> 1 page
+ assert_eq!(col_a_page_count, 16);
+ assert_eq!(col_b_page_count, 1);
+}