This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new 12b30dd  ARROW-9280: [Rust] [Parquet] Calculate page and column statistics
12b30dd is described below

commit 12b30dda1a23bad70e5b11b8cef845d0effd01d4
Author: Ze'ev Maor <ze...@microsoft.com>
AuthorDate: Thu Jul 2 17:14:16 2020 -0700

    ARROW-9280: [Rust] [Parquet] Calculate page and column statistics

    Allow writer to provide pre-calculated stats

    Closes #7622 from zeevm/calculate_parquet_statistics

    Authored-by: Ze'ev Maor <ze...@microsoft.com>
    Signed-off-by: Chao Sun <sunc...@apache.org>
---
 rust/parquet/src/column/writer.rs | 317 +++++++++++++++++++++++++++++++++++---
 rust/parquet/src/data_type.rs     |  53 ++++++-
 2 files changed, 339 insertions(+), 31 deletions(-)

diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs
index c54c478..f26c37b 100644
--- a/rust/parquet/src/column/writer.rs
+++ b/rust/parquet/src/column/writer.rs
@@ -16,23 +16,25 @@
 // under the License.
 
 //! Contains column writer API.
-
 use std::{cmp, collections::VecDeque, rc::Rc};
 
 use crate::basic::{Compression, Encoding, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::compression::{create_codec, Codec};
+use crate::data_type::AsBytes;
 use crate::data_type::*;
 use crate::encodings::{
     encoding::{get_encoder, DictEncoder, Encoder},
     levels::{max_buffer_size, LevelEncoder},
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::statistics::Statistics;
 use crate::file::{
     metadata::ColumnChunkMetaData,
     properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
 };
 use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
 use crate::util::memory::{ByteBufferPtr, MemTracker};
 
 /// Column writer for a Parquet type.
@@ -47,6 +49,33 @@ pub enum ColumnWriter {
     FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>),
 }
 
+pub enum Level {
+    Page,
+    Column,
+}
+
+macro_rules! gen_stats_section {
+    ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{
+        let min = $min.as_ref().and_then(|v| {
+            println!("min: {:?} {}", &v.as_bytes(), v.as_bytes().len());
+            Some(read_num_bytes!(
+                $physical_ty,
+                v.as_bytes().len(),
+                &v.as_bytes()
+            ))
+        });
+        let max = $max.as_ref().and_then(|v| {
+            println!("max: {:?} {}", &v.as_bytes(), v.as_bytes().len());
+            Some(read_num_bytes!(
+                $physical_ty,
+                v.as_bytes().len(),
+                &v.as_bytes()
+            ))
+        });
+        Statistics::$stat_fn(min, max, $distinct, $nulls, false)
+    }};
+}
+
 /// Gets a specific column writer corresponding to column descriptor `descr`.
 pub fn get_column_writer(
     descr: ColumnDescPtr,
@@ -149,6 +178,10 @@ pub struct ColumnWriterImpl<T: DataType> {
     num_buffered_values: u32,
     num_buffered_encoded_values: u32,
     num_buffered_rows: u32,
+    min_page_value: Option<T::T>,
+    max_page_value: Option<T::T>,
+    num_page_nulls: u64,
+    page_distinct_count: Option<u64>,
     // Metrics per column writer
     total_bytes_written: u64,
     total_rows_written: u64,
@@ -157,6 +190,10 @@ pub struct ColumnWriterImpl<T: DataType> {
     total_num_values: u64,
     dictionary_page_offset: Option<u64>,
     data_page_offset: Option<u64>,
+    min_column_value: Option<T::T>,
+    max_column_value: Option<T::T>,
+    num_column_nulls: u64,
+    column_distinct_count: Option<u64>,
     // Reused buffers
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
@@ -216,26 +253,26 @@ impl<T: DataType> ColumnWriterImpl<T> {
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
+            min_page_value: None,
+            max_page_value: None,
+            num_page_nulls: 0,
+            page_distinct_count: None,
+            min_column_value: None,
+            max_column_value: None,
+            num_column_nulls: 0,
+            column_distinct_count: None,
         }
     }
 
-    /// Writes batch of values, definition levels and repetition levels.
-    /// Returns number of values processed (written).
-    ///
-    /// If definition and repetition levels are provided, we write fully those levels and
-    /// select how many values to write (this number will be returned), since number of
-    /// actual written values may be smaller than provided values.
-    ///
-    /// If only values are provided, then all values are written and the length of
-    /// of the values buffer is returned.
-    ///
-    /// Definition and/or repetition levels can be omitted, if values are
-    /// non-nullable and/or non-repeated.
-    pub fn write_batch(
+    fn write_batch_internal(
         &mut self,
         values: &[T::T],
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
+        min: &Option<T::T>,
+        max: &Option<T::T>,
+        null_count: Option<u64>,
+        distinct_count: Option<u64>,
     ) -> Result<usize> {
         // We check for DataPage limits only after we have inserted the values. If a user
         // writes a large number of values, the DataPage size can be well above the limit.
@@ -263,11 +300,45 @@ impl<T: DataType> ColumnWriterImpl<T> {
         let mut values_offset = 0;
         let mut levels_offset = 0;
 
+        // Process pre-calculated statistics
+        match (min, max) {
+            (Some(min), Some(max)) => {
+                if self.min_column_value.is_none()
+                    || self.min_column_value.as_ref().unwrap() > min
+                {
+                    self.min_column_value = Some(min.clone());
+                }
+                if self.max_column_value.is_none()
+                    || self.max_column_value.as_ref().unwrap() < max
+                {
+                    self.max_column_value = Some(max.clone());
+                }
+            }
+            (None, Some(_)) | (Some(_), None) => {
+                panic!("min/max should be both set or both None")
+            }
+            (None, None) => {}
+        }
+
+        if let Some(distinct) = distinct_count {
+            self.column_distinct_count =
+                Some(self.column_distinct_count.unwrap_or(0) + distinct);
+        }
+
+        if let Some(nulls) = null_count {
+            self.num_column_nulls += nulls;
+        }
+
+        let calculate_page_stats = (min.is_none() || max.is_none())
+            && null_count.is_none()
+            && distinct_count.is_none();
+
         for _ in 0..num_batches {
             values_offset += self.write_mini_batch(
                 &values[values_offset..values_offset + write_batch_size],
                 def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
                 rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
+                calculate_page_stats,
             )?;
             levels_offset += write_batch_size;
         }
@@ -276,12 +347,60 @@ impl<T: DataType> ColumnWriterImpl<T> {
             &values[values_offset..],
             def_levels.map(|lv| &lv[levels_offset..]),
             rep_levels.map(|lv| &lv[levels_offset..]),
+            calculate_page_stats,
         )?;
 
         // Return total number of values processed.
         Ok(values_offset)
     }
 
+    /// Writes batch of values, definition levels and repetition levels.
+    /// Returns number of values processed (written).
+    ///
+    /// If definition and repetition levels are provided, we write fully those levels and
+    /// select how many values to write (this number will be returned), since number of
+    /// actual written values may be smaller than provided values.
+    ///
+    /// If only values are provided, then all values are written and the length of
+    /// of the values buffer is returned.
+    ///
+    /// Definition and/or repetition levels can be omitted, if values are
+    /// non-nullable and/or non-repeated.
+    pub fn write_batch(
+        &mut self,
+        values: &[T::T],
+        def_levels: Option<&[i16]>,
+        rep_levels: Option<&[i16]>,
+    ) -> Result<usize> {
+        self.write_batch_internal(
+            values, def_levels, rep_levels, &None, &None, None, None,
+        )
+    }
+
+    /// Writer may optionally provide pre-calculated statistics for this batch, in which case we do
+    /// not calculate page level statistics as this will defeat the purpose of speeding up the write
+    /// process with pre-calculated statistics.
+    pub fn write_batch_with_statistics(
+        &mut self,
+        values: &[T::T],
+        def_levels: Option<&[i16]>,
+        rep_levels: Option<&[i16]>,
+        min: &Option<T::T>,
+        max: &Option<T::T>,
+        nulls_count: Option<u64>,
+        distinct_count: Option<u64>,
+    ) -> Result<usize> {
+        self.write_batch_internal(
+            values,
+            def_levels,
+            rep_levels,
+            min,
+            max,
+            nulls_count,
+            distinct_count,
+        )
+    }
+
     /// Returns total number of bytes written by this column writer so far.
     /// This value is also returned when column writer is closed.
     pub fn get_total_bytes_written(&self) -> u64 {
@@ -316,6 +435,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
         values: &[T::T],
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
+        calculate_page_stats: bool,
     ) -> Result<usize> {
         let num_values;
         let mut values_to_write = 0;
@@ -346,7 +466,13 @@ impl<T: DataType> ColumnWriterImpl<T> {
             let levels = def_levels.unwrap();
             num_values = levels.len();
             for &level in levels {
-                values_to_write += (level == self.descr.max_def_level()) as usize;
+                if level == self.descr.max_def_level() {
+                    values_to_write += 1;
+                } else {
+                    if calculate_page_stats {
+                        self.num_page_nulls += 1
+                    };
+                }
             }
 
             self.write_definition_levels(levels);
@@ -387,7 +513,11 @@ impl<T: DataType> ColumnWriterImpl<T> {
             ));
         }
 
-        // TODO: update page statistics
+        if calculate_page_stats {
+            for val in &values[0..values_to_write] {
+                self.update_page_min_max(val);
+            }
+        }
 
         self.write_values(&values[0..values_to_write])?;
 
@@ -395,7 +525,7 @@
         self.num_buffered_encoded_values += values_to_write as u32;
 
         if self.should_add_data_page() {
-            self.add_data_page()?;
+            self.add_data_page(calculate_page_stats)?;
         }
 
         if self.should_dict_fallback() {
@@ -463,7 +593,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
 
     /// Adds data page.
     /// Data page is either buffered in case of dictionary encoding or written directly.
-    fn add_data_page(&mut self) -> Result<()> {
+    fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
         // Extract encoded values
         let value_bytes = match self.dict_encoder {
             Some(ref mut encoder) => encoder.write_indices()?,
@@ -480,6 +610,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
         let max_def_level = self.descr.max_def_level();
         let max_rep_level = self.descr.max_rep_level();
 
+        let mut page_statistics: Option<Statistics> = None;
+
+        if calculate_page_stat {
+            self.update_column_min_max();
+            self.num_column_nulls += self.num_page_nulls;
+            page_statistics = Some(self.make_page_statistics());
+        }
+
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
                 let mut buffer = vec![];
@@ -519,8 +657,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
                     encoding,
                     def_level_encoding: Encoding::RLE,
                     rep_level_encoding: Encoding::RLE,
-                    // TODO: process statistics
-                    statistics: None,
+                    statistics: page_statistics,
                 };
 
                 CompressedPage::new(data_page, uncompressed_size)
@@ -570,8 +707,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
                     is_compressed: self.compressor.is_some(),
-                    // TODO: process statistics
-                    statistics: None,
+                    statistics: page_statistics,
                 };
 
                 CompressedPage::new(data_page, uncompressed_size)
@@ -594,6 +730,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
         self.num_buffered_values = 0;
         self.num_buffered_encoded_values = 0;
         self.num_buffered_rows = 0;
+        self.min_page_value = None;
+        self.max_page_value = None;
+        self.num_page_nulls = 0;
+        self.page_distinct_count = None;
 
         Ok(())
     }
@@ -603,8 +743,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
     #[inline]
     fn flush_data_pages(&mut self) -> Result<()> {
         // Write all outstanding data to a new page.
+        let calculate_page_stats =
+            self.min_page_value.is_some() && self.max_page_value.is_some();
         if self.num_buffered_values > 0 {
-            self.add_data_page()?;
+            self.add_data_page(calculate_page_stats)?;
         }
 
         while let Some(page) = self.data_pages.pop_front() {
@@ -643,6 +785,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
         // We use only RLE level encoding for data page v1 and data page v2.
         encodings.push(Encoding::RLE);
 
+        let statistics = self.make_column_statistics();
         let metadata = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
             .set_encodings(encodings)
@@ -652,6 +795,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
             .set_num_values(num_values)
             .set_data_page_offset(data_page_offset)
             .set_dictionary_page_offset(dict_page_offset)
+            .set_statistics(statistics)
             .build()?;
 
         self.page_writer.write_metadata(&metadata)?;
@@ -755,6 +899,72 @@ impl<T: DataType> ColumnWriterImpl<T> {
     fn get_page_writer_ref(&self) -> &Box<PageWriter> {
         &self.page_writer
     }
+
+    fn make_column_statistics(&self) -> Statistics {
+        self.make_typed_statistics(Level::Column)
+    }
+
+    fn make_page_statistics(&self) -> Statistics {
+        self.make_typed_statistics(Level::Page)
+    }
+
+    pub fn make_typed_statistics(&self, level: Level) -> Statistics {
+        let (min, max, distinct, nulls) = match level {
+            Level::Page => (
+                self.min_page_value.as_ref(),
+                self.max_page_value.as_ref(),
+                self.page_distinct_count,
+                self.num_page_nulls,
+            ),
+            Level::Column => (
+                self.min_column_value.as_ref(),
+                self.max_column_value.as_ref(),
+                self.column_distinct_count,
+                self.num_column_nulls,
+            ),
+        };
+        match self.descr.physical_type() {
+            Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, nulls),
+            Type::BOOLEAN => gen_stats_section!(i32, int32, min, max, distinct, nulls),
+            Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, nulls),
+            Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls),
+            Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls),
+            Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls),
+            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
+                let min = min
+                    .as_ref()
+                    .and_then(|v| Some(ByteArray::from(v.as_bytes().to_vec())));
+                let max = max
+                    .as_ref()
+                    .and_then(|v| Some(ByteArray::from(v.as_bytes().to_vec())));
+                Statistics::byte_array(min, max, distinct, nulls, false)
+            }
+        }
+    }
+
+    fn update_page_min_max(&mut self, val: &T::T) {
+        if self.min_page_value.is_none() || self.min_page_value.as_ref().unwrap() > val {
+            self.min_page_value = Some(val.clone());
+        }
+        if self.max_page_value.is_none() || self.max_page_value.as_ref().unwrap() < val {
+            self.max_page_value = Some(val.clone());
+        }
+    }
+
+    fn update_column_min_max(&mut self) {
+        if self.min_column_value.is_none()
+            || self.min_column_value.as_ref().unwrap()
+                > self.min_page_value.as_ref().unwrap()
+        {
+            self.min_column_value = self.min_page_value.clone();
+        }
+        if self.max_column_value.is_none()
+            || self.max_column_value.as_ref().unwrap()
+                < self.max_page_value.as_ref().unwrap()
+        {
+            self.max_column_value = self.max_page_value.clone();
+        }
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -846,8 +1056,6 @@ impl EncodingWriteSupport for ColumnWriterImpl<FixedLenByteArrayType> {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-
     use rand::distributions::uniform::SampleUniform;
 
     use crate::column::{
@@ -864,6 +1072,8 @@ mod tests {
         test_common::{get_temp_file, random_numbers_range},
     };
 
+    use super::*;
+
     #[test]
     fn test_column_writer_inconsistent_def_rep_length() {
         let page_writer = get_test_page_writer();
@@ -1255,6 +1465,63 @@ mod tests {
         assert_eq!(metadata.uncompressed_size(), 20);
         assert_eq!(metadata.data_page_offset(), 0);
         assert_eq!(metadata.dictionary_page_offset(), Some(0));
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                assert_eq!(stats.min(), &1);
+                assert_eq!(stats.max(), &4);
+            } else {
+                assert!(false, "expecting Statistics::Int32");
+            }
+        } else {
+            assert!(false, "metadata missing statistics");
+        }
+    }
+
+    #[test]
+    fn test_column_writer_precalculated_statistics() {
+        let page_writer = get_test_page_writer();
+        let props = Rc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+        writer
+            .write_batch_with_statistics(
+                &[1, 2, 3, 4],
+                None,
+                None,
+                &Some(-17),
+                &Some(9000),
+                Some(21),
+                Some(55),
+            )
+            .unwrap();
+
+        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        assert_eq!(bytes_written, 20);
+        assert_eq!(rows_written, 4);
+        assert_eq!(
+            metadata.encodings(),
+            &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+        );
+        assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
+        assert_eq!(metadata.compressed_size(), 20);
+        assert_eq!(metadata.uncompressed_size(), 20);
+        assert_eq!(metadata.data_page_offset(), 0);
+        assert_eq!(metadata.dictionary_page_offset(), Some(0));
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 21);
+            assert_eq!(stats.distinct_count().unwrap_or(0), 55);
+            if let Statistics::Int32(stats) = stats {
+                assert_eq!(stats.min(), &-17);
+                assert_eq!(stats.max(), &9000);
+            } else {
+                assert!(false, "expecting Statistics::Int32");
+            }
+        } else {
+            assert!(false, "metadata missing statistics");
+        }
     }
 
     #[test]
diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs
index 1660436..738057a 100644
--- a/rust/parquet/src/data_type.rs
+++ b/rust/parquet/src/data_type.rs
@@ -17,8 +17,9 @@
 
 //! Data types that connect Parquet physical types with their Rust-specific
 //! representations.
-
+use std::cmp::Ordering;
 use std::mem;
+use std::str::from_utf8;
 
 use byteorder::{BigEndian, ByteOrder};
 
@@ -30,11 +31,10 @@ use crate::util::{
     bit_util::{from_ne_slice, FromBytes},
     memory::{ByteBuffer, ByteBufferPtr},
 };
-use std::str::from_utf8;
 
 /// Rust representation for logical type INT96, value is backed by an array of `u32`.
 /// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialOrd)]
 pub struct Int96 {
     value: Option<[u32; 3]>,
 }
@@ -103,6 +103,29 @@ pub struct ByteArray {
     data: Option<ByteBufferPtr>,
 }
 
+impl PartialOrd for ByteArray {
+    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
+        if self.data.is_some() && other.data.is_some() {
+            if self.len() > other.len() {
+                Some(Ordering::Greater)
+            } else if self.len() < other.len() {
+                Some(Ordering::Less)
+            } else {
+                for (v1, v2) in self.data().iter().zip(other.data().iter()) {
+                    if *v1 > *v2 {
+                        return Some(Ordering::Greater);
+                    } else if *v1 < *v2 {
+                        return Some(Ordering::Less);
+                    }
+                }
+                return Some(Ordering::Equal);
+            }
+        } else {
+            None
+        }
+    }
+}
+
 impl ByteArray {
     /// Creates new byte array with no data set.
     pub fn new() -> Self {
@@ -405,7 +428,8 @@ pub trait DataType: 'static {
         + std::default::Default
         + std::clone::Clone
         + AsBytes
-        + FromBytes;
+        + FromBytes
+        + PartialOrd;
 
     /// Returns Parquet physical type.
     fn get_physical_type() -> Type;
@@ -450,6 +474,7 @@ where
 
 macro_rules! make_type {
     ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
+        #[derive(Clone)]
         pub struct $name {}
 
         impl DataType for $name {
@@ -598,8 +623,8 @@ impl FromBytes for ByteArray {
     fn from_be_bytes(_bs: Self::Buffer) -> Self {
         unreachable!()
     }
-    fn from_ne_bytes(_bs: Self::Buffer) -> Self {
-        unreachable!()
+    fn from_ne_bytes(bs: Self::Buffer) -> Self {
+        ByteArray::from(bs.to_vec())
     }
 }
 
@@ -690,4 +715,20 @@ mod tests {
 
         assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2));
     }
+
+    #[test]
+    fn test_byte_array_ord() {
+        let ba1 = ByteArray::from(vec![1, 2, 3]);
+        let ba11 = ByteArray::from(vec![1, 2, 3]);
+        let ba2 = ByteArray::from(vec![3, 4]);
+        let ba3 = ByteArray::from(vec![1, 2, 4]);
+        let ba4 = ByteArray::from(vec![]);
+        let ba5 = ByteArray::from(vec![2, 2, 3]);
+
+        assert!(ba1 > ba2);
+        assert!(ba3 > ba1);
+        assert!(ba1 > ba4);
+        assert_eq!(ba1, ba11);
+        assert!(ba5 > ba1);
+    }
 }
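
A minimal sketch of how the two write paths in this patch might be driven. It assumes `writer` is a `ColumnWriterImpl<Int32Type>` obtained from a row group writer (setup elided, as in the tests above); the statistics values are illustrative only:

    // Stats computed up front by the caller, e.g. while assembling the batch.
    let values: Vec<i32> = vec![1, 2, 3, 4];
    let min = values.iter().min().cloned(); // Some(1)
    let max = values.iter().max().cloned(); // Some(4)

    // Supplying min/max (plus optional null/distinct counts) makes the writer
    // skip its per-value page-level min/max tracking for this batch.
    writer
        .write_batch_with_statistics(&values, None, None, &min, &max, Some(0), None)
        .unwrap();

    // The plain path still derives page statistics value by value.
    writer.write_batch(&values, None, None).unwrap();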
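
One behavioral note, sketched below: the `PartialOrd` added for `ByteArray` compares lengths before byte content (exactly what `test_byte_array_ord` exercises), so it is not plain lexicographic order; a shorter array always sorts below a longer one:

    use parquet::data_type::ByteArray;

    let short = ByteArray::from(vec![9, 9]);
    let long = ByteArray::from(vec![1, 2, 3]);
    assert!(long > short); // length dominates: [1, 2, 3] > [9, 9]
    // Equal lengths fall back to byte-wise comparison.
    assert!(ByteArray::from(vec![1, 2, 4]) > ByteArray::from(vec![1, 2, 3]));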