This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d9dbf7271 Fix several bugs in parquet writer statistics generation, add `EnabledStatistics` to control level of statistics generated (#2022)
d9dbf7271 is described below
commit d9dbf7271899e40c30236c619f7c558d2c1f6788
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Jul 8 11:04:59 2022 -0400
Fix several bugs in parquet writer statistics generation, add `EnabledStatistics` to control level of statistics generated (#2022)
* Fix parquet writer statistics
* Fix test_column_writer_precalculated_statistics
* Handle NaN floats
* Reduce code duplication
* Review feedback
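
For readers of this change, a minimal usage sketch of the new `EnabledStatistics` setting via the builder methods touched in this commit; the column name "col" is purely illustrative:

    use parquet::file::properties::{EnabledStatistics, WriterProperties};
    use parquet::schema::types::ColumnPath;

    fn main() {
        // Chunk-level statistics only by default; full page-level statistics
        // for the (hypothetical) column "col".
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .set_column_statistics_enabled(ColumnPath::from("col"), EnabledStatistics::Page)
            .build();

        // Per-column settings take precedence over the global default.
        assert_eq!(
            props.statistics_enabled(&ColumnPath::from("a")),
            EnabledStatistics::Chunk
        );
        assert_eq!(
            props.statistics_enabled(&ColumnPath::from("col")),
            EnabledStatistics::Page
        );
    }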
---
parquet/src/arrow/schema.rs | 6 +-
parquet/src/column/writer.rs | 238 +++++++++++++++++++++++++----------------
parquet/src/file/properties.rs | 52 +++++++--
3 files changed, 191 insertions(+), 105 deletions(-)
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index a65c75853..97611d0ec 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
value: Some(encoded),
};
- let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+ let meta = props
+ .key_value_metadata
+ .get_or_insert_with(Default::default);
+
// check if ARROW:schema exists, and overwrite it
let schema_meta = meta
.iter()
@@ -167,7 +170,6 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
meta.push(schema_kv);
}
}
- props.key_value_metadata = Some(meta);
}
/// Convert arrow schema to parquet schema
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d589aef5a..5def72135 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -17,7 +17,7 @@
//! Contains column writer API.
use parquet_format::{ColumnIndex, OffsetIndex};
-use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};
+use std::{collections::VecDeque, convert::TryFrom, marker::PhantomData};
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
@@ -31,12 +31,13 @@ use crate::encodings::{
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
+use crate::file::properties::EnabledStatistics;
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
};
-use crate::schema::types::ColumnDescPtr;
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::util::bit_util::FromBytes;
use crate::util::memory::ByteBufferPtr;
@@ -177,6 +178,8 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
+ statistics_enabled: EnabledStatistics,
+
page_writer: Box<dyn PageWriter + 'a>,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
@@ -243,9 +246,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
.unwrap();
+ let statistics_enabled = props.statistics_enabled(descr.path());
+
Self {
descr,
props,
+ statistics_enabled,
page_writer,
has_dictionary,
dict_encoder,
@@ -302,53 +308,48 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
// Find out the minimal length to prevent index out of bound errors.
let mut min_len = values.len();
if let Some(levels) = def_levels {
- min_len = cmp::min(min_len, levels.len());
+ min_len = min_len.min(levels.len());
}
if let Some(levels) = rep_levels {
- min_len = cmp::min(min_len, levels.len());
+ min_len = min_len.min(levels.len());
}
// Find out number of batches to process.
let write_batch_size = self.props.write_batch_size();
let num_batches = min_len / write_batch_size;
- // Process pre-calculated statistics
- match (min, max) {
- (Some(min), Some(max)) => {
- if self
- .min_column_value
- .as_ref()
- .map_or(true, |v| self.compare_greater(v, min))
- {
- self.min_column_value = Some(min.clone());
+ // If only computing chunk-level statistics compute them here, page-level statistics
+ // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
+ // [`Self::add_data_page`]
+ if self.statistics_enabled == EnabledStatistics::Chunk {
+ match (min, max) {
+ (Some(min), Some(max)) => {
+ Self::update_min(&self.descr, min, &mut self.min_column_value);
+ Self::update_max(&self.descr, max, &mut self.max_column_value);
}
- if self
- .max_column_value
- .as_ref()
- .map_or(true, |v| self.compare_greater(max, v))
- {
- self.max_column_value = Some(max.clone());
+ (None, Some(_)) | (Some(_), None) => {
+ panic!("min/max should be both set or both None")
}
- }
- (None, Some(_)) | (Some(_), None) => {
- panic!("min/max should be both set or both None")
- }
- (None, None) => {}
+ (None, None) => {
+ for val in values {
+ Self::update_min(&self.descr, val, &mut self.min_column_value);
+ Self::update_max(&self.descr, val, &mut self.max_column_value);
+ }
+ }
+ };
}
- if let Some(distinct) = distinct_count {
- self.column_distinct_count =
- Some(self.column_distinct_count.unwrap_or(0) + distinct);
+ // We can only set the distinct count if there are no other writes
+ if self.num_buffered_values == 0 && self.num_page_nulls == 0 {
+ self.column_distinct_count = distinct_count;
+ } else {
+ self.column_distinct_count = None;
}
if let Some(nulls) = null_count {
self.num_column_nulls += nulls;
}
- let calculate_page_stats = (min.is_none() || max.is_none())
- && null_count.is_none()
- && distinct_count.is_none();
-
let mut values_offset = 0;
let mut levels_offset = 0;
for _ in 0..num_batches {
@@ -356,7 +357,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
&values[values_offset..values_offset + write_batch_size],
def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
- calculate_page_stats,
)?;
levels_offset += write_batch_size;
}
@@ -365,7 +365,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
&values[values_offset..],
def_levels.map(|lv| &lv[levels_offset..]),
rep_levels.map(|lv| &lv[levels_offset..]),
- calculate_page_stats,
)?;
// Return total number of values processed.
@@ -393,9 +392,13 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None)
}
- /// Writer may optionally provide pre-calculated statistics for this batch, in which case we do
- /// not calculate page level statistics as this will defeat the purpose of speeding up the write
- /// process with pre-calculated statistics.
+ /// Writer may optionally provide pre-calculated statistics for use when computing
+ /// chunk-level statistics
+ ///
+ /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
+ /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
+ /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
+ /// computed page statistics
pub fn write_batch_with_statistics(
&mut self,
values: &[T::T],
@@ -466,7 +469,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
values: &[T::T],
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
- calculate_page_stats: bool,
) -> Result<usize> {
let mut values_to_write = 0;
@@ -494,7 +496,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
for &level in levels {
if level == self.descr.max_def_level() {
values_to_write += 1;
- } else if calculate_page_stats {
+ } else if self.statistics_enabled == EnabledStatistics::Page {
self.num_page_nulls += 1
}
}
@@ -537,7 +539,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
})?;
- if calculate_page_stats {
+ if self.statistics_enabled == EnabledStatistics::Page {
for val in values_to_write {
self.update_page_min_max(val);
}
@@ -549,7 +551,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap();
if self.should_add_data_page() {
- self.add_data_page(calculate_page_stats)?;
+ self.add_data_page()?;
}
if self.should_dict_fallback() {
@@ -661,7 +663,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
- fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
+ fn add_data_page(&mut self) -> Result<()> {
// Extract encoded values
let value_bytes = match self.dict_encoder {
Some(ref mut encoder) => encoder.write_indices()?,
@@ -678,14 +680,15 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
let max_def_level = self.descr.max_def_level();
let max_rep_level = self.descr.max_rep_level();
- // always update column NULL count, no matter if page stats are used
self.num_column_nulls += self.num_page_nulls;
- let page_statistics = if calculate_page_stat {
- self.update_column_min_max();
- Some(self.make_page_statistics())
- } else {
- None
+ let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some();
+ let page_statistics = match self.statistics_enabled {
+ EnabledStatistics::Page if has_min_max => {
+ self.update_column_min_max();
+ Some(self.make_page_statistics())
+ }
+ _ => None,
};
// update column and offset index
@@ -810,10 +813,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn flush_data_pages(&mut self) -> Result<()> {
// Write all outstanding data to a new page.
- let calculate_page_stats =
- self.min_page_value.is_some() && self.max_page_value.is_some();
if self.num_buffered_values > 0 {
- self.add_data_page(calculate_page_stats)?;
+ self.add_data_page()?;
}
while let Some(page) = self.data_pages.pop_front() {
@@ -1029,8 +1030,36 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
}
}
- #[allow(clippy::eq_op)]
fn update_page_min_max(&mut self, val: &T::T) {
+ Self::update_min(&self.descr, val, &mut self.min_page_value);
+ Self::update_max(&self.descr, val, &mut self.max_page_value);
+ }
+
+ fn update_column_min_max(&mut self) {
+ let min = self.min_page_value.as_ref().unwrap();
+ Self::update_min(&self.descr, min, &mut self.min_column_value);
+
+ let max = self.max_page_value.as_ref().unwrap();
+ Self::update_max(&self.descr, max, &mut self.max_column_value);
+ }
+
+ fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option<T::T>) {
+ Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val))
+ }
+
+ fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option<T::T>) {
+ Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur))
+ }
+
+ /// Perform a conditional update of `cur`, skipping any NaN values
+ ///
+ /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
+ /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
+ #[allow(clippy::eq_op)]
+ fn update_stat<F>(val: &T::T, cur: &mut Option<T::T>, should_update: F)
+ where
+ F: Fn(&T::T) -> bool,
+ {
if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() {
// Skip NaN values
if val != val {
@@ -1038,50 +1067,21 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
}
}
- if self
- .min_page_value
- .as_ref()
- .map_or(true, |min| self.compare_greater(min, val))
- {
- self.min_page_value = Some(val.clone());
- }
- if self
- .max_page_value
- .as_ref()
- .map_or(true, |max| self.compare_greater(val, max))
- {
- self.max_page_value = Some(val.clone());
- }
- }
-
- fn update_column_min_max(&mut self) {
- let update_min = self.min_column_value.as_ref().map_or(true, |min| {
- let page_value = self.min_page_value.as_ref().unwrap();
- self.compare_greater(min, page_value)
- });
- if update_min {
- self.min_column_value = self.min_page_value.clone();
- }
-
- let update_max = self.max_column_value.as_ref().map_or(true, |max| {
- let page_value = self.max_page_value.as_ref().unwrap();
- self.compare_greater(page_value, max)
- });
- if update_max {
- self.max_column_value = self.max_page_value.clone();
+ if cur.as_ref().map_or(true, should_update) {
+ *cur = Some(val.clone());
}
}
/// Evaluate `a > b` according to underlying logical type.
- fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
- if let Some(LogicalType::Integer { is_signed, .. }) = self.descr.logical_type() {
+ fn compare_greater(descr: &ColumnDescriptor, a: &T::T, b: &T::T) -> bool {
+ if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
if !is_signed {
// need to compare unsigned
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}
- match self.descr.converted_type() {
+ match descr.converted_type() {
ConvertedType::UINT_8
| ConvertedType::UINT_16
| ConvertedType::UINT_32
@@ -1091,8 +1091,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
_ => {}
};
- if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() {
- match self.descr.physical_type() {
+ if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
+ match T::get_physical_type() {
Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
return compare_greater_byte_array_decimals(
a.as_bytes(),
@@ -1103,8 +1103,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
};
}
- if self.descr.converted_type() == ConvertedType::DECIMAL {
- match self.descr.physical_type() {
+ if descr.converted_type() == ConvertedType::DECIMAL {
+ match T::get_physical_type() {
Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
return compare_greater_byte_array_decimals(
a.as_bytes(),
@@ -1713,7 +1713,11 @@ mod tests {
#[test]
fn test_column_writer_precalculated_statistics() {
let page_writer = get_test_page_writer();
- let props = Arc::new(WriterProperties::builder().build());
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Chunk)
+ .build(),
+ );
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer
.write_batch_with_statistics(
@@ -1754,6 +1758,56 @@ mod tests {
}
}
+ #[test]
+ fn test_mixed_precomputed_statistics() {
+ let mut buf = Vec::with_capacity(100);
+ let mut write = TrackedWrite::new(&mut buf);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+ let props = Arc::new(WriterProperties::builder().build());
+ let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+
+ writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+ writer
+ .write_batch_with_statistics(
+ &[5, 6, 7],
+ None,
+ None,
+ Some(&5),
+ Some(&7),
+ Some(0),
+ Some(3),
+ )
+ .unwrap();
+
+ let (_, _, metadata, _, _) = writer.close().unwrap();
+
+ let stats = metadata.statistics().unwrap();
+ assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
+ assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes());
+ assert_eq!(stats.null_count(), 0);
+ assert!(stats.distinct_count().is_none());
+
+ let reader = SerializedPageReader::new(
+ std::io::Cursor::new(buf),
+ 7,
+ Compression::UNCOMPRESSED,
+ Type::INT32,
+ )
+ .unwrap();
+
+ let pages = reader.collect::<Result<Vec<_>>>().unwrap();
+ assert_eq!(pages.len(), 2);
+
+ assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
+ assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
+
+ let page_statistics = pages[1].statistics().unwrap();
+ assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes());
+ assert_eq!(page_statistics.max_bytes(), 7_i32.to_le_bytes());
+ assert_eq!(page_statistics.null_count(), 0);
+ assert!(page_statistics.distinct_count().is_none());
+ }
+
#[test]
fn test_column_writer_empty_column_roundtrip() {
let props = WriterProperties::builder().build();
@@ -2279,10 +2333,10 @@ mod tests {
let mut max_batch_size = values.len();
if let Some(levels) = def_levels {
- max_batch_size = cmp::max(max_batch_size, levels.len());
+ max_batch_size = max_batch_size.max(levels.len());
}
if let Some(levels) = rep_levels {
- max_batch_size = cmp::max(max_batch_size, levels.len());
+ max_batch_size = max_batch_size.max(levels.len());
}
let mut writer = get_test_column_writer::<T>(
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 2baf93933..9ca7c4daa 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -60,7 +60,7 @@ const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0;
const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED;
const DEFAULT_DICTIONARY_ENABLED: bool = true;
const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
-const DEFAULT_STATISTICS_ENABLED: bool = true;
+const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
@@ -198,7 +198,7 @@ impl WriterProperties {
}
/// Returns `true` if statistics are enabled for a column.
- pub fn statistics_enabled(&self, col: &ColumnPath) -> bool {
+ pub fn statistics_enabled(&self, col: &ColumnPath) -> EnabledStatistics {
self.column_properties
.get(col)
.and_then(|c| c.statistics_enabled())
@@ -339,7 +339,7 @@ impl WriterPropertiesBuilder {
}
/// Sets flag to enable/disable statistics for any column.
- pub fn set_statistics_enabled(mut self, value: bool) -> Self {
+ pub fn set_statistics_enabled(mut self, value: EnabledStatistics) -> Self {
self.default_column_properties.set_statistics_enabled(value);
self
}
@@ -394,7 +394,11 @@ impl WriterPropertiesBuilder {
/// Sets flag to enable/disable statistics for a column.
/// Takes precedence over globally defined settings.
- pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self {
+ pub fn set_column_statistics_enabled(
+ mut self,
+ col: ColumnPath,
+ value: EnabledStatistics,
+ ) -> Self {
self.get_mut_props(col).set_statistics_enabled(value);
self
}
@@ -411,6 +415,23 @@ impl WriterPropertiesBuilder {
}
}
+/// Controls the level of statistics to be computed by the writer
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum EnabledStatistics {
+ /// Compute no statistics
+ None,
+ /// Compute chunk-level statistics but not page-level
+ Chunk,
+ /// Compute page-level and chunk-level statistics
+ Page,
+}
+
+impl Default for EnabledStatistics {
+ fn default() -> Self {
+ DEFAULT_STATISTICS_ENABLED
+ }
+}
+
/// Container for column properties that can be changed as part of writer.
///
/// If a field is `None`, it means that no specific value has been set for this column,
@@ -420,7 +441,7 @@ struct ColumnProperties {
encoding: Option<Encoding>,
codec: Option<Compression>,
dictionary_enabled: Option<bool>,
- statistics_enabled: Option<bool>,
+ statistics_enabled: Option<EnabledStatistics>,
max_statistics_size: Option<usize>,
}
@@ -463,7 +484,7 @@ impl ColumnProperties {
}
/// Sets whether or not statistics are enabled for this column.
- fn set_statistics_enabled(&mut self, enabled: bool) {
+ fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) {
self.statistics_enabled = Some(enabled);
}
@@ -491,7 +512,7 @@ impl ColumnProperties {
/// Returns `Some(true)` if statistics are enabled for this column, if disabled then
/// returns `Some(false)`. If result is `None`, then no setting has been provided.
- fn statistics_enabled(&self) -> Option<bool> {
+ fn statistics_enabled(&self) -> Option<EnabledStatistics> {
self.statistics_enabled
}
@@ -613,13 +634,16 @@ mod tests {
.set_encoding(Encoding::DELTA_BINARY_PACKED)
.set_compression(Compression::GZIP)
.set_dictionary_enabled(false)
- .set_statistics_enabled(false)
+ .set_statistics_enabled(EnabledStatistics::None)
.set_max_statistics_size(50)
// specific column settings
.set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
.set_column_compression(ColumnPath::from("col"), Compression::SNAPPY)
.set_column_dictionary_enabled(ColumnPath::from("col"), true)
- .set_column_statistics_enabled(ColumnPath::from("col"), true)
+ .set_column_statistics_enabled(
+ ColumnPath::from("col"),
+ EnabledStatistics::Chunk,
+ )
.set_column_max_statistics_size(ColumnPath::from("col"), 123)
.build();
@@ -642,7 +666,10 @@ mod tests {
);
assert_eq!(props.compression(&ColumnPath::from("a")), Compression::GZIP);
assert!(!props.dictionary_enabled(&ColumnPath::from("a")));
- assert!(!props.statistics_enabled(&ColumnPath::from("a")));
+ assert_eq!(
+ props.statistics_enabled(&ColumnPath::from("a")),
+ EnabledStatistics::None
+ );
assert_eq!(props.max_statistics_size(&ColumnPath::from("a")), 50);
assert_eq!(
@@ -654,7 +681,10 @@ mod tests {
Compression::SNAPPY
);
assert!(props.dictionary_enabled(&ColumnPath::from("col")));
- assert!(props.statistics_enabled(&ColumnPath::from("col")));
+ assert_eq!(
+ props.statistics_enabled(&ColumnPath::from("col")),
+ EnabledStatistics::Chunk
+ );
assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123);
}