This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0ae9f66d10 Truncate Parquet page data page statistics (#7555)
0ae9f66d10 is described below
commit 0ae9f66d10141c8d5054fd77f73168c7a2ea2819
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Jun 3 04:31:37 2025 -0700
Truncate Parquet page data page statistics (#7555)
# Which issue does this PR close?
- Enables a workaround for #7489
- Closes https://github.com/apache/arrow-rs/issues/7579
# Rationale for this change
When `WriterProperties::statistics_truncate_length` is set, the column
chunk statistics are truncated, but the statistics written into each data
page header are not. This can produce very large page headers that some
readers fail to handle.
# What changes are included in this PR?
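Data page header statistics are now truncated as well, using the same
`statistics_truncate_length` setting that already applies to column chunk
statistics.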
# Are there any user-facing changes?
No
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/arrow_writer/mod.rs | 67 ++++++++++++++++++++++
parquet/src/column/writer/mod.rs | 102 ++++++++++++++++++----------------
parquet/src/file/properties.rs | 16 ++++--
3 files changed, 134 insertions(+), 51 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 652e12d45a..fbc32b0c4b 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1324,9 +1324,12 @@ mod tests {
use super::*;
use std::fs::File;
+ use std::io::Seek;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder};
use crate::arrow::ARROW_SCHEMA_META_KEY;
+ use crate::format::PageHeader;
+ use crate::thrift::TCompactSliceInputProtocol;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
@@ -3766,4 +3769,68 @@ mod tests {
.unwrap();
assert_eq!(batches.len(), 0);
}
+
+ #[test]
+ fn test_page_stats_truncation() {
+ let string_field = Field::new("a", DataType::Utf8, false);
+ let binary_field = Field::new("b", DataType::Binary, false);
+ let schema = Schema::new(vec![string_field, binary_field]);
+
+ let raw_string_values = vec!["Blart Versenwald III"];
+ let raw_binary_values = [b"Blart Versenwald III".to_vec()];
+ let raw_binary_value_refs = raw_binary_values
+ .iter()
+ .map(|x| x.as_slice())
+ .collect::<Vec<_>>();
+
+ let string_values = StringArray::from(raw_string_values.clone());
+ let binary_values = BinaryArray::from(raw_binary_value_refs);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema),
+ vec![Arc::new(string_values), Arc::new(binary_values)],
+ )
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_statistics_truncate_length(Some(2))
+ .set_dictionary_enabled(false)
+ .set_encoding(Encoding::PLAIN)
+ .set_compression(crate::basic::Compression::UNCOMPRESSED)
+ .build();
+
+ let mut file = roundtrip_opts(&batch, props);
+
+ // read file and decode page headers
+ // Note: use the thrift API as there is no Rust API to access the
statistics in the page headers
+ let mut buf = vec![];
+ file.seek(std::io::SeekFrom::Start(0)).unwrap();
+ let read = file.read_to_end(&mut buf).unwrap();
+ assert!(read > 0);
+
+ // decode first page header
+ let first_page = &buf[4..];
+ let mut prot = TCompactSliceInputProtocol::new(first_page);
+ let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ let stats = hdr.data_page_header.unwrap().statistics;
+ assert!(stats.is_some());
+ let stats = stats.unwrap();
+ // check that min/max were properly truncated
+ assert!(!stats.is_max_value_exact.unwrap());
+ assert!(!stats.is_min_value_exact.unwrap());
+ assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
+ assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
+
+ // check second page now
+ let second_page = &prot.as_slice()[hdr.compressed_page_size as
usize..];
+ let mut prot = TCompactSliceInputProtocol::new(second_page);
+ let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ let stats = hdr.data_page_header.unwrap().statistics;
+ assert!(stats.is_some());
+ let stats = stats.unwrap();
+ // check that min/max were properly truncated
+ assert!(!stats.is_max_value_exact.unwrap());
+ assert!(!stats.is_min_value_exact.unwrap());
+ assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
+ assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
+ }
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 02570d3f3c..efc7993c70 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -949,6 +949,59 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E>
{
.unwrap_or_else(|| (data.to_vec(), false))
}
+ /// Truncate the min and max values that will be written to a data page
+ /// header or column chunk Statistics
+ fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
+ let backwards_compatible_min_max = self.descr.sort_order().is_signed();
+ match statistics {
+ Statistics::ByteArray(stats) if stats._internal_has_min_max_set()
=> {
+ let (min, did_truncate_min) = self.truncate_min_value(
+ self.props.statistics_truncate_length(),
+ stats.min_bytes_opt().unwrap(),
+ );
+ let (max, did_truncate_max) = self.truncate_max_value(
+ self.props.statistics_truncate_length(),
+ stats.max_bytes_opt().unwrap(),
+ );
+ Statistics::ByteArray(
+ ValueStatistics::new(
+ Some(min.into()),
+ Some(max.into()),
+ stats.distinct_count(),
+ stats.null_count_opt(),
+ backwards_compatible_min_max,
+ )
+ .with_max_is_exact(!did_truncate_max)
+ .with_min_is_exact(!did_truncate_min),
+ )
+ }
+ Statistics::FixedLenByteArray(stats)
+ if (stats._internal_has_min_max_set() &&
self.can_truncate_value()) =>
+ {
+ let (min, did_truncate_min) = self.truncate_min_value(
+ self.props.statistics_truncate_length(),
+ stats.min_bytes_opt().unwrap(),
+ );
+ let (max, did_truncate_max) = self.truncate_max_value(
+ self.props.statistics_truncate_length(),
+ stats.max_bytes_opt().unwrap(),
+ );
+ Statistics::FixedLenByteArray(
+ ValueStatistics::new(
+ Some(min.into()),
+ Some(max.into()),
+ stats.distinct_count(),
+ stats.null_count_opt(),
+ backwards_compatible_min_max,
+ )
+ .with_max_is_exact(!did_truncate_max)
+ .with_min_is_exact(!did_truncate_min),
+ )
+ }
+ stats => stats,
+ }
+ }
+
/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written
directly.
fn add_data_page(&mut self) -> Result<()> {
@@ -992,6 +1045,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E>
{
.update_variable_length_bytes(values_data.variable_length_bytes);
let page_statistics = page_statistics.map(Statistics::from);
+ let page_statistics = page_statistics.map(|stats|
self.truncate_statistics(stats));
let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
@@ -1147,53 +1201,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
.with_backwards_compatible_min_max(backwards_compatible_min_max)
.into();
- let statistics = match statistics {
- Statistics::ByteArray(stats) if
stats._internal_has_min_max_set() => {
- let (min, did_truncate_min) = self.truncate_min_value(
- self.props.statistics_truncate_length(),
- stats.min_bytes_opt().unwrap(),
- );
- let (max, did_truncate_max) = self.truncate_max_value(
- self.props.statistics_truncate_length(),
- stats.max_bytes_opt().unwrap(),
- );
- Statistics::ByteArray(
- ValueStatistics::new(
- Some(min.into()),
- Some(max.into()),
- stats.distinct_count(),
- stats.null_count_opt(),
- backwards_compatible_min_max,
- )
- .with_max_is_exact(!did_truncate_max)
- .with_min_is_exact(!did_truncate_min),
- )
- }
- Statistics::FixedLenByteArray(stats)
- if (stats._internal_has_min_max_set() &&
self.can_truncate_value()) =>
- {
- let (min, did_truncate_min) = self.truncate_min_value(
- self.props.statistics_truncate_length(),
- stats.min_bytes_opt().unwrap(),
- );
- let (max, did_truncate_max) = self.truncate_max_value(
- self.props.statistics_truncate_length(),
- stats.max_bytes_opt().unwrap(),
- );
- Statistics::FixedLenByteArray(
- ValueStatistics::new(
- Some(min.into()),
- Some(max.into()),
- stats.distinct_count(),
- stats.null_count_opt(),
- backwards_compatible_min_max,
- )
- .with_max_is_exact(!did_truncate_max)
- .with_min_is_exact(!did_truncate_min),
- )
- }
- stats => stats,
- };
+ let statistics = self.truncate_statistics(statistics);
builder = builder
.set_statistics(statistics)
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 6ce9ebeea5..0b9fa7e912 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -302,11 +302,13 @@ impl WriterProperties {
self.column_index_truncate_length
}
- /// Returns the maximum length of truncated min/max values in statistics.
+ /// Returns the maximum length of truncated min/max values in
[`Statistics`].
///
/// `None` if truncation is disabled, must be greater than 0 otherwise.
///
/// For more details see
[`WriterPropertiesBuilder::set_statistics_truncate_length`]
+ ///
+ /// [`Statistics`]: crate::file::statistics::Statistics
pub fn statistics_truncate_length(&self) -> Option<usize> {
self.statistics_truncate_length
}
@@ -646,16 +648,22 @@ impl WriterPropertiesBuilder {
self
}
- /// Sets the max length of min/max value fields in row group level
+ /// Sets the max length of min/max value fields in row group and data page
header
/// [`Statistics`] (defaults to `None` (no limit) via
[`DEFAULT_STATISTICS_TRUNCATE_LENGTH`]).
///
/// # Notes
- /// Row group level [`Statistics`] are written when
[`Self::set_statistics_enabled`] is
- /// set to [`EnabledStatistics::Chunk`] or [`EnabledStatistics::Page`].
+ /// Row group [`Statistics`] are written when
[`Self::set_statistics_enabled`] is
+ /// set to [`EnabledStatistics::Chunk`] or [`EnabledStatistics::Page`].
Data page header
+ /// [`Statistics`] are written when [`Self::set_statistics_enabled`] is
set to
+ /// [`EnabledStatistics::Page`].
///
/// * If `Some`, must be greater than 0, otherwise will panic
/// * If `None`, there's no effective limit.
///
+ /// # See also
+ /// Truncation of Page Index statistics is controlled separately via
+ /// [`WriterPropertiesBuilder::set_column_index_truncate_length`]
+ ///
/// [`Statistics`]: crate::file::statistics::Statistics
pub fn set_statistics_truncate_length(mut self, max_length: Option<usize>)
-> Self {
if let Some(value) = max_length {