This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 7e289134a8 Simplify parquet statistics generation (#5183)
7e289134a8 is described below
commit 7e289134a8d9f46a92a2759a7b2488b17993fd5b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 8 18:08:44 2023 +0000
Simplify parquet statistics generation (#5183)
---
parquet/src/arrow/arrow_writer/byte_array.rs | 40 +++++++------------
parquet/src/column/writer/encoder.rs | 29 ++++----------
parquet/src/column/writer/mod.rs | 60 ++++++++++------------------
3 files changed, 44 insertions(+), 85 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 28c7c3b005..61933b2417 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -22,7 +22,7 @@ use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
-use crate::file::properties::{WriterProperties, WriterVersion};
+use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
@@ -379,6 +379,7 @@ impl DictEncoder {
pub struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
+ statistics_enabled: EnabledStatistics,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
@@ -387,24 +388,6 @@ pub struct ByteArrayEncoder {
impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
type Values = dyn Array;
-
- fn min_max(
- &self,
- values: &dyn Array,
- value_indices: Option<&[usize]>,
- ) -> Option<(Self::T, Self::T)> {
- match value_indices {
- Some(indices) => {
- let iter = indices.iter().cloned();
- downcast_op!(values.data_type(), values, compute_min_max, iter)
- }
- None => {
- let len = Array::len(values);
- downcast_op!(values.data_type(), values, compute_min_max, 0..len)
- }
- }
- }
-
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
self.bloom_filter.take()
}
@@ -424,12 +407,15 @@ impl ColumnValueEncoder for ByteArrayEncoder {
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;
+ let statistics_enabled = props.statistics_enabled(descr.path());
+
Ok(Self {
fallback,
+ statistics_enabled,
+ bloom_filter,
dict_encoder: dictionary,
min_value: None,
max_value: None,
- bloom_filter,
})
}
@@ -498,13 +484,15 @@ where
T: ArrayAccessor + Copy,
T::Item: Copy + Ord + AsRef<[u8]>,
{
- if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
- if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
- encoder.min_value = Some(min);
- }
+ if encoder.statistics_enabled != EnabledStatistics::None {
+ if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
+ if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
+ encoder.min_value = Some(min);
+ }
- if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
- encoder.max_value = Some(max);
+ if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
+ encoder.max_value = Some(max);
+ }
}
}
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 0d5144f61c..8624f859f4 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -76,15 +76,6 @@ pub trait ColumnValueEncoder {
/// The values encoded by this encoder
type Values: ColumnValues + ?Sized;
- /// Returns the min and max values in this collection, skipping any NaN values
- ///
- /// Returns `None` if no values found
- fn min_max(
- &self,
- values: &Self::Values,
- value_indices: Option<&[usize]>,
- ) -> Option<(Self::T, Self::T)>;
-
/// Create a new [`ColumnValueEncoder`]
fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
where
@@ -136,8 +127,15 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
}
impl<T: DataType> ColumnValueEncoderImpl<T> {
+ fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
+ match value_indices {
+ Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
+ None => get_min_max(&self.descr, values.iter()),
+ }
+ }
+
fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
- if self.statistics_enabled == EnabledStatistics::Page
+ if self.statistics_enabled != EnabledStatistics::None
// INTERVAL has undefined sort order, so don't write min/max stats for it
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
@@ -166,17 +164,6 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
type Values = [T::T];
- fn min_max(
- &self,
- values: &Self::Values,
- value_indices: Option<&[usize]>,
- ) -> Option<(Self::T, Self::T)> {
- match value_indices {
- Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
- None => get_min_max(&self.descr, values.iter()),
- }
- }
-
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
self.bloom_filter.take()
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 531af4bd46..9f476595fb 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -329,28 +329,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
None => values.len(),
};
- // If only computing chunk-level statistics compute them here, page-level statistics
- // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
- // [`Self::add_data_page`]
- if self.statistics_enabled == EnabledStatistics::Chunk
- // INTERVAL has undefined sort order, so don't write min/max stats for it
- && self.descr.converted_type() != ConvertedType::INTERVAL
- {
- match (min, max) {
- (Some(min), Some(max)) => {
- update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
- update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
- }
- (None, Some(_)) | (Some(_), None) => {
- panic!("min/max should be both set or both None")
- }
- (None, None) => {
- if let Some((min, max)) = self.encoder.min_max(values, value_indices) {
- update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
- update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
- }
- }
- };
+ if let Some(min) = min {
+ update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
+ }
+ if let Some(max) = max {
+ update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
}
// We can only set the distinct count if there are no other writes
@@ -764,22 +747,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
- let page_statistics = if let (Some(min), Some(max)) =
- (values_data.min_value, values_data.max_value)
- {
- // Update chunk level statistics
- update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
- update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-
- (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
- Some(min),
- Some(max),
- None,
- self.page_metrics.num_page_nulls,
- false,
- ))
- } else {
- None
+ let page_statistics = match (values_data.min_value, values_data.max_value) {
+ (Some(min), Some(max)) => {
+ // Update chunk level statistics
+ update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+ update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+ (self.statistics_enabled == EnabledStatistics::Page).then_some(
+ ValueStatistics::new(
+ Some(min),
+ Some(max),
+ None,
+ self.page_metrics.num_page_nulls,
+ false,
+ ),
+ )
+ }
+ _ => None,
};
// update column and offset index