This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new fcfe2b4160 [Parquet] Allow setting page size per column (#9353)
fcfe2b4160 is described below

commit fcfe2b4160f6c99107dc89edecdfcd3ff9a4c867
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Tue Feb 10 15:11:24 2026 -0600

    [Parquet] Allow setting page size per column (#9353)
    
    Page sizes were previously a per-file configuration; this PR allows
    each column to have its own page size setting.
    
    This enables a few optimizations, for example configuring a smaller
    page size for columns that need better random access.
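
    A minimal usage sketch of the new API (the column name "col_a" and the
    byte limits are made up for illustration; the builder and getter
    methods are the ones added in this diff):

        use parquet::file::properties::WriterProperties;
        use parquet::schema::types::ColumnPath;

        fn main() {
            let props = WriterProperties::builder()
                // file-wide default data page size limit, in bytes
                .set_data_page_size_limit(1024 * 1024)
                // per-column override; takes precedence for "col_a"
                .set_column_data_page_size_limit(ColumnPath::from("col_a"), 4 * 1024)
                .build();

            // "col_a" resolves to its own override ...
            assert_eq!(
                props.column_data_page_size_limit(&ColumnPath::from("col_a")),
                4 * 1024
            );
            // ... while any other column falls back to the file-wide default
            assert_eq!(
                props.column_data_page_size_limit(&ColumnPath::from("col_b")),
                1024 * 1024
            );
        }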
    
    Related to:
    - https://github.com/apache/arrow-rs/issues/9242
    - https://github.com/XiangpengHao/liquid-cache/issues/227
    - https://github.com/apache/arrow-rs/issues/8358
    
     cc @alamb @coracuity @JigaoLuo
---
 parquet/src/column/writer/mod.rs     | 93 +++++++++++++++++++++++++++++++++++-
 parquet/src/file/properties.rs       | 90 ++++++++++++++++++++++++++--------
 parquet/tests/arrow_writer_layout.rs | 45 +++++++++++++++++
 3 files changed, 206 insertions(+), 22 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 417c011275..c014397f13 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -755,7 +755,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         }
 
         self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
-            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
+            || self.encoder.estimated_data_page_size()
+                >= self.props.column_data_page_size_limit(self.descr.path())
     }
 
     /// Performs dictionary fallback.
@@ -2502,6 +2503,27 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_column_writer_column_data_page_size_limit() {
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_writer_version(WriterVersion::PARQUET_1_0)
+                .set_dictionary_enabled(false)
+                .set_data_page_size_limit(1000)
+                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
+                .set_write_batch_size(3)
+                .build(),
+        );
+        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+
+        let col_values =
+            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
+        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
+
+        assert_eq!(col_values, vec![3, 3, 3, 1]);
+        assert_eq!(other_values, vec![10]);
+    }
+
     #[test]
     fn test_bool_statistics() {
         let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
@@ -4119,6 +4141,22 @@ mod tests {
         get_typed_column_writer::<T>(column_writer)
     }
 
+    fn get_test_column_writer_with_path<'a, T: DataType>(
+        page_writer: Box<dyn PageWriter + 'a>,
+        max_def_level: i16,
+        max_rep_level: i16,
+        props: WriterPropertiesPtr,
+        path: ColumnPath,
+    ) -> ColumnWriterImpl<'a, T> {
+        let descr = Arc::new(get_test_column_descr_with_path::<T>(
+            max_def_level,
+            max_rep_level,
+            path,
+        ));
+        let column_writer = get_column_writer(descr, props, page_writer);
+        get_typed_column_writer::<T>(column_writer)
+    }
+
     /// Returns column reader.
     fn get_test_column_reader<T: DataType>(
         page_reader: Box<dyn PageReader>,
@@ -4145,6 +4183,59 @@ mod tests {
         ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
     }
 
+    fn get_test_column_descr_with_path<T: DataType>(
+        max_def_level: i16,
+        max_rep_level: i16,
+        path: ColumnPath,
+    ) -> ColumnDescriptor {
+        let name = path.string();
+        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
+            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
+            // it should be no-op for other types
+            .with_length(1)
+            .build()
+            .unwrap();
+        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
+    }
+
+    fn write_and_collect_page_values(
+        path: ColumnPath,
+        props: WriterPropertiesPtr,
+        data: &[i32],
+    ) -> Vec<u32> {
+        let mut file = tempfile::tempfile().unwrap();
+        let mut write = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+        let mut writer =
+            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
+        writer.write_batch(data, None, None).unwrap();
+        let r = writer.close().unwrap();
+
+        drop(write);
+
+        let props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
+        let mut page_reader = Box::new(
+            SerializedPageReader::new_with_properties(
+                Arc::new(file),
+                &r.metadata,
+                r.rows_written as usize,
+                None,
+                Arc::new(props),
+            )
+            .unwrap(),
+        );
+
+        let mut values_per_page = Vec::new();
+        while let Some(page) = page_reader.get_next_page().unwrap() {
+            assert_eq!(page.page_type(), PageType::DATA_PAGE);
+            values_per_page.push(page.num_values());
+        }
+
+        values_per_page
+    }
+
     /// Returns page writer that collects pages without serializing them.
     fn get_test_page_writer() -> Box<dyn PageWriter> {
         Box::new(TestPageWriter {})
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 38a5a804c0..aa18df0d68 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -153,7 +153,6 @@ pub type WriterPropertiesPtr = Arc<WriterProperties>;
 /// ```
 #[derive(Debug, Clone)]
 pub struct WriterProperties {
-    data_page_size_limit: usize,
     data_page_row_count_limit: usize,
     write_batch_size: usize,
     max_row_group_size: usize,
@@ -204,7 +203,22 @@ impl WriterProperties {
     ///
     /// For more details see [`WriterPropertiesBuilder::set_data_page_size_limit`]
     pub fn data_page_size_limit(&self) -> usize {
-        self.data_page_size_limit
+        self.default_column_properties
+            .data_page_size_limit()
+            .unwrap_or(DEFAULT_PAGE_SIZE)
+    }
+
+    /// Returns data page size limit for a specific column.
+    ///
+    /// Takes precedence over [`Self::data_page_size_limit`].
+    ///
+    /// Note: this is a best effort limit based on the write batch size.
+    pub fn column_data_page_size_limit(&self, col: &ColumnPath) -> usize {
+        self.column_properties
+            .get(col)
+            .and_then(|c| c.data_page_size_limit())
+            .or_else(|| self.default_column_properties.data_page_size_limit())
+            .unwrap_or(DEFAULT_PAGE_SIZE)
     }
 
     /// Returns dictionary page size limit.
@@ -442,7 +456,6 @@ impl WriterProperties {
 /// See example on [`WriterProperties`]
 #[derive(Debug, Clone)]
 pub struct WriterPropertiesBuilder {
-    data_page_size_limit: usize,
     data_page_row_count_limit: usize,
     write_batch_size: usize,
     max_row_group_size: usize,
@@ -465,7 +478,6 @@ impl Default for WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn default() -> Self {
         Self {
-            data_page_size_limit: DEFAULT_PAGE_SIZE,
             data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
             write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
             max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
@@ -490,7 +502,6 @@ impl WriterPropertiesBuilder {
     /// Finalizes the configuration and returns immutable writer properties struct.
     pub fn build(self) -> WriterProperties {
         WriterProperties {
-            data_page_size_limit: self.data_page_size_limit,
             data_page_row_count_limit: self.data_page_row_count_limit,
             write_batch_size: self.write_batch_size,
             max_row_group_size: self.max_row_group_size,
@@ -524,21 +535,6 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets best effort maximum size of a data page in bytes (defaults to `1024 * 1024`
-    /// via [`DEFAULT_PAGE_SIZE`]).
-    ///
-    /// The parquet writer will attempt to limit the sizes of each
-    /// `DataPage` to this many bytes. Reducing this value will result
-    /// in larger parquet files, but may improve the effectiveness of
-    /// page index based predicate pushdown during reading.
-    ///
-    /// Note: this is a best effort limit based on value of
-    /// [`set_write_batch_size`](Self::set_write_batch_size).
-    pub fn set_data_page_size_limit(mut self, value: usize) -> Self {
-        self.data_page_size_limit = value;
-        self
-    }
-
     /// Sets best effort maximum number of rows in a data page (defaults to `20_000`
     /// via [`DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT`]).
     ///
@@ -769,6 +765,22 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Sets best effort maximum size of a data page in bytes (defaults to `1024 * 1024`
+    /// via [`DEFAULT_PAGE_SIZE`]).
+    ///
+    /// The parquet writer will attempt to limit the sizes of each
+    /// `DataPage` to this many bytes. Reducing this value will result
+    /// in larger parquet files, but may improve the effectiveness of
+    /// page index based predicate pushdown during reading.
+    ///
+    /// Note: this is a best effort limit based on value of
+    /// [`set_write_batch_size`](Self::set_write_batch_size).
+    pub fn set_data_page_size_limit(mut self, value: usize) -> Self {
+        self.default_column_properties
+            .set_data_page_size_limit(value);
+        self
+    }
+
     /// Sets default [`EnabledStatistics`] level for all columns (defaults to [`Page`] via
     /// [`DEFAULT_STATISTICS_ENABLED`]).
     ///
@@ -898,6 +910,14 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Sets data page size limit for a specific column.
+    ///
+    /// Takes precedence over [`Self::set_data_page_size_limit`].
+    pub fn set_column_data_page_size_limit(mut self, col: ColumnPath, value: usize) -> Self {
+        self.get_mut_props(col).set_data_page_size_limit(value);
+        self
+    }
+
     /// Sets [`EnabledStatistics`] level for a specific column.
     ///
     /// Takes precedence over [`Self::set_statistics_enabled`].
@@ -949,7 +969,6 @@ impl WriterPropertiesBuilder {
 impl From<WriterProperties> for WriterPropertiesBuilder {
     fn from(props: WriterProperties) -> Self {
         WriterPropertiesBuilder {
-            data_page_size_limit: props.data_page_size_limit,
             data_page_row_count_limit: props.data_page_row_count_limit,
             write_batch_size: props.write_batch_size,
             max_row_group_size: props.max_row_group_size,
@@ -1065,6 +1084,7 @@ impl Default for BloomFilterProperties {
 struct ColumnProperties {
     encoding: Option<Encoding>,
     codec: Option<Compression>,
+    data_page_size_limit: Option<usize>,
     dictionary_page_size_limit: Option<usize>,
     dictionary_enabled: Option<bool>,
     statistics_enabled: Option<EnabledStatistics>,
@@ -1095,6 +1115,11 @@ impl ColumnProperties {
         self.codec = Some(value);
     }
 
+    /// Sets data page size limit for this column.
+    fn set_data_page_size_limit(&mut self, value: usize) {
+        self.data_page_size_limit = Some(value);
+    }
+
     /// Sets whether dictionary encoding is enabled for this column.
     fn set_dictionary_enabled(&mut self, enabled: bool) {
         self.dictionary_enabled = Some(enabled);
@@ -1173,6 +1198,11 @@ impl ColumnProperties {
         self.dictionary_page_size_limit
     }
 
+    /// Returns optional data page size limit for this column.
+    fn data_page_size_limit(&self) -> Option<usize> {
+        self.data_page_size_limit
+    }
+
     /// Returns optional statistics level requested for this column. If result is `None`,
     /// then no setting has been provided.
     fn statistics_enabled(&self) -> Option<EnabledStatistics> {
@@ -1572,6 +1602,24 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_writer_properties_column_data_page_size_limit() {
+        let props = WriterProperties::builder()
+            .set_data_page_size_limit(100)
+            .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
+            .build();
+
+        assert_eq!(props.data_page_size_limit(), 100);
+        assert_eq!(
+            props.column_data_page_size_limit(&ColumnPath::from("col")),
+            10
+        );
+        assert_eq!(
+            props.column_data_page_size_limit(&ColumnPath::from("other")),
+            100
+        );
+    }
+
     #[test]
     fn test_reader_properties_default_settings() {
         let props = ReaderProperties::builder().build();
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index ca6f89cab4..b9d997beb2 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -28,6 +28,7 @@ use parquet::file::metadata::PageIndexPolicy;
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::{ReaderProperties, WriterProperties};
 use parquet::file::reader::SerializedPageReader;
+use parquet::schema::types::ColumnPath;
 use std::sync::Arc;
 
 struct Layout {
@@ -554,3 +555,47 @@ fn test_list() {
         },
     });
 }
+
+#[test]
+fn test_per_column_data_page_size_limit() {
+    // Test that per-column page size limits work correctly
+    // col_a has a small page size limit (500 bytes), col_b uses the default (10000 bytes)
+    let col_a = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
+    let col_b = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
+    let batch = RecordBatch::try_from_iter([("col_a", col_a), ("col_b", col_b)]).unwrap();
+
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(10000)
+        .set_column_data_page_size_limit(ColumnPath::from("col_a"), 500)
+        .set_write_batch_size(10)
+        .build();
+
+    let mut buf = Vec::with_capacity(1024);
+    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+
+    let b = Bytes::from(buf);
+    let read_options =
+        ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true));
+    let reader =
+        ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options).unwrap();
+    let metadata = reader.metadata();
+
+    // Verify we have one row group with two columns
+    assert_eq!(metadata.row_groups().len(), 1);
+    let row_group = &metadata.row_groups()[0];
+    assert_eq!(row_group.columns().len(), 2);
+
+    // Get page counts from offset index
+    let offset_index = metadata.offset_index().unwrap();
+    let col_a_page_count = offset_index[0][0].page_locations.len();
+    let col_b_page_count = offset_index[0][1].page_locations.len();
+
+    // col_a should have many more pages than col_b due to smaller page size limit
+    // col_a: 500 byte limit for 8000 bytes of data -> 16 pages
+    // col_b: 10000 byte limit for 8000 bytes of data -> 1 page
+    assert_eq!(col_a_page_count, 16);
+    assert_eq!(col_b_page_count, 1);
+}
