This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 12b30dd  ARROW-9280: [Rust] [Parquet] Calculate page and column statistics
12b30dd is described below
commit 12b30dda1a23bad70e5b11b8cef845d0effd01d4
Author: Ze'ev Maor <[email protected]>
AuthorDate: Thu Jul 2 17:14:16 2020 -0700
ARROW-9280: [Rust] [Parquet] Calculate page and column statistics
Allow the writer to provide pre-calculated statistics; when supplied, page-level statistics are not re-computed.
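
The new entry point mirrors `write_batch` but additionally takes
caller-supplied min/max/null/distinct values. A minimal usage sketch
(writer setup elided; the values mirror the test added in this commit):

    writer
        .write_batch_with_statistics(
            &[1, 2, 3, 4], // values
            None,          // def_levels (non-nullable column)
            None,          // rep_levels (non-repeated column)
            &Some(-17),    // pre-calculated min
            &Some(9000),   // pre-calculated max
            Some(21),      // pre-calculated null count
            Some(55),      // pre-calculated distinct count
        )
        .unwrap();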
Closes #7622 from zeevm/calculate_parquet_statistics
Authored-by: Ze'ev Maor <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
rust/parquet/src/column/writer.rs | 317 +++++++++++++++++++++++++++++++++++---
rust/parquet/src/data_type.rs | 53 ++++++-
2 files changed, 339 insertions(+), 31 deletions(-)
diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs
index c54c478..f26c37b 100644
--- a/rust/parquet/src/column/writer.rs
+++ b/rust/parquet/src/column/writer.rs
@@ -16,23 +16,25 @@
// under the License.
//! Contains column writer API.
-
use std::{cmp, collections::VecDeque, rc::Rc};
use crate::basic::{Compression, Encoding, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
+use crate::data_type::AsBytes;
use crate::data_type::*;
use crate::encodings::{
encoding::{get_encoder, DictEncoder, Encoder},
levels::{max_buffer_size, LevelEncoder},
};
use crate::errors::{ParquetError, Result};
+use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
};
use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
use crate::util::memory::{ByteBufferPtr, MemTracker};
/// Column writer for a Parquet type.
@@ -47,6 +49,33 @@ pub enum ColumnWriter {
FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>),
}
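+/// Granularity at which statistics are collected: per data page or per column chunk.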
+pub enum Level {
+ Page,
+ Column,
+}
+
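+// Builds the typed `Statistics` variant for a physical type, decoding the
+// generic min/max values from their byte representation.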
+macro_rules! gen_stats_section {
+    ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{
+        let min = $min.as_ref().map(|v| {
+            read_num_bytes!($physical_ty, v.as_bytes().len(), &v.as_bytes())
+        });
+        let max = $max.as_ref().map(|v| {
+            read_num_bytes!($physical_ty, v.as_bytes().len(), &v.as_bytes())
+        });
+ Statistics::$stat_fn(min, max, $distinct, $nulls, false)
+ }};
+}
+
/// Gets a specific column writer corresponding to column descriptor `descr`.
pub fn get_column_writer(
descr: ColumnDescPtr,
@@ -149,6 +178,10 @@ pub struct ColumnWriterImpl<T: DataType> {
num_buffered_values: u32,
num_buffered_encoded_values: u32,
num_buffered_rows: u32,
+ min_page_value: Option<T::T>,
+ max_page_value: Option<T::T>,
+ num_page_nulls: u64,
+ page_distinct_count: Option<u64>,
// Metrics per column writer
total_bytes_written: u64,
total_rows_written: u64,
@@ -157,6 +190,10 @@ pub struct ColumnWriterImpl<T: DataType> {
total_num_values: u64,
dictionary_page_offset: Option<u64>,
data_page_offset: Option<u64>,
+ min_column_value: Option<T::T>,
+ max_column_value: Option<T::T>,
+ num_column_nulls: u64,
+ column_distinct_count: Option<u64>,
// Reused buffers
def_levels_sink: Vec<i16>,
rep_levels_sink: Vec<i16>,
@@ -216,26 +253,26 @@ impl<T: DataType> ColumnWriterImpl<T> {
def_levels_sink: vec![],
rep_levels_sink: vec![],
data_pages: VecDeque::new(),
+ min_page_value: None,
+ max_page_value: None,
+ num_page_nulls: 0,
+ page_distinct_count: None,
+ min_column_value: None,
+ max_column_value: None,
+ num_column_nulls: 0,
+ column_distinct_count: None,
}
}
- /// Writes batch of values, definition levels and repetition levels.
- /// Returns number of values processed (written).
- ///
-    /// If definition and repetition levels are provided, we write those levels fully and
-    /// select how many values to write (this number will be returned), since the number
-    /// of actually written values may be smaller than the number of provided values.
-    ///
-    /// If only values are provided, then all values are written and the length of
-    /// the values buffer is returned.
- ///
- /// Definition and/or repetition levels can be omitted, if values are
- /// non-nullable and/or non-repeated.
- pub fn write_batch(
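+    /// Shared implementation behind `write_batch` and `write_batch_with_statistics`:
+    /// when the caller supplies pre-calculated statistics, page-level statistics
+    /// are not computed for this batch.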
+ fn write_batch_internal(
&mut self,
values: &[T::T],
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
+ min: &Option<T::T>,
+ max: &Option<T::T>,
+ null_count: Option<u64>,
+ distinct_count: Option<u64>,
) -> Result<usize> {
        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
@@ -263,11 +300,45 @@ impl<T: DataType> ColumnWriterImpl<T> {
let mut values_offset = 0;
let mut levels_offset = 0;
+ // Process pre-calculated statistics
+ match (min, max) {
+ (Some(min), Some(max)) => {
+ if self.min_column_value.is_none()
+ || self.min_column_value.as_ref().unwrap() > min
+ {
+ self.min_column_value = Some(min.clone());
+ }
+ if self.max_column_value.is_none()
+ || self.max_column_value.as_ref().unwrap() < max
+ {
+ self.max_column_value = Some(max.clone());
+ }
+ }
+ (None, Some(_)) | (Some(_), None) => {
+                panic!("min/max should both be set or both be None")
+ }
+ (None, None) => {}
+ }
+
+ if let Some(distinct) = distinct_count {
+ self.column_distinct_count =
+ Some(self.column_distinct_count.unwrap_or(0) + distinct);
+ }
+
+ if let Some(nulls) = null_count {
+ self.num_column_nulls += nulls;
+ }
+
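+        // Compute page-level statistics only when the caller supplied none.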
+ let calculate_page_stats = (min.is_none() || max.is_none())
+ && null_count.is_none()
+ && distinct_count.is_none();
+
for _ in 0..num_batches {
values_offset += self.write_mini_batch(
&values[values_offset..values_offset + write_batch_size],
                def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
                rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
+ calculate_page_stats,
)?;
levels_offset += write_batch_size;
}
@@ -276,12 +347,60 @@ impl<T: DataType> ColumnWriterImpl<T> {
&values[values_offset..],
def_levels.map(|lv| &lv[levels_offset..]),
rep_levels.map(|lv| &lv[levels_offset..]),
+ calculate_page_stats,
)?;
// Return total number of values processed.
Ok(values_offset)
}
+ /// Writes batch of values, definition levels and repetition levels.
+ /// Returns number of values processed (written).
+ ///
+    /// If definition and repetition levels are provided, we write those levels fully and
+    /// select how many values to write (this number will be returned), since the number
+    /// of actually written values may be smaller than the number of provided values.
+    ///
+    /// If only values are provided, then all values are written and the length of
+    /// the values buffer is returned.
+ ///
+ /// Definition and/or repetition levels can be omitted, if values are
+ /// non-nullable and/or non-repeated.
+ pub fn write_batch(
+ &mut self,
+ values: &[T::T],
+ def_levels: Option<&[i16]>,
+ rep_levels: Option<&[i16]>,
+ ) -> Result<usize> {
+ self.write_batch_internal(
+ values, def_levels, rep_levels, &None, &None, None, None,
+ )
+ }
+
+    /// The writer may optionally provide pre-calculated statistics for this batch; in that
+    /// case page-level statistics are not calculated, since doing so would defeat the
+    /// purpose of speeding up the write with pre-calculated statistics.
+ pub fn write_batch_with_statistics(
+ &mut self,
+ values: &[T::T],
+ def_levels: Option<&[i16]>,
+ rep_levels: Option<&[i16]>,
+ min: &Option<T::T>,
+ max: &Option<T::T>,
+ nulls_count: Option<u64>,
+ distinct_count: Option<u64>,
+ ) -> Result<usize> {
+ self.write_batch_internal(
+ values,
+ def_levels,
+ rep_levels,
+ min,
+ max,
+ nulls_count,
+ distinct_count,
+ )
+ }
+
/// Returns total number of bytes written by this column writer so far.
/// This value is also returned when column writer is closed.
pub fn get_total_bytes_written(&self) -> u64 {
@@ -316,6 +435,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
values: &[T::T],
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
+ calculate_page_stats: bool,
) -> Result<usize> {
let num_values;
let mut values_to_write = 0;
@@ -346,7 +466,13 @@ impl<T: DataType> ColumnWriterImpl<T> {
let levels = def_levels.unwrap();
num_values = levels.len();
for &level in levels {
-                values_to_write += (level == self.descr.max_def_level()) as usize;
+                if level == self.descr.max_def_level() {
+                    values_to_write += 1;
+                } else if calculate_page_stats {
+                    self.num_page_nulls += 1;
+                }
}
self.write_definition_levels(levels);
@@ -387,7 +513,11 @@ impl<T: DataType> ColumnWriterImpl<T> {
));
}
- // TODO: update page statistics
+ if calculate_page_stats {
+ for val in &values[0..values_to_write] {
+ self.update_page_min_max(val);
+ }
+ }
self.write_values(&values[0..values_to_write])?;
@@ -395,7 +525,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
self.num_buffered_encoded_values += values_to_write as u32;
if self.should_add_data_page() {
- self.add_data_page()?;
+ self.add_data_page(calculate_page_stats)?;
}
if self.should_dict_fallback() {
@@ -463,7 +593,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
/// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
- fn add_data_page(&mut self) -> Result<()> {
+    fn add_data_page(&mut self, calculate_page_stats: bool) -> Result<()> {
// Extract encoded values
let value_bytes = match self.dict_encoder {
Some(ref mut encoder) => encoder.write_indices()?,
@@ -480,6 +610,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
let max_def_level = self.descr.max_def_level();
let max_rep_level = self.descr.max_rep_level();
+ let mut page_statistics: Option<Statistics> = None;
+
+        if calculate_page_stats {
+ self.update_column_min_max();
+ self.num_column_nulls += self.num_page_nulls;
+ page_statistics = Some(self.make_page_statistics());
+ }
+
let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
let mut buffer = vec![];
@@ -519,8 +657,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
encoding,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
- // TODO: process statistics
- statistics: None,
+ statistics: page_statistics,
};
CompressedPage::new(data_page, uncompressed_size)
@@ -570,8 +707,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
is_compressed: self.compressor.is_some(),
- // TODO: process statistics
- statistics: None,
+ statistics: page_statistics,
};
CompressedPage::new(data_page, uncompressed_size)
@@ -594,6 +730,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
self.num_buffered_values = 0;
self.num_buffered_encoded_values = 0;
self.num_buffered_rows = 0;
+ self.min_page_value = None;
+ self.max_page_value = None;
+ self.num_page_nulls = 0;
+ self.page_distinct_count = None;
Ok(())
}
@@ -603,8 +743,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
#[inline]
fn flush_data_pages(&mut self) -> Result<()> {
// Write all outstanding data to a new page.
+ let calculate_page_stats =
+ self.min_page_value.is_some() && self.max_page_value.is_some();
if self.num_buffered_values > 0 {
- self.add_data_page()?;
+ self.add_data_page(calculate_page_stats)?;
}
while let Some(page) = self.data_pages.pop_front() {
@@ -643,6 +785,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
// We use only RLE level encoding for data page v1 and data page v2.
encodings.push(Encoding::RLE);
+ let statistics = self.make_column_statistics();
let metadata = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(encodings)
@@ -652,6 +795,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
.set_num_values(num_values)
.set_data_page_offset(data_page_offset)
.set_dictionary_page_offset(dict_page_offset)
+ .set_statistics(statistics)
.build()?;
self.page_writer.write_metadata(&metadata)?;
@@ -755,6 +899,72 @@ impl<T: DataType> ColumnWriterImpl<T> {
fn get_page_writer_ref(&self) -> &Box<PageWriter> {
&self.page_writer
}
+
+ fn make_column_statistics(&self) -> Statistics {
+ self.make_typed_statistics(Level::Column)
+ }
+
+ fn make_page_statistics(&self) -> Statistics {
+ self.make_typed_statistics(Level::Page)
+ }
+
+ pub fn make_typed_statistics(&self, level: Level) -> Statistics {
+ let (min, max, distinct, nulls) = match level {
+ Level::Page => (
+ self.min_page_value.as_ref(),
+ self.max_page_value.as_ref(),
+ self.page_distinct_count,
+ self.num_page_nulls,
+ ),
+ Level::Column => (
+ self.min_column_value.as_ref(),
+ self.max_column_value.as_ref(),
+ self.column_distinct_count,
+ self.num_column_nulls,
+ ),
+ };
+ match self.descr.physical_type() {
+            Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, nulls),
+            Type::BOOLEAN => gen_stats_section!(i32, int32, min, max, distinct, nulls),
+            Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, nulls),
+            Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls),
+            Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls),
+            Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls),
+ Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
+                let min = min.map(|v| ByteArray::from(v.as_bytes().to_vec()));
+                let max = max.map(|v| ByteArray::from(v.as_bytes().to_vec()));
+ Statistics::byte_array(min, max, distinct, nulls, false)
+ }
+ }
+ }
+
+ fn update_page_min_max(&mut self, val: &T::T) {
+        if self.min_page_value.is_none() || self.min_page_value.as_ref().unwrap() > val {
+            self.min_page_value = Some(val.clone());
+        }
+        if self.max_page_value.is_none() || self.max_page_value.as_ref().unwrap() < val {
+            self.max_page_value = Some(val.clone());
+        }
+ }
+
+ fn update_column_min_max(&mut self) {
+ if self.min_column_value.is_none()
+ || self.min_column_value.as_ref().unwrap()
+ > self.min_page_value.as_ref().unwrap()
+ {
+ self.min_column_value = self.min_page_value.clone();
+ }
+ if self.max_column_value.is_none()
+ || self.max_column_value.as_ref().unwrap()
+ < self.max_page_value.as_ref().unwrap()
+ {
+ self.max_column_value = self.max_page_value.clone();
+ }
+ }
}
// ----------------------------------------------------------------------
@@ -846,8 +1056,6 @@ impl EncodingWriteSupport for ColumnWriterImpl<FixedLenByteArrayType> {
#[cfg(test)]
mod tests {
- use super::*;
-
use rand::distributions::uniform::SampleUniform;
use crate::column::{
@@ -864,6 +1072,8 @@ mod tests {
test_common::{get_temp_file, random_numbers_range},
};
+ use super::*;
+
#[test]
fn test_column_writer_inconsistent_def_rep_length() {
let page_writer = get_test_page_writer();
@@ -1255,6 +1465,63 @@ mod tests {
assert_eq!(metadata.uncompressed_size(), 20);
assert_eq!(metadata.data_page_offset(), 0);
assert_eq!(metadata.dictionary_page_offset(), Some(0));
+ if let Some(stats) = metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 0);
+ assert_eq!(stats.distinct_count(), None);
+ if let Statistics::Int32(stats) = stats {
+ assert_eq!(stats.min(), &1);
+ assert_eq!(stats.max(), &4);
+ } else {
+ assert!(false, "expecting Statistics::Int32");
+ }
+ } else {
+ assert!(false, "metadata missing statistics");
+ }
+ }
+
+ #[test]
+ fn test_column_writer_precalculated_statistics() {
+ let page_writer = get_test_page_writer();
+ let props = Rc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+ writer
+ .write_batch_with_statistics(
+ &[1, 2, 3, 4],
+ None,
+ None,
+ &Some(-17),
+ &Some(9000),
+ Some(21),
+ Some(55),
+ )
+ .unwrap();
+
+ let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+ assert_eq!(bytes_written, 20);
+ assert_eq!(rows_written, 4);
+ assert_eq!(
+ metadata.encodings(),
+ &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+ );
+ assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
+ assert_eq!(metadata.compressed_size(), 20);
+ assert_eq!(metadata.uncompressed_size(), 20);
+ assert_eq!(metadata.data_page_offset(), 0);
+ assert_eq!(metadata.dictionary_page_offset(), Some(0));
+ if let Some(stats) = metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 21);
+ assert_eq!(stats.distinct_count().unwrap_or(0), 55);
+ if let Statistics::Int32(stats) = stats {
+ assert_eq!(stats.min(), &-17);
+ assert_eq!(stats.max(), &9000);
+ } else {
+ assert!(false, "expecting Statistics::Int32");
+ }
+ } else {
+ assert!(false, "metadata missing statistics");
+ }
}
#[test]
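
(A standalone sketch, with illustrative names rather than the crate's actual
fields, of the page-to-column roll-up that update_column_min_max and
add_data_page perform above when page statistics are being calculated:)

    fn roll_up<T: PartialOrd + Clone>(
        page_min: &Option<T>,
        page_max: &Option<T>,
        page_nulls: u64,
        col_min: &mut Option<T>,
        col_max: &mut Option<T>,
        col_nulls: &mut u64,
    ) {
        // Widen the running column min/max with this page's bounds.
        if let Some(m) = page_min {
            if col_min.as_ref().map_or(true, |c| c > m) {
                *col_min = Some(m.clone());
            }
        }
        if let Some(m) = page_max {
            if col_max.as_ref().map_or(true, |c| c < m) {
                *col_max = Some(m.clone());
            }
        }
        // Null counts simply accumulate.
        *col_nulls += page_nulls;
    }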
diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs
index 1660436..738057a 100644
--- a/rust/parquet/src/data_type.rs
+++ b/rust/parquet/src/data_type.rs
@@ -17,8 +17,9 @@
//! Data types that connect Parquet physical types with their Rust-specific
//! representations.
-
+use std::cmp::Ordering;
use std::mem;
+use std::str::from_utf8;
use byteorder::{BigEndian, ByteOrder};
@@ -30,11 +31,10 @@ use crate::util::{
bit_util::{from_ne_slice, FromBytes},
memory::{ByteBuffer, ByteBufferPtr},
};
-use std::str::from_utf8;
/// Rust representation for logical type INT96, value is backed by an array of `u32`.
/// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialOrd)]
pub struct Int96 {
value: Option<[u32; 3]>,
}
@@ -103,6 +103,29 @@ pub struct ByteArray {
data: Option<ByteBufferPtr>,
}
+impl PartialOrd for ByteArray {
+ fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
+ if self.data.is_some() && other.data.is_some() {
+ if self.len() > other.len() {
+ Some(Ordering::Greater)
+ } else if self.len() < other.len() {
+ Some(Ordering::Less)
+ } else {
+ for (v1, v2) in self.data().iter().zip(other.data().iter()) {
+ if *v1 > *v2 {
+ return Some(Ordering::Greater);
+ } else if *v1 < *v2 {
+ return Some(Ordering::Less);
+ }
+ }
+ return Some(Ordering::Equal);
+ }
+ } else {
+ None
+ }
+ }
+}
+
impl ByteArray {
/// Creates new byte array with no data set.
pub fn new() -> Self {
@@ -405,7 +428,8 @@ pub trait DataType: 'static {
+ std::default::Default
+ std::clone::Clone
+ AsBytes
- + FromBytes;
+ + FromBytes
+ + PartialOrd;
/// Returns Parquet physical type.
fn get_physical_type() -> Type;
@@ -450,6 +474,7 @@ where
macro_rules! make_type {
    ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
+ #[derive(Clone)]
pub struct $name {}
impl DataType for $name {
@@ -598,8 +623,8 @@ impl FromBytes for ByteArray {
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
}
- fn from_ne_bytes(_bs: Self::Buffer) -> Self {
- unreachable!()
+ fn from_ne_bytes(bs: Self::Buffer) -> Self {
+ ByteArray::from(bs.to_vec())
}
}
@@ -690,4 +715,20 @@ mod tests {
assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2));
}
+
+ #[test]
+ fn test_byte_array_ord() {
+ let ba1 = ByteArray::from(vec![1, 2, 3]);
+ let ba11 = ByteArray::from(vec![1, 2, 3]);
+ let ba2 = ByteArray::from(vec![3, 4]);
+ let ba3 = ByteArray::from(vec![1, 2, 4]);
+ let ba4 = ByteArray::from(vec![]);
+ let ba5 = ByteArray::from(vec![2, 2, 3]);
+
+ assert!(ba1 > ba2);
+ assert!(ba3 > ba1);
+ assert!(ba1 > ba4);
+ assert_eq!(ba1, ba11);
+ assert!(ba5 > ba1);
+ }
}
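
A note on the ordering these tests pin down: the new PartialOrd for
ByteArray compares lengths first and falls back to bytewise comparison only
on equal lengths (shortlex). That is why `ba1 > ba2` holds above even though
[1, 2, 3] precedes [3, 4] lexicographically. A self-contained illustration
(hypothetical helper, not crate code):

    use std::cmp::Ordering;

    // Shortlex: length decides first, then bytewise comparison on ties,
    // matching the PartialOrd impl added above when both sides have data.
    fn shortlex(a: &[u8], b: &[u8]) -> Ordering {
        a.len().cmp(&b.len()).then_with(|| a.cmp(b))
    }

    fn main() {
        assert_eq!(shortlex(&[1, 2, 3], &[3, 4]), Ordering::Greater);
        assert_eq!(shortlex(&[1, 2, 3], &[1, 2, 4]), Ordering::Less);
        assert_eq!(shortlex(&[1, 2, 3], &[1, 2, 3]), Ordering::Equal);
    }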