This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e289134a8 Simplify parquet statistics generation (#5183)
7e289134a8 is described below

commit 7e289134a8d9f46a92a2759a7b2488b17993fd5b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 8 18:08:44 2023 +0000

    Simplify parquet statistics generation (#5183)
---
 parquet/src/arrow/arrow_writer/byte_array.rs | 40 +++++++------------
 parquet/src/column/writer/encoder.rs         | 29 ++++----------
 parquet/src/column/writer/mod.rs             | 60 ++++++++++------------------
 3 files changed, 44 insertions(+), 85 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 28c7c3b005..61933b2417 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -22,7 +22,7 @@ use crate::data_type::{AsBytes, ByteArray, Int32Type};
 use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
-use crate::file::properties::{WriterProperties, WriterVersion};
+use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
@@ -379,6 +379,7 @@ impl DictEncoder {
 pub struct ByteArrayEncoder {
     fallback: FallbackEncoder,
     dict_encoder: Option<DictEncoder>,
+    statistics_enabled: EnabledStatistics,
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
     bloom_filter: Option<Sbbf>,
@@ -387,24 +388,6 @@ pub struct ByteArrayEncoder {
 impl ColumnValueEncoder for ByteArrayEncoder {
     type T = ByteArray;
     type Values = dyn Array;
-
-    fn min_max(
-        &self,
-        values: &dyn Array,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)> {
-        match value_indices {
-            Some(indices) => {
-                let iter = indices.iter().cloned();
-                downcast_op!(values.data_type(), values, compute_min_max, iter)
-            }
-            None => {
-                let len = Array::len(values);
-                downcast_op!(values.data_type(), values, compute_min_max, 0..len)
-            }
-        }
-    }
-
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
         self.bloom_filter.take()
     }
@@ -424,12 +407,15 @@ impl ColumnValueEncoder for ByteArrayEncoder {
             .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
             .transpose()?;
 
+        let statistics_enabled = props.statistics_enabled(descr.path());
+
         Ok(Self {
             fallback,
+            statistics_enabled,
+            bloom_filter,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
-            bloom_filter,
         })
     }
 
@@ -498,13 +484,15 @@ where
     T: ArrayAccessor + Copy,
     T::Item: Copy + Ord + AsRef<[u8]>,
 {
-    if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
-        if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
-            encoder.min_value = Some(min);
-        }
+    if encoder.statistics_enabled != EnabledStatistics::None {
+        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
+            if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
+                encoder.min_value = Some(min);
+            }
 
-        if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
-            encoder.max_value = Some(max);
+            if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
+                encoder.max_value = Some(max);
+            }
         }
     }
 
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 0d5144f61c..8624f859f4 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -76,15 +76,6 @@ pub trait ColumnValueEncoder {
     /// The values encoded by this encoder
     type Values: ColumnValues + ?Sized;
 
-    /// Returns the min and max values in this collection, skipping any NaN values
-    ///
-    /// Returns `None` if no values found
-    fn min_max(
-        &self,
-        values: &Self::Values,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)>;
-
     /// Create a new [`ColumnValueEncoder`]
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
     where
@@ -136,8 +127,15 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
+    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
+        match value_indices {
+            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
+            None => get_min_max(&self.descr, values.iter()),
+        }
+    }
+
     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
-        if self.statistics_enabled == EnabledStatistics::Page
+        if self.statistics_enabled != EnabledStatistics::None
             // INTERVAL has undefined sort order, so don't write min/max stats for it
             && self.descr.converted_type() != ConvertedType::INTERVAL
         {
@@ -166,17 +164,6 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
 
     type Values = [T::T];
 
-    fn min_max(
-        &self,
-        values: &Self::Values,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)> {
-        match value_indices {
-            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
-            None => get_min_max(&self.descr, values.iter()),
-        }
-    }
-
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
         self.bloom_filter.take()
     }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 531af4bd46..9f476595fb 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -329,28 +329,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             None => values.len(),
         };
 
-        // If only computing chunk-level statistics compute them here, page-level statistics
-        // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
-        // [`Self::add_data_page`]
-        if self.statistics_enabled == EnabledStatistics::Chunk
-            // INTERVAL has undefined sort order, so don't write min/max stats for it
-            && self.descr.converted_type() != ConvertedType::INTERVAL
-        {
-            match (min, max) {
-                (Some(min), Some(max)) => {
-                    update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
-                    update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
-                }
-                (None, Some(_)) | (Some(_), None) => {
-                    panic!("min/max should be both set or both None")
-                }
-                (None, None) => {
-                    if let Some((min, max)) = self.encoder.min_max(values, value_indices) {
-                        update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-                        update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-                    }
-                }
-            };
+        if let Some(min) = min {
+            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
+        }
+        if let Some(max) = max {
+            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
         }
 
         // We can only set the distinct count if there are no other writes
@@ -764,22 +747,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
 
-        let page_statistics = if let (Some(min), Some(max)) =
-            (values_data.min_value, values_data.max_value)
-        {
-            // Update chunk level statistics
-            update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-            update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-
-            (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
-                Some(min),
-                Some(max),
-                None,
-                self.page_metrics.num_page_nulls,
-                false,
-            ))
-        } else {
-            None
+        let page_statistics = match (values_data.min_value, values_data.max_value) {
+            (Some(min), Some(max)) => {
+                // Update chunk level statistics
+                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+                (self.statistics_enabled == EnabledStatistics::Page).then_some(
+                    ValueStatistics::new(
+                        Some(min),
+                        Some(max),
+                        None,
+                        self.page_metrics.num_page_nulls,
+                        false,
+                    ),
+                )
+            }
+            _ => None,
         };
 
         // update column and offset index

Reply via email to