This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 12b30dd  ARROW-9280: [Rust] [Parquet] Calculate page and column 
statistics
12b30dd is described below

commit 12b30dda1a23bad70e5b11b8cef845d0effd01d4
Author: Ze'ev Maor <ze...@microsoft.com>
AuthorDate: Thu Jul 2 17:14:16 2020 -0700

    ARROW-9280: [Rust] [Parquet] Calculate page and column statistics
    
    Allow writer to provide pre-calculated stats
    
    Closes #7622 from zeevm/calculate_parquet_statistics
    
    Authored-by: Ze'ev Maor <ze...@microsoft.com>
    Signed-off-by: Chao Sun <sunc...@apache.org>
---
 rust/parquet/src/column/writer.rs | 317 +++++++++++++++++++++++++++++++++++---
 rust/parquet/src/data_type.rs     |  53 ++++++-
 2 files changed, 339 insertions(+), 31 deletions(-)

diff --git a/rust/parquet/src/column/writer.rs 
b/rust/parquet/src/column/writer.rs
index c54c478..f26c37b 100644
--- a/rust/parquet/src/column/writer.rs
+++ b/rust/parquet/src/column/writer.rs
@@ -16,23 +16,25 @@
 // under the License.
 
 //! Contains column writer API.
-
 use std::{cmp, collections::VecDeque, rc::Rc};
 
 use crate::basic::{Compression, Encoding, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::compression::{create_codec, Codec};
+use crate::data_type::AsBytes;
 use crate::data_type::*;
 use crate::encodings::{
     encoding::{get_encoder, DictEncoder, Encoder},
     levels::{max_buffer_size, LevelEncoder},
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::statistics::Statistics;
 use crate::file::{
     metadata::ColumnChunkMetaData,
     properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
 };
 use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
 use crate::util::memory::{ByteBufferPtr, MemTracker};
 
 /// Column writer for a Parquet type.
@@ -47,6 +49,33 @@ pub enum ColumnWriter {
     FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>),
 }
 
+pub enum Level {
+    Page,
+    Column,
+}
+
+macro_rules! gen_stats_section {
+    ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: 
ident, $nulls: ident) => {{
+        let min = $min.as_ref().and_then(|v| {
+            println!("min: {:?} {}", &v.as_bytes(), v.as_bytes().len());
+            Some(read_num_bytes!(
+                $physical_ty,
+                v.as_bytes().len(),
+                &v.as_bytes()
+            ))
+        });
+        let max = $max.as_ref().and_then(|v| {
+            println!("max: {:?} {}", &v.as_bytes(), v.as_bytes().len());
+            Some(read_num_bytes!(
+                $physical_ty,
+                v.as_bytes().len(),
+                &v.as_bytes()
+            ))
+        });
+        Statistics::$stat_fn(min, max, $distinct, $nulls, false)
+    }};
+}
+
 /// Gets a specific column writer corresponding to column descriptor `descr`.
 pub fn get_column_writer(
     descr: ColumnDescPtr,
@@ -149,6 +178,10 @@ pub struct ColumnWriterImpl<T: DataType> {
     num_buffered_values: u32,
     num_buffered_encoded_values: u32,
     num_buffered_rows: u32,
+    min_page_value: Option<T::T>,
+    max_page_value: Option<T::T>,
+    num_page_nulls: u64,
+    page_distinct_count: Option<u64>,
     // Metrics per column writer
     total_bytes_written: u64,
     total_rows_written: u64,
@@ -157,6 +190,10 @@ pub struct ColumnWriterImpl<T: DataType> {
     total_num_values: u64,
     dictionary_page_offset: Option<u64>,
     data_page_offset: Option<u64>,
+    min_column_value: Option<T::T>,
+    max_column_value: Option<T::T>,
+    num_column_nulls: u64,
+    column_distinct_count: Option<u64>,
     // Reused buffers
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
@@ -216,26 +253,26 @@ impl<T: DataType> ColumnWriterImpl<T> {
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
+            min_page_value: None,
+            max_page_value: None,
+            num_page_nulls: 0,
+            page_distinct_count: None,
+            min_column_value: None,
+            max_column_value: None,
+            num_column_nulls: 0,
+            column_distinct_count: None,
         }
     }
 
-    /// Writes batch of values, definition levels and repetition levels.
-    /// Returns number of values processed (written).
-    ///
-    /// If definition and repetition levels are provided, we write fully those 
levels and
-    /// select how many values to write (this number will be returned), since 
number of
-    /// actual written values may be smaller than provided values.
-    ///
-    /// If only values are provided, then all values are written and the 
length of
-    /// of the values buffer is returned.
-    ///
-    /// Definition and/or repetition levels can be omitted, if values are
-    /// non-nullable and/or non-repeated.
-    pub fn write_batch(
+    fn write_batch_internal(
         &mut self,
         values: &[T::T],
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
+        min: &Option<T::T>,
+        max: &Option<T::T>,
+        null_count: Option<u64>,
+        distinct_count: Option<u64>,
     ) -> Result<usize> {
         // We check for DataPage limits only after we have inserted the 
values. If a user
         // writes a large number of values, the DataPage size can be well 
above the limit.
@@ -263,11 +300,45 @@ impl<T: DataType> ColumnWriterImpl<T> {
         let mut values_offset = 0;
         let mut levels_offset = 0;
 
+        // Process pre-calculated statistics
+        match (min, max) {
+            (Some(min), Some(max)) => {
+                if self.min_column_value.is_none()
+                    || self.min_column_value.as_ref().unwrap() > min
+                {
+                    self.min_column_value = Some(min.clone());
+                }
+                if self.max_column_value.is_none()
+                    || self.max_column_value.as_ref().unwrap() < max
+                {
+                    self.max_column_value = Some(max.clone());
+                }
+            }
+            (None, Some(_)) | (Some(_), None) => {
+                panic!("min/max should be both set or both None")
+            }
+            (None, None) => {}
+        }
+
+        if let Some(distinct) = distinct_count {
+            self.column_distinct_count =
+                Some(self.column_distinct_count.unwrap_or(0) + distinct);
+        }
+
+        if let Some(nulls) = null_count {
+            self.num_column_nulls += nulls;
+        }
+
+        let calculate_page_stats = (min.is_none() || max.is_none())
+            && null_count.is_none()
+            && distinct_count.is_none();
+
         for _ in 0..num_batches {
             values_offset += self.write_mini_batch(
                 &values[values_offset..values_offset + write_batch_size],
                 def_levels.map(|lv| &lv[levels_offset..levels_offset + 
write_batch_size]),
                 rep_levels.map(|lv| &lv[levels_offset..levels_offset + 
write_batch_size]),
+                calculate_page_stats,
             )?;
             levels_offset += write_batch_size;
         }
@@ -276,12 +347,60 @@ impl<T: DataType> ColumnWriterImpl<T> {
             &values[values_offset..],
             def_levels.map(|lv| &lv[levels_offset..]),
             rep_levels.map(|lv| &lv[levels_offset..]),
+            calculate_page_stats,
         )?;
 
         // Return total number of values processed.
         Ok(values_offset)
     }
 
+    /// Writes batch of values, definition levels and repetition levels.
+    /// Returns number of values processed (written).
+    ///
+    /// If definition and repetition levels are provided, we write fully those 
levels and
+    /// select how many values to write (this number will be returned), since 
number of
+    /// actual written values may be smaller than provided values.
+    ///
+    /// If only values are provided, then all values are written and the 
length of
+    /// of the values buffer is returned.
+    ///
+    /// Definition and/or repetition levels can be omitted, if values are
+    /// non-nullable and/or non-repeated.
+    pub fn write_batch(
+        &mut self,
+        values: &[T::T],
+        def_levels: Option<&[i16]>,
+        rep_levels: Option<&[i16]>,
+    ) -> Result<usize> {
+        self.write_batch_internal(
+            values, def_levels, rep_levels, &None, &None, None, None,
+        )
+    }
+
+    /// Writer may optionally provide pre-calculated statistics for this 
batch, in which case we do
+    /// not calculate page level statistics as this will defeat the purpose of 
speeding up the write
+    /// process with pre-calculated statistics.
+    pub fn write_batch_with_statistics(
+        &mut self,
+        values: &[T::T],
+        def_levels: Option<&[i16]>,
+        rep_levels: Option<&[i16]>,
+        min: &Option<T::T>,
+        max: &Option<T::T>,
+        nulls_count: Option<u64>,
+        distinct_count: Option<u64>,
+    ) -> Result<usize> {
+        self.write_batch_internal(
+            values,
+            def_levels,
+            rep_levels,
+            min,
+            max,
+            nulls_count,
+            distinct_count,
+        )
+    }
+
     /// Returns total number of bytes written by this column writer so far.
     /// This value is also returned when column writer is closed.
     pub fn get_total_bytes_written(&self) -> u64 {
@@ -316,6 +435,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
         values: &[T::T],
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
+        calculate_page_stats: bool,
     ) -> Result<usize> {
         let num_values;
         let mut values_to_write = 0;
@@ -346,7 +466,13 @@ impl<T: DataType> ColumnWriterImpl<T> {
             let levels = def_levels.unwrap();
             num_values = levels.len();
             for &level in levels {
-                values_to_write += (level == self.descr.max_def_level()) as 
usize;
+                if level == self.descr.max_def_level() {
+                    values_to_write += 1;
+                } else {
+                    if calculate_page_stats {
+                        self.num_page_nulls += 1
+                    };
+                }
             }
 
             self.write_definition_levels(levels);
@@ -387,7 +513,11 @@ impl<T: DataType> ColumnWriterImpl<T> {
             ));
         }
 
-        // TODO: update page statistics
+        if calculate_page_stats {
+            for val in &values[0..values_to_write] {
+                self.update_page_min_max(val);
+            }
+        }
 
         self.write_values(&values[0..values_to_write])?;
 
@@ -395,7 +525,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
         self.num_buffered_encoded_values += values_to_write as u32;
 
         if self.should_add_data_page() {
-            self.add_data_page()?;
+            self.add_data_page(calculate_page_stats)?;
         }
 
         if self.should_dict_fallback() {
@@ -463,7 +593,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
 
     /// Adds data page.
     /// Data page is either buffered in case of dictionary encoding or written 
directly.
-    fn add_data_page(&mut self) -> Result<()> {
+    fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
         // Extract encoded values
         let value_bytes = match self.dict_encoder {
             Some(ref mut encoder) => encoder.write_indices()?,
@@ -480,6 +610,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
         let max_def_level = self.descr.max_def_level();
         let max_rep_level = self.descr.max_rep_level();
 
+        let mut page_statistics: Option<Statistics> = None;
+
+        if calculate_page_stat {
+            self.update_column_min_max();
+            self.num_column_nulls += self.num_page_nulls;
+            page_statistics = Some(self.make_page_statistics());
+        }
+
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
                 let mut buffer = vec![];
@@ -519,8 +657,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
                     encoding,
                     def_level_encoding: Encoding::RLE,
                     rep_level_encoding: Encoding::RLE,
-                    // TODO: process statistics
-                    statistics: None,
+                    statistics: page_statistics,
                 };
 
                 CompressedPage::new(data_page, uncompressed_size)
@@ -570,8 +707,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
                     is_compressed: self.compressor.is_some(),
-                    // TODO: process statistics
-                    statistics: None,
+                    statistics: page_statistics,
                 };
 
                 CompressedPage::new(data_page, uncompressed_size)
@@ -594,6 +730,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
         self.num_buffered_values = 0;
         self.num_buffered_encoded_values = 0;
         self.num_buffered_rows = 0;
+        self.min_page_value = None;
+        self.max_page_value = None;
+        self.num_page_nulls = 0;
+        self.page_distinct_count = None;
 
         Ok(())
     }
@@ -603,8 +743,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
     #[inline]
     fn flush_data_pages(&mut self) -> Result<()> {
         // Write all outstanding data to a new page.
+        let calculate_page_stats =
+            self.min_page_value.is_some() && self.max_page_value.is_some();
         if self.num_buffered_values > 0 {
-            self.add_data_page()?;
+            self.add_data_page(calculate_page_stats)?;
         }
 
         while let Some(page) = self.data_pages.pop_front() {
@@ -643,6 +785,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
         // We use only RLE level encoding for data page v1 and data page v2.
         encodings.push(Encoding::RLE);
 
+        let statistics = self.make_column_statistics();
         let metadata = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
             .set_encodings(encodings)
@@ -652,6 +795,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
             .set_num_values(num_values)
             .set_data_page_offset(data_page_offset)
             .set_dictionary_page_offset(dict_page_offset)
+            .set_statistics(statistics)
             .build()?;
 
         self.page_writer.write_metadata(&metadata)?;
@@ -755,6 +899,72 @@ impl<T: DataType> ColumnWriterImpl<T> {
     fn get_page_writer_ref(&self) -> &Box<PageWriter> {
         &self.page_writer
     }
+
+    fn make_column_statistics(&self) -> Statistics {
+        self.make_typed_statistics(Level::Column)
+    }
+
+    fn make_page_statistics(&self) -> Statistics {
+        self.make_typed_statistics(Level::Page)
+    }
+
+    pub fn make_typed_statistics(&self, level: Level) -> Statistics {
+        let (min, max, distinct, nulls) = match level {
+            Level::Page => (
+                self.min_page_value.as_ref(),
+                self.max_page_value.as_ref(),
+                self.page_distinct_count,
+                self.num_page_nulls,
+            ),
+            Level::Column => (
+                self.min_column_value.as_ref(),
+                self.max_column_value.as_ref(),
+                self.column_distinct_count,
+                self.num_column_nulls,
+            ),
+        };
+        match self.descr.physical_type() {
+            Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, 
nulls),
+            Type::BOOLEAN => gen_stats_section!(i32, int32, min, max, 
distinct, nulls),
+            Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, 
nulls),
+            Type::INT96 => gen_stats_section!(Int96, int96, min, max, 
distinct, nulls),
+            Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, 
nulls),
+            Type::DOUBLE => gen_stats_section!(f64, double, min, max, 
distinct, nulls),
+            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
+                let min = min
+                    .as_ref()
+                    .and_then(|v| 
Some(ByteArray::from(v.as_bytes().to_vec())));
+                let max = max
+                    .as_ref()
+                    .and_then(|v| 
Some(ByteArray::from(v.as_bytes().to_vec())));
+                Statistics::byte_array(min, max, distinct, nulls, false)
+            }
+        }
+    }
+
+    fn update_page_min_max(&mut self, val: &T::T) {
+        if self.min_page_value.is_none() || 
self.min_page_value.as_ref().unwrap() > val {
+            self.min_page_value = Some(val.clone());
+        }
+        if self.max_page_value.is_none() || 
self.max_page_value.as_ref().unwrap() < val {
+            self.max_page_value = Some(val.clone());
+        }
+    }
+
+    fn update_column_min_max(&mut self) {
+        if self.min_column_value.is_none()
+            || self.min_column_value.as_ref().unwrap()
+                > self.min_page_value.as_ref().unwrap()
+        {
+            self.min_column_value = self.min_page_value.clone();
+        }
+        if self.max_column_value.is_none()
+            || self.max_column_value.as_ref().unwrap()
+                < self.max_page_value.as_ref().unwrap()
+        {
+            self.max_column_value = self.max_page_value.clone();
+        }
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -846,8 +1056,6 @@ impl EncodingWriteSupport for 
ColumnWriterImpl<FixedLenByteArrayType> {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-
     use rand::distributions::uniform::SampleUniform;
 
     use crate::column::{
@@ -864,6 +1072,8 @@ mod tests {
         test_common::{get_temp_file, random_numbers_range},
     };
 
+    use super::*;
+
     #[test]
     fn test_column_writer_inconsistent_def_rep_length() {
         let page_writer = get_test_page_writer();
@@ -1255,6 +1465,63 @@ mod tests {
         assert_eq!(metadata.uncompressed_size(), 20);
         assert_eq!(metadata.data_page_offset(), 0);
         assert_eq!(metadata.dictionary_page_offset(), Some(0));
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                assert_eq!(stats.min(), &1);
+                assert_eq!(stats.max(), &4);
+            } else {
+                assert!(false, "expecting Statistics::Int32");
+            }
+        } else {
+            assert!(false, "metadata missing statistics");
+        }
+    }
+
+    #[test]
+    fn test_column_writer_precalculated_statistics() {
+        let page_writer = get_test_page_writer();
+        let props = Rc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
+        writer
+            .write_batch_with_statistics(
+                &[1, 2, 3, 4],
+                None,
+                None,
+                &Some(-17),
+                &Some(9000),
+                Some(21),
+                Some(55),
+            )
+            .unwrap();
+
+        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        assert_eq!(bytes_written, 20);
+        assert_eq!(rows_written, 4);
+        assert_eq!(
+            metadata.encodings(),
+            &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+        );
+        assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
+        assert_eq!(metadata.compressed_size(), 20);
+        assert_eq!(metadata.uncompressed_size(), 20);
+        assert_eq!(metadata.data_page_offset(), 0);
+        assert_eq!(metadata.dictionary_page_offset(), Some(0));
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 21);
+            assert_eq!(stats.distinct_count().unwrap_or(0), 55);
+            if let Statistics::Int32(stats) = stats {
+                assert_eq!(stats.min(), &-17);
+                assert_eq!(stats.max(), &9000);
+            } else {
+                assert!(false, "expecting Statistics::Int32");
+            }
+        } else {
+            assert!(false, "metadata missing statistics");
+        }
     }
 
     #[test]
diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs
index 1660436..738057a 100644
--- a/rust/parquet/src/data_type.rs
+++ b/rust/parquet/src/data_type.rs
@@ -17,8 +17,9 @@
 
 //! Data types that connect Parquet physical types with their Rust-specific
 //! representations.
-
+use std::cmp::Ordering;
 use std::mem;
+use std::str::from_utf8;
 
 use byteorder::{BigEndian, ByteOrder};
 
@@ -30,11 +31,10 @@ use crate::util::{
     bit_util::{from_ne_slice, FromBytes},
     memory::{ByteBuffer, ByteBufferPtr},
 };
-use std::str::from_utf8;
 
 /// Rust representation for logical type INT96, value is backed by an array of 
`u32`.
 /// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialOrd)]
 pub struct Int96 {
     value: Option<[u32; 3]>,
 }
@@ -103,6 +103,29 @@ pub struct ByteArray {
     data: Option<ByteBufferPtr>,
 }
 
+impl PartialOrd for ByteArray {
+    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
+        if self.data.is_some() && other.data.is_some() {
+            if self.len() > other.len() {
+                Some(Ordering::Greater)
+            } else if self.len() < other.len() {
+                Some(Ordering::Less)
+            } else {
+                for (v1, v2) in self.data().iter().zip(other.data().iter()) {
+                    if *v1 > *v2 {
+                        return Some(Ordering::Greater);
+                    } else if *v1 < *v2 {
+                        return Some(Ordering::Less);
+                    }
+                }
+                return Some(Ordering::Equal);
+            }
+        } else {
+            None
+        }
+    }
+}
+
 impl ByteArray {
     /// Creates new byte array with no data set.
     pub fn new() -> Self {
@@ -405,7 +428,8 @@ pub trait DataType: 'static {
         + std::default::Default
         + std::clone::Clone
         + AsBytes
-        + FromBytes;
+        + FromBytes
+        + PartialOrd;
 
     /// Returns Parquet physical type.
     fn get_physical_type() -> Type;
@@ -450,6 +474,7 @@ where
 
 macro_rules! make_type {
     ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: 
ident, $native_ty:ty, $size:expr) => {
+        #[derive(Clone)]
         pub struct $name {}
 
         impl DataType for $name {
@@ -598,8 +623,8 @@ impl FromBytes for ByteArray {
     fn from_be_bytes(_bs: Self::Buffer) -> Self {
         unreachable!()
     }
-    fn from_ne_bytes(_bs: Self::Buffer) -> Self {
-        unreachable!()
+    fn from_ne_bytes(bs: Self::Buffer) -> Self {
+        ByteArray::from(bs.to_vec())
     }
 }
 
@@ -690,4 +715,20 @@ mod tests {
 
         assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2));
     }
+
+    #[test]
+    fn test_byte_array_ord() {
+        let ba1 = ByteArray::from(vec![1, 2, 3]);
+        let ba11 = ByteArray::from(vec![1, 2, 3]);
+        let ba2 = ByteArray::from(vec![3, 4]);
+        let ba3 = ByteArray::from(vec![1, 2, 4]);
+        let ba4 = ByteArray::from(vec![]);
+        let ba5 = ByteArray::from(vec![2, 2, 3]);
+
+        assert!(ba1 > ba2);
+        assert!(ba3 > ba1);
+        assert!(ba1 > ba4);
+        assert_eq!(ba1, ba11);
+        assert!(ba5 > ba1);
+    }
 }

Reply via email to