This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 02371d2be Generify parquet write path (#1764) (#2045)
02371d2be is described below
commit 02371d2be6b9fd276ea3423d51122c47935b7714
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun Jul 17 16:32:14 2022 -0400
Generify parquet write path (#1764) (#2045)
* Generify parquet write path (#1764)
* More docs
* Lint
* Fix doc
* Review feedback
* Fix doc
---
parquet/src/basic.rs | 2 +-
parquet/src/column/writer/encoder.rs | 250 +++++++++
parquet/src/column/{writer.rs => writer/mod.rs} | 667 ++++++++----------------
parquet/src/data_type.rs | 90 ++--
parquet/src/file/statistics.rs | 110 ++--
5 files changed, 584 insertions(+), 535 deletions(-)
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 59a0fe07b..2d8073e75 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -212,7 +212,7 @@ pub enum Repetition {
/// Encodings supported by Parquet.
/// Not all encodings are valid for all types. These enums are also used to specify the
/// encoding of definition and repetition levels.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub enum Encoding {
/// Default byte encoding.
/// - BOOLEAN - 1 bit per value, 0 is false; 1 is true.
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
new file mode 100644
index 000000000..bc31aedf4
--- /dev/null
+++ b/parquet/src/column/writer/encoder.rs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Encoding;
+use crate::column::writer::{
+ compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
+ update_min,
+};
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::DataType;
+use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
+use crate::util::memory::ByteBufferPtr;
+
+/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
+pub trait ColumnValues {
+ /// The underlying value type
+ type T: ParquetValueType;
+
+ /// The number of values in this collection
+ fn len(&self) -> usize;
+
+ /// Returns the min and max values in this collection, skipping any NaN values
+ ///
+ /// Returns `None` if no values found
+ fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&Self::T, &Self::T)>;
+}
+
+/// The encoded data for a dictionary page
+pub struct DictionaryPage {
+ pub buf: ByteBufferPtr,
+ pub num_values: usize,
+ pub is_sorted: bool,
+}
+
+/// The encoded values for a data page, with optional statistics
+pub struct DataPageValues<T> {
+ pub buf: ByteBufferPtr,
+ pub num_values: usize,
+ pub encoding: Encoding,
+ pub min_value: Option<T>,
+ pub max_value: Option<T>,
+}
+
+/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
+/// [`super::GenericColumnWriter`]
+pub trait ColumnValueEncoder {
+ /// The underlying value type of [`Self::Values`]
+ ///
+ /// Note: this avoids needing to fully qualify `<Self::Values as ColumnValues>::T`
+ type T: ParquetValueType;
+
+ /// The values encoded by this encoder
+ type Values: ColumnValues<T = Self::T> + ?Sized;
+
+ /// Create a new [`ColumnValueEncoder`]
+ fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
+ where
+ Self: Sized;
+
+ /// Write the corresponding values to this [`ColumnValueEncoder`]
+ fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
+
+ /// Returns the number of buffered values
+ fn num_values(&self) -> usize;
+
+ /// Returns true if this encoder has a dictionary page
+ fn has_dictionary(&self) -> bool;
+
+ /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
+ fn estimated_dict_page_size(&self) -> Option<usize>;
+
+ /// Returns an estimate of the data page size in bytes
+ fn estimated_data_page_size(&self) -> usize;
+
+ /// Flush the dictionary page for this column chunk if any. Any subsequent calls to
+ /// [`Self::write`] will not be dictionary encoded
+ ///
+ /// Note: [`Self::flush_data_page`] must be called first, as this will error if there
+ /// are any pending page values
+ fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;
+
+ /// Flush the next data page for this column chunk
+ fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
+}
+
+pub struct ColumnValueEncoderImpl<T: DataType> {
+ encoder: Box<dyn Encoder<T>>,
+ dict_encoder: Option<DictEncoder<T>>,
+ descr: ColumnDescPtr,
+ num_values: usize,
+ min_value: Option<T::T>,
+ max_value: Option<T::T>,
+}
+
+impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
+ type T = T::T;
+
+ type Values = [T::T];
+
+ fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
+ let dict_supported = props.dictionary_enabled(descr.path())
+ && has_dictionary_support(T::get_physical_type(), props);
+ let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
+
+ // Set either main encoder or fallback encoder.
+ let encoder = get_encoder(
+ descr.clone(),
+ props
+ .encoding(descr.path())
+ .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
+ )?;
+
+ Ok(Self {
+ encoder,
+ dict_encoder,
+ descr: descr.clone(),
+ num_values: 0,
+ min_value: None,
+ max_value: None,
+ })
+ }
+
+ fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
+ self.num_values += len;
+
+ let slice = values.get(offset..offset + len).ok_or_else(|| {
+ general_err!(
+ "Expected to write {} values, but have only {}",
+ len,
+ values.len() - offset
+ )
+ })?;
+
+ if let Some((min, max)) = slice.min_max(&self.descr) {
+ update_min(&self.descr, min, &mut self.min_value);
+ update_max(&self.descr, max, &mut self.max_value);
+ }
+
+ match &mut self.dict_encoder {
+ Some(encoder) => encoder.put(slice),
+ _ => self.encoder.put(slice),
+ }
+ }
+
+ fn num_values(&self) -> usize {
+ self.num_values
+ }
+
+ fn has_dictionary(&self) -> bool {
+ self.dict_encoder.is_some()
+ }
+
+ fn estimated_dict_page_size(&self) -> Option<usize> {
+ Some(self.dict_encoder.as_ref()?.dict_encoded_size())
+ }
+
+ fn estimated_data_page_size(&self) -> usize {
+ match &self.dict_encoder {
+ Some(encoder) => encoder.estimated_data_encoded_size(),
+ _ => self.encoder.estimated_data_encoded_size(),
+ }
+ }
+
+ fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
+ match self.dict_encoder.take() {
+ Some(encoder) => {
+ if self.num_values != 0 {
+ return Err(general_err!(
+ "Must flush data pages before flushing dictionary"
+ ));
+ }
+
+ let buf = encoder.write_dict()?;
+
+ Ok(Some(DictionaryPage {
+ buf,
+ num_values: encoder.num_entries(),
+ is_sorted: encoder.is_sorted(),
+ }))
+ }
+ _ => Ok(None),
+ }
+ }
+
+ fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
+ let (buf, encoding) = match &mut self.dict_encoder {
+ Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
+ _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
+ };
+
+ Ok(DataPageValues {
+ buf,
+ encoding,
+ num_values: std::mem::take(&mut self.num_values),
+ min_value: self.min_value.take(),
+ max_value: self.max_value.take(),
+ })
+ }
+}
+
+impl<T: ParquetValueType> ColumnValues for [T] {
+ type T = T;
+
+ fn len(&self) -> usize {
+ self.len()
+ }
+
+ fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&T, &T)> {
+ let mut iter = self.iter();
+
+ let first = loop {
+ let next = iter.next()?;
+ if !is_nan(next) {
+ break next;
+ }
+ };
+
+ let mut min = first;
+ let mut max = first;
+ for val in iter {
+ if is_nan(val) {
+ continue;
+ }
+ if compare_greater(descr, min, val) {
+ min = val;
+ }
+ if compare_greater(descr, val, max) {
+ max = val;
+ }
+ }
+ Some((min, max))
+ }
+}
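
The `min_max` scan above seeds both bounds from the first non-NaN value and then widens them, so an all-NaN slice yields `None`. A minimal standalone sketch of the same algorithm, specialised to `f64` with natural ordering in place of the `ColumnDescriptor`-aware `compare_greater` (illustrative only, not part of the commit):

    // Sketch: NaN-skipping min/max over a plain f64 slice. The committed code
    // threads a ColumnDescriptor through compare_greater; natural ordering is
    // assumed here instead.
    fn min_max(values: &[f64]) -> Option<(f64, f64)> {
        let mut iter = values.iter().copied().filter(|v| !v.is_nan());
        // The first non-NaN value seeds both bounds; all-NaN input yields None.
        let first = iter.next()?;
        let (mut min, mut max) = (first, first);
        for val in iter {
            if val < min {
                min = val;
            }
            if val > max {
                max = val;
            }
        }
        Some((min, max))
    }

    fn main() {
        assert_eq!(min_max(&[f64::NAN, 3.0, -1.0]), Some((-1.0, 3.0)));
        assert_eq!(min_max(&[f64::NAN]), None);
    }
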
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer/mod.rs
similarity index 81%
rename from parquet/src/column/writer.rs
rename to parquet/src/column/writer/mod.rs
index 1fc5207f6..ff6c09898 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -17,18 +17,17 @@
//! Contains column writer API.
use parquet_format::{ColumnIndex, OffsetIndex};
-use std::{collections::VecDeque, convert::TryFrom, marker::PhantomData};
+use std::collections::{BTreeSet, VecDeque};
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
+use crate::column::writer::encoder::{
+ ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues,
+};
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
-use crate::data_type::AsBytes;
use crate::data_type::*;
-use crate::encodings::{
- encoding::{get_encoder, DictEncoder, Encoder},
- levels::{max_buffer_size, LevelEncoder},
-};
+use crate::encodings::levels::{max_buffer_size, LevelEncoder};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
@@ -38,9 +37,10 @@ use crate::file::{
properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
-use crate::util::bit_util::FromBytes;
use crate::util::memory::ByteBufferPtr;
+pub(crate) mod encoder;
+
/// Column writer for a Parquet type.
pub enum ColumnWriter<'a> {
BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -58,26 +58,6 @@ pub enum Level {
Column,
}
-macro_rules! gen_stats_section {
- ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{
- let min = $min.as_ref().and_then(|v| {
- Some(read_num_bytes!(
- $physical_ty,
- v.as_bytes().len(),
- &v.as_bytes()
- ))
- });
- let max = $max.as_ref().and_then(|v| {
- Some(read_num_bytes!(
- $physical_ty,
- v.as_bytes().len(),
- &v.as_bytes()
- ))
- });
- Statistics::$stat_fn(min, max, $distinct, $nulls, false)
- }};
-}
-
/// Gets a specific column writer corresponding to column descriptor `descr`.
pub fn get_column_writer<'a>(
descr: ColumnDescPtr,
@@ -174,26 +154,27 @@ type ColumnCloseResult = (
);
/// Typed column writer for a primitive column.
-pub struct ColumnWriterImpl<'a, T: DataType> {
+pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
+
+pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
// Column writer properties
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
statistics_enabled: EnabledStatistics,
page_writer: Box<dyn PageWriter + 'a>,
- has_dictionary: bool,
- dict_encoder: Option<DictEncoder<T>>,
- encoder: Box<dyn Encoder<T>>,
codec: Compression,
compressor: Option<Box<dyn Codec>>,
+ encoder: E,
+
// Metrics per page
+ /// The number of values including nulls in the in-progress data page
num_buffered_values: u32,
- num_buffered_encoded_values: u32,
+ /// The number of rows in the in-progress data page
num_buffered_rows: u32,
- min_page_value: Option<T::T>,
- max_page_value: Option<T::T>,
+ /// The number of nulls in the in-progress data page
num_page_nulls: u64,
- page_distinct_count: Option<u64>,
+
// Metrics per column writer
total_bytes_written: u64,
total_rows_written: u64,
@@ -202,21 +183,26 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
total_num_values: u64,
dictionary_page_offset: Option<u64>,
data_page_offset: Option<u64>,
- min_column_value: Option<T::T>,
- max_column_value: Option<T::T>,
+ min_column_value: Option<E::T>,
+ max_column_value: Option<E::T>,
num_column_nulls: u64,
column_distinct_count: Option<u64>,
+
+ /// The order of encodings within the generated metadata does not impact its meaning,
+ /// but we use a BTreeSet so that the output is deterministic
+ encodings: BTreeSet<Encoding>,
+
// Reused buffers
def_levels_sink: Vec<i16>,
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
- _phantom: PhantomData<T>,
+
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
}
-impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
+impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
pub fn new(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
@@ -224,43 +210,25 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
) -> Self {
let codec = props.compression(descr.path());
let compressor = create_codec(codec).unwrap();
-
- // Optionally set dictionary encoder.
- let dict_encoder = if props.dictionary_enabled(descr.path())
- && has_dictionary_support(T::get_physical_type(), &props)
- {
- Some(DictEncoder::new(descr.clone()))
- } else {
- None
- };
-
- // Whether or not this column writer has a dictionary encoding.
- let has_dictionary = dict_encoder.is_some();
-
- // Set either main encoder or fallback encoder.
- let fallback_encoder = get_encoder(
- descr.clone(),
- props
- .encoding(descr.path())
- .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)),
- )
- .unwrap();
+ let encoder = E::try_new(&descr, props.as_ref()).unwrap();
let statistics_enabled = props.statistics_enabled(descr.path());
+ let mut encodings = BTreeSet::new();
+ // Used for level information
+ encodings.insert(Encoding::RLE);
+
Self {
descr,
props,
statistics_enabled,
page_writer,
- has_dictionary,
- dict_encoder,
- encoder: fallback_encoder,
codec,
compressor,
+ encoder,
num_buffered_values: 0,
- num_buffered_encoded_values: 0,
num_buffered_rows: 0,
+ num_page_nulls: 0,
total_bytes_written: 0,
total_rows_written: 0,
total_uncompressed_size: 0,
@@ -271,27 +239,23 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
def_levels_sink: vec![],
rep_levels_sink: vec![],
data_pages: VecDeque::new(),
- min_page_value: None,
- max_page_value: None,
- num_page_nulls: 0,
- page_distinct_count: None,
min_column_value: None,
max_column_value: None,
num_column_nulls: 0,
column_distinct_count: None,
- _phantom: PhantomData,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
+ encodings,
}
}
fn write_batch_internal(
&mut self,
- values: &[T::T],
+ values: &E::Values,
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
- min: Option<&T::T>,
- max: Option<&T::T>,
+ min: Option<&E::T>,
+ max: Option<&E::T>,
distinct_count: Option<u64>,
) -> Result<usize> {
// We check for DataPage limits only after we have inserted the values. If a user
@@ -304,18 +268,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
// TODO: find out why we don't account for size of levels when we estimate page
// size.
- // Find out the minimal length to prevent index out of bound errors.
- let mut min_len = values.len();
- if let Some(levels) = def_levels {
- min_len = min_len.min(levels.len());
- }
- if let Some(levels) = rep_levels {
- min_len = min_len.min(levels.len());
- }
+ let num_levels = match def_levels {
+ Some(def_levels) => def_levels.len(),
+ None => values.len(),
+ };
// Find out number of batches to process.
let write_batch_size = self.props.write_batch_size();
- let num_batches = min_len / write_batch_size;
+ let num_batches = num_levels / write_batch_size;
// If only computing chunk-level statistics compute them here, page-level statistics
// are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
@@ -323,23 +283,23 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
if self.statistics_enabled == EnabledStatistics::Chunk {
match (min, max) {
(Some(min), Some(max)) => {
- Self::update_min(&self.descr, min, &mut self.min_column_value);
- Self::update_max(&self.descr, max, &mut self.max_column_value);
+ update_min(&self.descr, min, &mut self.min_column_value);
+ update_max(&self.descr, max, &mut self.max_column_value);
}
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
}
(None, None) => {
- for val in values {
- Self::update_min(&self.descr, val, &mut self.min_column_value);
- Self::update_max(&self.descr, val, &mut self.max_column_value);
+ if let Some((min, max)) = values.min_max(&self.descr) {
+ update_min(&self.descr, min, &mut self.min_column_value);
+ update_max(&self.descr, max, &mut self.max_column_value);
}
}
};
}
// We can only set the distinct count if there are no other writes
- if self.num_buffered_values == 0 && self.num_page_nulls == 0 {
+ if self.encoder.num_values() == 0 {
self.column_distinct_count = distinct_count;
} else {
self.column_distinct_count = None;
@@ -349,7 +309,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
let mut levels_offset = 0;
for _ in 0..num_batches {
values_offset += self.write_mini_batch(
- &values[values_offset..values_offset + write_batch_size],
+ values,
+ values_offset,
+ write_batch_size,
def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
)?;
@@ -357,7 +319,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
}
values_offset += self.write_mini_batch(
- &values[values_offset..],
+ values,
+ values_offset,
+ num_levels - levels_offset,
def_levels.map(|lv| &lv[levels_offset..]),
rep_levels.map(|lv| &lv[levels_offset..]),
)?;
@@ -380,7 +344,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// non-nullable and/or non-repeated.
pub fn write_batch(
&mut self,
- values: &[T::T],
+ values: &E::Values,
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
) -> Result<usize> {
@@ -396,11 +360,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// computed page statistics
pub fn write_batch_with_statistics(
&mut self,
- values: &[T::T],
+ values: &E::Values,
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
- min: Option<&T::T>,
- max: Option<&T::T>,
+ min: Option<&E::T>,
+ max: Option<&E::T>,
distinct_count: Option<u64>,
) -> Result<usize> {
self.write_batch_internal(
@@ -428,12 +392,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// Finalises writes and closes the column writer.
/// Returns total bytes written, total rows written and column chunk metadata.
pub fn close(mut self) -> Result<ColumnCloseResult> {
- if self.dict_encoder.is_some() {
+ if self.num_buffered_values > 0 {
+ self.add_data_page()?;
+ }
+ if self.encoder.has_dictionary() {
self.write_dictionary_page()?;
}
self.flush_data_pages()?;
let metadata = self.write_column_metadata()?;
- self.dict_encoder = None;
self.page_writer.close()?;
let (column_index, offset_index) = if self.column_index_builder.valid() {
@@ -459,12 +425,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// page size.
fn write_mini_batch(
&mut self,
- values: &[T::T],
+ values: &E::Values,
+ values_offset: usize,
+ num_levels: usize,
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
) -> Result<usize> {
- let mut values_to_write = 0;
-
// Check if number of definition levels is the same as number of repetition
// levels.
if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
@@ -478,7 +444,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
}
// Process definition levels and determine how many values to write.
- let num_values = if self.descr.max_def_level() > 0 {
+ let values_to_write = if self.descr.max_def_level() > 0 {
let levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels are required, because max definition
level = {}",
@@ -486,6 +452,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
})?;
+ let mut values_to_write = 0;
for &level in levels {
if level == self.descr.max_def_level() {
values_to_write += 1;
@@ -494,11 +461,10 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
}
}
- self.write_definition_levels(levels);
- u32::try_from(levels.len()).unwrap()
+ self.def_levels_sink.extend_from_slice(levels);
+ values_to_write
} else {
- values_to_write = values.len();
- u32::try_from(values_to_write).unwrap()
+ num_levels
};
// Process repetition levels and determine how many rows we are about to process.
@@ -516,32 +482,15 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.num_buffered_rows += (level == 0) as u32
}
- self.write_repetition_levels(levels);
+ self.rep_levels_sink.extend_from_slice(levels);
} else {
// Each value is exactly one row.
// Equals to the number of values, we count nulls as well.
- self.num_buffered_rows += num_values;
+ self.num_buffered_rows += num_levels as u32;
}
- // Check that we have enough values to write.
- let values_to_write = values.get(0..values_to_write).ok_or_else(|| {
- general_err!(
- "Expected to write {} values, but have only {}",
- values_to_write,
- values.len()
- )
- })?;
-
- if self.statistics_enabled == EnabledStatistics::Page {
- for val in values_to_write {
- self.update_page_min_max(val);
- }
- }
-
- self.write_values(values_to_write)?;
-
- self.num_buffered_values += num_values;
- self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap();
+ self.encoder.write(values, values_offset, values_to_write)?;
+ self.num_buffered_values += num_levels as u32;
if self.should_add_data_page() {
self.add_data_page()?;
@@ -551,25 +500,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.dict_fallback()?;
}
- Ok(values_to_write.len())
- }
-
- #[inline]
- fn write_definition_levels(&mut self, def_levels: &[i16]) {
- self.def_levels_sink.extend_from_slice(def_levels);
- }
-
- #[inline]
- fn write_repetition_levels(&mut self, rep_levels: &[i16]) {
- self.rep_levels_sink.extend_from_slice(rep_levels);
- }
-
- #[inline]
- fn write_values(&mut self, values: &[T::T]) -> Result<()> {
- match self.dict_encoder {
- Some(ref mut encoder) => encoder.put(values),
- None => self.encoder.put(values),
- }
+ Ok(values_to_write)
}
/// Returns true if we need to fall back to non-dictionary encoding.
@@ -578,10 +509,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// size.
#[inline]
fn should_dict_fallback(&self) -> bool {
- match self.dict_encoder {
- Some(ref encoder) => {
- encoder.dict_encoded_size() >= self.props.dictionary_pagesize_limit()
- }
+ match self.encoder.estimated_dict_page_size() {
+ Some(size) => size >= self.props.dictionary_pagesize_limit(),
None => false,
}
}
@@ -593,28 +522,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
//
// In such a scenario the dictionary decoder may return an estimated encoded
// size in excess of the page size limit, even when there are no buffered values
- if self.num_buffered_values == 0 {
+ if self.encoder.num_values() == 0 {
return false;
}
- match self.dict_encoder {
- Some(ref encoder) => {
- encoder.estimated_data_encoded_size() >= self.props.data_pagesize_limit()
- }
- None => {
- self.encoder.estimated_data_encoded_size()
- >= self.props.data_pagesize_limit()
- }
- }
+ self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit()
}
/// Performs dictionary fallback.
/// Prepares and writes dictionary and all data pages into page writer.
fn dict_fallback(&mut self) -> Result<()> {
// At this point we know that we need to fall back.
+ if self.num_buffered_values > 0 {
+ self.add_data_page()?;
+ }
self.write_dictionary_page()?;
self.flush_data_pages()?;
- self.dict_encoder = None;
Ok(())
}
@@ -658,28 +581,24 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self) -> Result<()> {
// Extract encoded values
- let value_bytes = match self.dict_encoder {
- Some(ref mut encoder) => encoder.write_indices()?,
- None => self.encoder.flush_buffer()?,
- };
-
- // Select encoding based on current encoder and writer version (v1 or v2).
- let encoding = if self.dict_encoder.is_some() {
- self.props.dictionary_data_page_encoding()
- } else {
- self.encoder.encoding()
- };
+ let values_data = self.encoder.flush_data_page()?;
let max_def_level = self.descr.max_def_level();
let max_rep_level = self.descr.max_rep_level();
self.num_column_nulls += self.num_page_nulls;
- let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some();
- let page_statistics = match self.statistics_enabled {
- EnabledStatistics::Page if has_min_max => {
- self.update_column_min_max();
- Some(self.make_page_statistics())
+ let page_statistics = match (values_data.min_value, values_data.max_value) {
+ (Some(min), Some(max)) => {
+ update_min(&self.descr, &min, &mut self.min_column_value);
+ update_max(&self.descr, &max, &mut self.max_column_value);
+ Some(Statistics::new(
+ Some(min),
+ Some(max),
+ None,
+ self.num_page_nulls,
+ false,
+ ))
}
_ => None,
};
@@ -711,11 +630,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
);
}
- buffer.extend_from_slice(value_bytes.data());
+ buffer.extend_from_slice(values_data.buf.data());
let uncompressed_size = buffer.len();
if let Some(ref mut cmpr) = self.compressor {
- let mut compressed_buf = Vec::with_capacity(value_bytes.data().len());
+ let mut compressed_buf = Vec::with_capacity(uncompressed_size);
cmpr.compress(&buffer[..], &mut compressed_buf)?;
buffer = compressed_buf;
}
@@ -723,7 +642,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
let data_page = Page::DataPage {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
- encoding,
+ encoding: values_data.encoding,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
statistics: page_statistics,
@@ -751,20 +670,20 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
}
let uncompressed_size =
- rep_levels_byte_len + def_levels_byte_len + value_bytes.len();
+ rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
// Data Page v2 compresses values only.
match self.compressor {
Some(ref mut cmpr) => {
- cmpr.compress(value_bytes.data(), &mut buffer)?;
+ cmpr.compress(values_data.buf.data(), &mut buffer)?;
}
- None => buffer.extend_from_slice(value_bytes.data()),
+ None => buffer.extend_from_slice(values_data.buf.data()),
}
let data_page = Page::DataPageV2 {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
- encoding,
+ encoding: values_data.encoding,
num_nulls: self.num_page_nulls as u32,
num_rows: self.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
@@ -778,7 +697,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
};
// Check if we need to buffer data page or flush it to the sink directly.
- if self.dict_encoder.is_some() {
+ if self.encoder.has_dictionary() {
self.data_pages.push_back(compressed_page);
} else {
self.write_data_page(compressed_page)?;
@@ -791,12 +710,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.rep_levels_sink.clear();
self.def_levels_sink.clear();
self.num_buffered_values = 0;
- self.num_buffered_encoded_values = 0;
self.num_buffered_rows = 0;
- self.min_page_value = None;
- self.max_page_value = None;
self.num_page_nulls = 0;
- self.page_distinct_count = None;
Ok(())
}
@@ -826,30 +741,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
// If data page offset is not set, then no pages have been written
let data_page_offset = self.data_page_offset.unwrap_or(0) as i64;
- let file_offset;
- let mut encodings = Vec::new();
-
- if self.has_dictionary {
- assert!(dict_page_offset.is_some(), "Dictionary offset is not set");
- file_offset = dict_page_offset.unwrap() + total_compressed_size;
- // NOTE: This should be in sync with writing dictionary pages.
- encodings.push(self.props.dictionary_page_encoding());
- encodings.push(self.props.dictionary_data_page_encoding());
- // Fallback to alternative encoding, add it to the list.
- if self.dict_encoder.is_none() {
- encodings.push(self.encoder.encoding());
- }
- } else {
- file_offset = data_page_offset + total_compressed_size;
- encodings.push(self.encoder.encoding());
- }
- // We use only RLE level encoding for data page v1 and data page v2.
- encodings.push(Encoding::RLE);
+ let file_offset = match dict_page_offset {
+ Some(dict_offset) => dict_offset + total_compressed_size,
+ None => data_page_offset + total_compressed_size,
+ };
+
+ let statistics = Statistics::new(
+ self.min_column_value.clone(),
+ self.max_column_value.clone(),
+ self.column_distinct_count,
+ self.num_column_nulls,
+ false,
+ );
- let statistics = self.make_column_statistics();
let metadata = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
- .set_encodings(encodings)
+ .set_encodings(self.encodings.iter().cloned().collect())
.set_file_offset(file_offset)
.set_total_compressed_size(total_compressed_size)
.set_total_uncompressed_size(total_uncompressed_size)
@@ -891,6 +798,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// Writes compressed data page into underlying sink and updates global metrics.
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
+ self.encodings.insert(page.encoding());
let page_spec = self.page_writer.write_page(page)?;
// update offset index
// compressed_size = header_size + compressed_data_size
@@ -906,31 +814,29 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn write_dictionary_page(&mut self) -> Result<()> {
let compressed_page = {
- let encoder = self
- .dict_encoder
- .as_ref()
+ let mut page = self
+ .encoder
+ .flush_dict_page()?
.ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
- let is_sorted = encoder.is_sorted();
- let num_values = encoder.num_entries();
- let mut values_buf = encoder.write_dict()?;
- let uncompressed_size = values_buf.len();
+ let uncompressed_size = page.buf.len();
if let Some(ref mut cmpr) = self.compressor {
let mut output_buf = Vec::with_capacity(uncompressed_size);
- cmpr.compress(values_buf.data(), &mut output_buf)?;
- values_buf = ByteBufferPtr::new(output_buf);
+ cmpr.compress(page.buf.data(), &mut output_buf)?;
+ page.buf = ByteBufferPtr::new(output_buf);
}
let dict_page = Page::DictionaryPage {
- buf: values_buf,
- num_values: num_values as u32,
+ buf: page.buf,
+ num_values: page.num_values as u32,
encoding: self.props.dictionary_page_encoding(),
- is_sorted,
+ is_sorted: page.is_sorted,
};
CompressedPage::new(dict_page, uncompressed_size)
};
+ self.encodings.insert(compressed_page.encoding());
let page_spec = self.page_writer.write_page(compressed_page)?;
self.update_metrics_for_page(page_spec);
// For the dictionary page, don't need to update column/offset index.
@@ -967,149 +873,89 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
fn get_page_writer_ref(&self) -> &dyn PageWriter {
self.page_writer.as_ref()
}
+}
- fn make_column_statistics(&self) -> Statistics {
- self.make_typed_statistics(Level::Column)
- }
-
- fn make_page_statistics(&self) -> Statistics {
- self.make_typed_statistics(Level::Page)
- }
+fn update_min<T: ParquetValueType>(
+ descr: &ColumnDescriptor,
+ val: &T,
+ min: &mut Option<T>,
+) {
+ update_stat::<T, _>(val, min, |cur| compare_greater(descr, cur, val))
+}
- pub fn make_typed_statistics(&self, level: Level) -> Statistics {
- let (min, max, distinct, nulls) = match level {
- Level::Page => (
- self.min_page_value.as_ref(),
- self.max_page_value.as_ref(),
- self.page_distinct_count,
- self.num_page_nulls,
- ),
- Level::Column => (
- self.min_column_value.as_ref(),
- self.max_column_value.as_ref(),
- self.column_distinct_count,
- self.num_column_nulls,
- ),
- };
- match self.descr.physical_type() {
- Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, nulls),
- Type::BOOLEAN => gen_stats_section!(bool, boolean, min, max, distinct, nulls),
- Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, nulls),
- Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls),
- Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls),
- Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls),
- Type::BYTE_ARRAY => {
- let min = min.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec()));
- let max = max.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec()));
- Statistics::byte_array(min, max, distinct, nulls, false)
- }
- Type::FIXED_LEN_BYTE_ARRAY => {
- let min = min
- .as_ref()
- .map(|v| ByteArray::from(v.as_bytes().to_vec()))
- .map(|ba| {
- let ba: FixedLenByteArray = ba.into();
- ba
- });
- let max = max
- .as_ref()
- .map(|v| ByteArray::from(v.as_bytes().to_vec()))
- .map(|ba| {
- let ba: FixedLenByteArray = ba.into();
- ba
- });
- Statistics::fixed_len_byte_array(min, max, distinct, nulls, false)
- }
- }
- }
+fn update_max<T: ParquetValueType>(
+ descr: &ColumnDescriptor,
+ val: &T,
+ max: &mut Option<T>,
+) {
+ update_stat::<T, _>(val, max, |cur| compare_greater(descr, val, cur))
+}
- fn update_page_min_max(&mut self, val: &T::T) {
- Self::update_min(&self.descr, val, &mut self.min_page_value);
- Self::update_max(&self.descr, val, &mut self.max_page_value);
+#[inline]
+#[allow(clippy::eq_op)]
+fn is_nan<T: ParquetValueType>(val: &T) -> bool {
+ match T::PHYSICAL_TYPE {
+ Type::FLOAT | Type::DOUBLE => val != val,
+ _ => false,
}
+}
- fn update_column_min_max(&mut self) {
- let min = self.min_page_value.as_ref().unwrap();
- Self::update_min(&self.descr, min, &mut self.min_column_value);
-
- let max = self.max_page_value.as_ref().unwrap();
- Self::update_max(&self.descr, max, &mut self.max_column_value);
- }
+/// Perform a conditional update of `cur`, skipping any NaN values
+///
+/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
+/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
- fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option<T::T>) {
- Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val))
+fn update_stat<T: ParquetValueType, F>(val: &T, cur: &mut Option<T>, should_update: F)
+where
+ F: Fn(&T) -> bool,
+{
+ if is_nan(val) {
+ return;
}
- fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option<T::T>) {
- Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur))
+ if cur.as_ref().map_or(true, should_update) {
+ *cur = Some(val.clone());
}
+}
- /// Perform a conditional update of `cur`, skipping any NaN values
- ///
- /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
- /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
- #[allow(clippy::eq_op)]
- fn update_stat<F>(val: &T::T, cur: &mut Option<T::T>, should_update: F)
- where
- F: Fn(&T::T) -> bool,
- {
- if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() {
- // Skip NaN values
- if val != val {
- return;
- }
- }
-
- if cur.as_ref().map_or(true, should_update) {
- *cur = Some(val.clone());
+/// Evaluate `a > b` according to underlying logical type.
+fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
+ if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
+ if !is_signed {
+ // need to compare unsigned
+ return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}
- /// Evaluate `a > b` according to underlying logical type.
- fn compare_greater(descr: &ColumnDescriptor, a: &T::T, b: &T::T) -> bool {
- if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
- if !is_signed {
- // need to compare unsigned
- return a.as_u64().unwrap() > b.as_u64().unwrap();
- }
+ match descr.converted_type() {
+ ConvertedType::UINT_8
+ | ConvertedType::UINT_16
+ | ConvertedType::UINT_32
+ | ConvertedType::UINT_64 => {
+ return a.as_u64().unwrap() > b.as_u64().unwrap();
}
+ _ => {}
+ };
- match descr.converted_type() {
- ConvertedType::UINT_8
- | ConvertedType::UINT_16
- | ConvertedType::UINT_32
- | ConvertedType::UINT_64 => {
- return a.as_u64().unwrap() > b.as_u64().unwrap();
+ if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
+ match T::PHYSICAL_TYPE {
+ Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+ return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
}
_ => {}
};
+ }
- if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
- match T::get_physical_type() {
- Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
- return compare_greater_byte_array_decimals(
- a.as_bytes(),
- b.as_bytes(),
- );
- }
- _ => {}
- };
- }
-
- if descr.converted_type() == ConvertedType::DECIMAL {
- match T::get_physical_type() {
- Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
- return compare_greater_byte_array_decimals(
- a.as_bytes(),
- b.as_bytes(),
- );
- }
- _ => {}
- };
+ if descr.converted_type() == ConvertedType::DECIMAL {
+ match T::PHYSICAL_TYPE {
+ Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+ return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
+ }
+ _ => {}
};
+ };
- a > b
- }
+ a > b
}
// ----------------------------------------------------------------------
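
The unsigned branches in `compare_greater` exist because unsigned logical types are carried in signed physical types, where signed comparison orders large values incorrectly. A hedged illustration of the problem in plain Rust (not the committed code):

    // Illustration: a UINT_32 value is stored in an i32 physical value, so
    // u32::MAX arrives as -1_i32. Signed comparison misorders it; widening the
    // reinterpreted bits to u64 (as compare_greater's as_u64 path does)
    // restores the logical order.
    fn main() {
        let a: i32 = -1; // logically u32::MAX for a UINT_32 column
        let b: i32 = 1;
        assert!(a < b); // physical (signed) order: wrong for UINT_32
        let (ua, ub) = (a as u32 as u64, b as u32 as u64);
        assert!(ua > ub); // logical (unsigned) order: correct
    }
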
@@ -1280,16 +1126,16 @@ mod tests {
}
#[test]
- #[should_panic(expected = "Dictionary offset is already set")]
fn test_column_writer_write_only_one_dictionary_page() {
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
// First page should be correctly written.
- let res = writer.write_dictionary_page();
- assert!(res.is_ok());
+ writer.add_data_page().unwrap();
writer.write_dictionary_page().unwrap();
+ let err = writer.write_dictionary_page().unwrap_err().to_string();
+ assert_eq!(err, "Parquet error: Dictionary encoder is not set");
}
#[test]
@@ -1302,14 +1148,8 @@ mod tests {
);
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
- let res = writer.write_dictionary_page();
- assert!(res.is_err());
- if let Err(err) = res {
- assert_eq!(
- format!("{}", err),
- "Parquet error: Dictionary encoder is not set"
- );
- }
+ let err = writer.write_dictionary_page().unwrap_err().to_string();
+ assert_eq!(err, "Parquet error: Dictionary encoder is not set");
}
#[test]
@@ -1356,14 +1196,14 @@ mod tests {
true,
&[true, false],
None,
- &[Encoding::RLE, Encoding::RLE],
+ &[Encoding::RLE],
);
check_encoding_write_support::<BoolType>(
WriterVersion::PARQUET_2_0,
false,
&[true, false],
None,
- &[Encoding::RLE, Encoding::RLE],
+ &[Encoding::RLE],
);
}
@@ -1374,7 +1214,7 @@ mod tests {
true,
&[1, 2],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<Int32Type>(
WriterVersion::PARQUET_1_0,
@@ -1388,14 +1228,14 @@ mod tests {
true,
&[1, 2],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<Int32Type>(
WriterVersion::PARQUET_2_0,
false,
&[1, 2],
None,
- &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE],
+ &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
);
}
@@ -1406,7 +1246,7 @@ mod tests {
true,
&[1, 2],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<Int64Type>(
WriterVersion::PARQUET_1_0,
@@ -1420,14 +1260,14 @@ mod tests {
true,
&[1, 2],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<Int64Type>(
WriterVersion::PARQUET_2_0,
false,
&[1, 2],
None,
- &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE],
+ &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
);
}
@@ -1438,7 +1278,7 @@ mod tests {
true,
&[Int96::from(vec![1, 2, 3])],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<Int96Type>(
WriterVersion::PARQUET_1_0,
@@ -1452,7 +1292,7 @@ mod tests {
true,
&[Int96::from(vec![1, 2, 3])],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<Int96Type>(
WriterVersion::PARQUET_2_0,
@@ -1470,7 +1310,7 @@ mod tests {
true,
&[1.0, 2.0],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<FloatType>(
WriterVersion::PARQUET_1_0,
@@ -1484,7 +1324,7 @@ mod tests {
true,
&[1.0, 2.0],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<FloatType>(
WriterVersion::PARQUET_2_0,
@@ -1502,7 +1342,7 @@ mod tests {
true,
&[1.0, 2.0],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<DoubleType>(
WriterVersion::PARQUET_1_0,
@@ -1516,7 +1356,7 @@ mod tests {
true,
&[1.0, 2.0],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<DoubleType>(
WriterVersion::PARQUET_2_0,
@@ -1534,7 +1374,7 @@ mod tests {
true,
&[ByteArray::from(vec![1u8])],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<ByteArrayType>(
WriterVersion::PARQUET_1_0,
@@ -1548,14 +1388,14 @@ mod tests {
true,
&[ByteArray::from(vec![1u8])],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<ByteArrayType>(
WriterVersion::PARQUET_2_0,
false,
&[ByteArray::from(vec![1u8])],
None,
- &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
+ &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
);
}
@@ -1580,14 +1420,14 @@ mod tests {
true,
&[ByteArray::from(vec![1u8]).into()],
Some(0),
- &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+ &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
check_encoding_write_support::<FixedLenByteArrayType>(
WriterVersion::PARQUET_2_0,
false,
&[ByteArray::from(vec![1u8]).into()],
None,
- &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
+ &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
);
}
@@ -1603,7 +1443,7 @@ mod tests {
assert_eq!(rows_written, 4);
assert_eq!(
metadata.encodings(),
- &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+ &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
);
assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
assert_eq!(metadata.compressed_size(), 20);
@@ -1728,7 +1568,7 @@ mod tests {
assert_eq!(rows_written, 4);
assert_eq!(
metadata.encodings(),
- &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+ &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
);
assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
assert_eq!(metadata.compressed_size(), 20);
@@ -1808,40 +1648,19 @@ mod tests {
#[test]
fn test_column_writer_non_nullable_values_roundtrip() {
let props = WriterProperties::builder().build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 0,
- 0,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
}
#[test]
fn test_column_writer_nullable_non_repeated_values_roundtrip() {
let props = WriterProperties::builder().build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 0,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
}
#[test]
fn test_column_writer_nullable_repeated_values_roundtrip() {
let props = WriterProperties::builder().build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
}
#[test]
@@ -1850,14 +1669,7 @@ mod tests {
.set_dictionary_pagesize_limit(32)
.set_data_pagesize_limit(32)
.build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
}
#[test]
@@ -1865,14 +1677,7 @@ mod tests {
for i in &[1usize, 2, 5, 10, 11, 1023] {
let props =
WriterProperties::builder().set_write_batch_size(*i).build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
}
}
@@ -1882,14 +1687,7 @@ mod tests {
.set_writer_version(WriterVersion::PARQUET_1_0)
.set_dictionary_enabled(false)
.build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
}
#[test]
@@ -1898,14 +1696,7 @@ mod tests {
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_dictionary_enabled(false)
.build();
- column_roundtrip_random::<Int32Type>(
- props,
- 1024,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
}
#[test]
@@ -1914,14 +1705,7 @@ mod tests {
.set_writer_version(WriterVersion::PARQUET_1_0)
.set_compression(Compression::SNAPPY)
.build();
- column_roundtrip_random::<Int32Type>(
- props,
- 2048,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
}
#[test]
@@ -1930,14 +1714,7 @@ mod tests {
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_compression(Compression::SNAPPY)
.build();
- column_roundtrip_random::<Int32Type>(
- props,
- 2048,
- std::i32::MIN,
- std::i32::MAX,
- 10,
- 10,
- );
+ column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
}
#[test]
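
The reordered encoding expectations in the tests above follow from the new `BTreeSet<Encoding>` in the writer: iteration order tracks `Ord` (hence the `Ord`/`PartialOrd` derives added in basic.rs), so the metadata lists encodings deterministically regardless of insertion order. A small sketch of that property with a stand-in enum:

    use std::collections::BTreeSet;

    // Stand-in for the real Encoding enum; the derives mirror those added in
    // basic.rs, so iteration follows declaration order.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
    enum Encoding {
        Plain,
        Rle,
        RleDictionary,
    }

    fn main() {
        let mut encodings = BTreeSet::new();
        // Insertion order is irrelevant; iteration is always sorted.
        encodings.insert(Encoding::RleDictionary);
        encodings.insert(Encoding::Plain);
        encodings.insert(Encoding::Rle);
        let collected: Vec<_> = encodings.iter().cloned().collect();
        assert_eq!(
            collected,
            vec![Encoding::Plain, Encoding::Rle, Encoding::RleDictionary]
        );
    }
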
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 7b6fb04a7..1d0b5b231 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -568,6 +568,7 @@ pub(crate) mod private {
use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
use crate::util::memory::ByteBufferPtr;
+ use crate::basic::Type;
use byteorder::ByteOrder;
use std::convert::TryInto;
@@ -581,18 +582,21 @@ pub(crate) mod private {
/// crate, and thus hint to the type system (and end user) traits are public for the contract
/// and not for extension.
pub trait ParquetValueType:
- std::cmp::PartialEq
+ PartialEq
+ std::fmt::Debug
+ std::fmt::Display
- + std::default::Default
- + std::clone::Clone
+ + Default
+ + Clone
+ super::AsBytes
+ super::FromBytes
- + super::SliceAsBytes
+ + SliceAsBytes
+ PartialOrd
+ Send
+ crate::encodings::decoding::private::GetDecoder
+ + crate::file::statistics::private::MakeStatistics
{
+ const PHYSICAL_TYPE: Type;
+
/// Encode the value directly from a higher level encoder
fn encode<W: std::io::Write>(
values: &[Self],
@@ -646,6 +650,8 @@ pub(crate) mod private {
}
impl ParquetValueType for bool {
+ const PHYSICAL_TYPE: Type = Type::BOOLEAN;
+
#[inline]
fn encode<W: std::io::Write>(
values: &[Self],
@@ -730,8 +736,10 @@ pub(crate) mod private {
}
macro_rules! impl_from_raw {
- ($ty: ty, $self: ident => $as_i64: block) => {
+ ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
impl ParquetValueType for $ty {
+ const PHYSICAL_TYPE: Type = $physical_ty;
+
#[inline]
fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
let raw = unsafe {
@@ -809,12 +817,14 @@ pub(crate) mod private {
}
}
- impl_from_raw!(i32, self => { Ok(*self as i64) });
- impl_from_raw!(i64, self => { Ok(*self) });
- impl_from_raw!(f32, self => { Err(general_err!("Type cannot be converted to i64")) });
- impl_from_raw!(f64, self => { Err(general_err!("Type cannot be converted to i64")) });
+ impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
+ impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
+ impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
+ impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
impl ParquetValueType for super::Int96 {
+ const PHYSICAL_TYPE: Type = Type::INT96;
+
#[inline]
fn encode<W: std::io::Write>(
values: &[Self],
@@ -925,6 +935,8 @@ pub(crate) mod private {
}
impl ParquetValueType for super::ByteArray {
+ const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
+
#[inline]
fn encode<W: std::io::Write>(
values: &[Self],
@@ -1016,6 +1028,8 @@ pub(crate) mod private {
}
impl ParquetValueType for super::FixedLenByteArray {
+ const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
+
#[inline]
fn encode<W: std::io::Write>(
values: &[Self],
@@ -1113,7 +1127,9 @@ pub trait DataType: 'static + Send {
type T: private::ParquetValueType;
/// Returns Parquet physical type.
- fn get_physical_type() -> Type;
+ fn get_physical_type() -> Type {
+ <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
+ }
/// Returns size in bytes for Rust representation of the physical type.
fn get_type_size() -> usize;
@@ -1156,17 +1172,13 @@ where
}
macro_rules! make_type {
- ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
+ ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
#[derive(Clone)]
pub struct $name {}
impl DataType for $name {
type T = $native_ty;
- fn get_physical_type() -> Type {
- $physical_ty
- }
-
fn get_type_size() -> usize {
$size
}
@@ -1212,57 +1224,20 @@ macro_rules! make_type {
// Generate struct definitions for all physical types
-make_type!(
- BoolType,
- Type::BOOLEAN,
- BoolColumnReader,
- BoolColumnWriter,
- bool,
- 1
-);
-make_type!(
- Int32Type,
- Type::INT32,
- Int32ColumnReader,
- Int32ColumnWriter,
- i32,
- 4
-);
-make_type!(
- Int64Type,
- Type::INT64,
- Int64ColumnReader,
- Int64ColumnWriter,
- i64,
- 8
-);
+make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
+make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
+make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
make_type!(
Int96Type,
- Type::INT96,
Int96ColumnReader,
Int96ColumnWriter,
Int96,
mem::size_of::<Int96>()
);
-make_type!(
- FloatType,
- Type::FLOAT,
- FloatColumnReader,
- FloatColumnWriter,
- f32,
- 4
-);
-make_type!(
- DoubleType,
- Type::DOUBLE,
- DoubleColumnReader,
- DoubleColumnWriter,
- f64,
- 8
-);
+make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
+make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
make_type!(
ByteArrayType,
- Type::BYTE_ARRAY,
ByteArrayColumnReader,
ByteArrayColumnWriter,
ByteArray,
@@ -1270,7 +1245,6 @@ make_type!(
);
make_type!(
FixedLenByteArrayType,
- Type::FIXED_LEN_BYTE_ARRAY,
FixedLenByteArrayColumnReader,
FixedLenByteArrayColumnWriter,
FixedLenByteArray,
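
The data_type.rs change replaces the per-type `get_physical_type` implementations with an associated `PHYSICAL_TYPE` const on the value type and a default method on `DataType`, which is why `make_type!` drops its `$physical_ty` argument. A condensed sketch of the pattern (hypothetical stand-in types, not the parquet definitions):

    #[derive(Debug, PartialEq)]
    enum Type {
        Int32,
    }

    trait ParquetValueType {
        // Each value type declares its physical type exactly once...
        const PHYSICAL_TYPE: Type;
    }

    impl ParquetValueType for i32 {
        const PHYSICAL_TYPE: Type = Type::Int32;
    }

    trait DataType {
        type T: ParquetValueType;

        // ...and DataType derives its method from the const, so generated
        // impls no longer need to spell the physical type out.
        fn get_physical_type() -> Type {
            <Self::T as ParquetValueType>::PHYSICAL_TYPE
        }
    }

    struct Int32Type;
    impl DataType for Int32Type {
        type T = i32;
    }

    fn main() {
        assert_eq!(Int32Type::get_physical_type(), Type::Int32);
    }
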
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 40db3c101..5d1a01df8 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -37,15 +37,48 @@
//! }
//! ```
-use std::{cmp, fmt};
+use std::fmt;
use byteorder::{ByteOrder, LittleEndian};
use parquet_format::Statistics as TStatistics;
use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::util::bit_util::from_ne_slice;
+pub(crate) mod private {
+ use super::*;
+
+ pub trait MakeStatistics {
+ fn make_statistics(statistics: ValueStatistics<Self>) -> Statistics
+ where
+ Self: Sized;
+ }
+
+ macro_rules! gen_make_statistics {
+ ($value_ty:ty, $stat:ident) => {
+ impl MakeStatistics for $value_ty {
+ fn make_statistics(statistics: ValueStatistics<Self>) -> Statistics
+ where
+ Self: Sized,
+ {
+ Statistics::$stat(statistics)
+ }
+ }
+ };
+ }
+
+ gen_make_statistics!(bool, Boolean);
+ gen_make_statistics!(i32, Int32);
+ gen_make_statistics!(i64, Int64);
+ gen_make_statistics!(Int96, Int96);
+ gen_make_statistics!(f32, Float);
+ gen_make_statistics!(f64, Double);
+ gen_make_statistics!(ByteArray, ByteArray);
+ gen_make_statistics!(FixedLenByteArray, FixedLenByteArray);
+}
+
// Macro to generate methods create Statistics.
macro_rules! statistics_new_func {
($func:ident, $vtype:ty, $stat:ident) => {
@@ -56,7 +89,7 @@ macro_rules! statistics_new_func {
nulls: u64,
is_deprecated: bool,
) -> Self {
- Statistics::$stat(TypedStatistics::new(
+ Statistics::$stat(ValueStatistics::new(
min,
max,
distinct,
@@ -234,17 +267,39 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
/// Statistics for a column chunk and data page.
#[derive(Debug, Clone, PartialEq)]
pub enum Statistics {
- Boolean(TypedStatistics<BoolType>),
- Int32(TypedStatistics<Int32Type>),
- Int64(TypedStatistics<Int64Type>),
- Int96(TypedStatistics<Int96Type>),
- Float(TypedStatistics<FloatType>),
- Double(TypedStatistics<DoubleType>),
- ByteArray(TypedStatistics<ByteArrayType>),
- FixedLenByteArray(TypedStatistics<FixedLenByteArrayType>),
+ Boolean(ValueStatistics<bool>),
+ Int32(ValueStatistics<i32>),
+ Int64(ValueStatistics<i64>),
+ Int96(ValueStatistics<Int96>),
+ Float(ValueStatistics<f32>),
+ Double(ValueStatistics<f64>),
+ ByteArray(ValueStatistics<ByteArray>),
+ FixedLenByteArray(ValueStatistics<FixedLenByteArray>),
+}
+
+impl<T: ParquetValueType> From<ValueStatistics<T>> for Statistics {
+ fn from(t: ValueStatistics<T>) -> Self {
+ T::make_statistics(t)
+ }
}
impl Statistics {
+ pub fn new<T: ParquetValueType>(
+ min: Option<T>,
+ max: Option<T>,
+ distinct_count: Option<u64>,
+ null_count: u64,
+ is_deprecated: bool,
+ ) -> Self {
+ Self::from(ValueStatistics::new(
+ min,
+ max,
+ distinct_count,
+ null_count,
+ is_deprecated,
+ ))
+ }
+
statistics_new_func![boolean, Option<bool>, Boolean];
statistics_new_func![int32, Option<i32>, Int32];
@@ -341,21 +396,24 @@ impl fmt::Display for Statistics {
}
/// Typed implementation for [`Statistics`].
-#[derive(Clone)]
-pub struct TypedStatistics<T: DataType> {
- min: Option<T::T>,
- max: Option<T::T>,
+pub type TypedStatistics<T> = ValueStatistics<<T as DataType>::T>;
+
+/// Statistics for a particular `ParquetValueType`
+#[derive(Clone, Eq, PartialEq)]
+pub struct ValueStatistics<T> {
+ min: Option<T>,
+ max: Option<T>,
// Distinct count could be omitted in some cases
distinct_count: Option<u64>,
null_count: u64,
is_min_max_deprecated: bool,
}
-impl<T: DataType> TypedStatistics<T> {
+impl<T: ParquetValueType> ValueStatistics<T> {
/// Creates new typed statistics.
pub fn new(
- min: Option<T::T>,
- max: Option<T::T>,
+ min: Option<T>,
+ max: Option<T>,
distinct_count: Option<u64>,
null_count: u64,
is_min_max_deprecated: bool,
@@ -373,7 +431,7 @@ impl<T: DataType> TypedStatistics<T> {
///
/// Panics if min value is not set, e.g. all values are `null`.
/// Use `has_min_max_set` method to check that.
- pub fn min(&self) -> &T::T {
+ pub fn min(&self) -> &T {
self.min.as_ref().unwrap()
}
@@ -381,7 +439,7 @@ impl<T: DataType> TypedStatistics<T> {
///
/// Panics if max value is not set, e.g. all values are `null`.
/// Use `has_min_max_set` method to check that.
- pub fn max(&self) -> &T::T {
+ pub fn max(&self) -> &T {
self.max.as_ref().unwrap()
}
@@ -423,7 +481,7 @@ impl<T: DataType> TypedStatistics<T> {
}
}
-impl<T: DataType> fmt::Display for TypedStatistics<T> {
+impl<T: ParquetValueType> fmt::Display for ValueStatistics<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{{")?;
write!(f, "min: ")?;
@@ -447,7 +505,7 @@ impl<T: DataType> fmt::Display for TypedStatistics<T> {
}
}
-impl<T: DataType> fmt::Debug for TypedStatistics<T> {
+impl<T: ParquetValueType> fmt::Debug for ValueStatistics<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
@@ -462,16 +520,6 @@ impl<T: DataType> fmt::Debug for TypedStatistics<T> {
}
}
-impl<T: DataType> cmp::PartialEq for TypedStatistics<T> {
- fn eq(&self, other: &TypedStatistics<T>) -> bool {
- self.min == other.min
- && self.max == other.max
- && self.distinct_count == other.distinct_count
- && self.null_count == other.null_count
- && self.is_min_max_deprecated == other.is_min_max_deprecated
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
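
The `MakeStatistics` trait added here lets the new generic `Statistics::new` select the correct enum variant from the value type alone, replacing the `gen_stats_section!` dispatch removed from the column writer. A condensed sketch of the dispatch (stand-in types with trimmed fields, not the committed API):

    // Stand-in for ValueStatistics<T>, reduced to the min/max pair.
    struct ValueStatistics<T> {
        min: Option<T>,
        max: Option<T>,
    }

    enum Statistics {
        Int32(ValueStatistics<i32>),
        Double(ValueStatistics<f64>),
    }

    // Each value type knows which Statistics variant wraps it.
    trait MakeStatistics: Sized {
        fn make_statistics(stats: ValueStatistics<Self>) -> Statistics;
    }

    impl MakeStatistics for i32 {
        fn make_statistics(stats: ValueStatistics<Self>) -> Statistics {
            Statistics::Int32(stats)
        }
    }

    impl MakeStatistics for f64 {
        fn make_statistics(stats: ValueStatistics<Self>) -> Statistics {
            Statistics::Double(stats)
        }
    }

    impl Statistics {
        // Generic constructor: the variant is chosen by T, not by the caller.
        fn new<T: MakeStatistics>(min: Option<T>, max: Option<T>) -> Self {
            T::make_statistics(ValueStatistics { min, max })
        }
    }

    fn main() {
        let s = Statistics::new(Some(1i32), Some(5i32));
        assert!(matches!(s, Statistics::Int32(_)));
    }
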