This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 02371d2be Generify parquet write path (#1764) (#2045)
02371d2be is described below

commit 02371d2be6b9fd276ea3423d51122c47935b7714
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun Jul 17 16:32:14 2022 -0400

    Generify parquet write path (#1764) (#2045)
    
    * Generify parquet write path (#1764)
    
    * More docs
    
    * Lint
    
    * Fix doc
    
    * Review feedback
    
    * Fix doc
---
 parquet/src/basic.rs                            |   2 +-
 parquet/src/column/writer/encoder.rs            | 250 +++++++++
 parquet/src/column/{writer.rs => writer/mod.rs} | 667 ++++++++----------------
 parquet/src/data_type.rs                        |  90 ++--
 parquet/src/file/statistics.rs                  | 110 ++--
 5 files changed, 584 insertions(+), 535 deletions(-)
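
 For orientation before the diff: the change replaces the concrete column writer with a
 GenericColumnWriter parameterized over a ColumnValueEncoder trait, keeping the old name as a
 type alias. The following is a minimal, self-contained sketch of that pattern only; the types
 and bodies are simplified placeholders, not the real parquet API:

    // Hypothetical, simplified mirror of the trait introduced in this commit.
    trait ColumnValueEncoder {
        type T: Clone;
        type Values: ?Sized;
        fn write(&mut self, values: &Self::Values, offset: usize, len: usize);
        fn num_values(&self) -> usize;
    }

    // Default encoder over plain slices, playing the role of ColumnValueEncoderImpl.
    struct SliceEncoder<T> {
        buffered: Vec<T>,
    }

    impl<T: Clone> ColumnValueEncoder for SliceEncoder<T> {
        type T = T;
        type Values = [T];
        fn write(&mut self, values: &[T], offset: usize, len: usize) {
            self.buffered.extend_from_slice(&values[offset..offset + len]);
        }
        fn num_values(&self) -> usize {
            self.buffered.len()
        }
    }

    // The writer depends only on the trait, so other value representations
    // (e.g. the arrow integration motivating #1764) can plug in their own encoder.
    struct GenericColumnWriter<E: ColumnValueEncoder> {
        encoder: E,
    }

    // The old public name survives as an alias, keeping existing code compiling.
    type ColumnWriterImpl<T> = GenericColumnWriter<SliceEncoder<T>>;

    fn main() {
        let mut writer: ColumnWriterImpl<i32> = GenericColumnWriter {
            encoder: SliceEncoder { buffered: vec![] },
        };
        writer.encoder.write(&[1, 2, 3, 4], 1, 2);
        assert_eq!(writer.encoder.num_values(), 2);
    }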

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 59a0fe07b..2d8073e75 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -212,7 +212,7 @@ pub enum Repetition {
 /// Encodings supported by Parquet.
 /// Not all encodings are valid for all types. These enums are also used to specify the
 /// encoding of definition and repetition levels.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
 pub enum Encoding {
     /// Default byte encoding.
     /// - BOOLEAN - 1 bit per value, 0 is false; 1 is true.
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
new file mode 100644
index 000000000..bc31aedf4
--- /dev/null
+++ b/parquet/src/column/writer/encoder.rs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Encoding;
+use crate::column::writer::{
+    compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
+    update_min,
+};
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::DataType;
+use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
+use crate::util::memory::ByteBufferPtr;
+
+/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
+pub trait ColumnValues {
+    /// The underlying value type
+    type T: ParquetValueType;
+
+    /// The number of values in this collection
+    fn len(&self) -> usize;
+
+    /// Returns the min and max values in this collection, skipping any NaN values
+    ///
+    /// Returns `None` if no values found
+    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&Self::T, &Self::T)>;
+}
+
+/// The encoded data for a dictionary page
+pub struct DictionaryPage {
+    pub buf: ByteBufferPtr,
+    pub num_values: usize,
+    pub is_sorted: bool,
+}
+
+/// The encoded values for a data page, with optional statistics
+pub struct DataPageValues<T> {
+    pub buf: ByteBufferPtr,
+    pub num_values: usize,
+    pub encoding: Encoding,
+    pub min_value: Option<T>,
+    pub max_value: Option<T>,
+}
+
+/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
+/// [`super::GenericColumnWriter`]
+pub trait ColumnValueEncoder {
+    /// The underlying value type of [`Self::Values`]
+    ///
+    /// Note: this avoids needing to fully qualify `<Self::Values as ColumnValues>::T`
+    type T: ParquetValueType;
+
+    /// The values encoded by this encoder
+    type Values: ColumnValues<T = Self::T> + ?Sized;
+
+    /// Create a new [`ColumnValueEncoder`]
+    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
+    where
+        Self: Sized;
+
+    /// Write the corresponding values to this [`ColumnValueEncoder`]
+    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
+
+    /// Returns the number of buffered values
+    fn num_values(&self) -> usize;
+
+    /// Returns true if this encoder has a dictionary page
+    fn has_dictionary(&self) -> bool;
+
+    /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
+    fn estimated_dict_page_size(&self) -> Option<usize>;
+
+    /// Returns an estimate of the data page size in bytes
+    fn estimated_data_page_size(&self) -> usize;
+
+    /// Flush the dictionary page for this column chunk if any. Any subsequent calls to
+    /// [`Self::write`] will not be dictionary encoded
+    ///
+    /// Note: [`Self::flush_data_page`] must be called first, as this will error if there
+    /// are any pending page values
+    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;
+
+    /// Flush the next data page for this column chunk
+    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
+}
+
+pub struct ColumnValueEncoderImpl<T: DataType> {
+    encoder: Box<dyn Encoder<T>>,
+    dict_encoder: Option<DictEncoder<T>>,
+    descr: ColumnDescPtr,
+    num_values: usize,
+    min_value: Option<T::T>,
+    max_value: Option<T::T>,
+}
+
+impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
+    type T = T::T;
+
+    type Values = [T::T];
+
+    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
+        let dict_supported = props.dictionary_enabled(descr.path())
+            && has_dictionary_support(T::get_physical_type(), props);
+        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
+
+        // Set either main encoder or fallback encoder.
+        let encoder = get_encoder(
+            descr.clone(),
+            props
+                .encoding(descr.path())
+                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
+        )?;
+
+        Ok(Self {
+            encoder,
+            dict_encoder,
+            descr: descr.clone(),
+            num_values: 0,
+            min_value: None,
+            max_value: None,
+        })
+    }
+
+    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
+        self.num_values += len;
+
+        let slice = values.get(offset..offset + len).ok_or_else(|| {
+            general_err!(
+                "Expected to write {} values, but have only {}",
+                len,
+                values.len() - offset
+            )
+        })?;
+
+        if let Some((min, max)) = slice.min_max(&self.descr) {
+            update_min(&self.descr, min, &mut self.min_value);
+            update_max(&self.descr, max, &mut self.max_value);
+        }
+
+        match &mut self.dict_encoder {
+            Some(encoder) => encoder.put(slice),
+            _ => self.encoder.put(slice),
+        }
+    }
+
+    fn num_values(&self) -> usize {
+        self.num_values
+    }
+
+    fn has_dictionary(&self) -> bool {
+        self.dict_encoder.is_some()
+    }
+
+    fn estimated_dict_page_size(&self) -> Option<usize> {
+        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
+    }
+
+    fn estimated_data_page_size(&self) -> usize {
+        match &self.dict_encoder {
+            Some(encoder) => encoder.estimated_data_encoded_size(),
+            _ => self.encoder.estimated_data_encoded_size(),
+        }
+    }
+
+    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
+        match self.dict_encoder.take() {
+            Some(encoder) => {
+                if self.num_values != 0 {
+                    return Err(general_err!(
+                        "Must flush data pages before flushing dictionary"
+                    ));
+                }
+
+                let buf = encoder.write_dict()?;
+
+                Ok(Some(DictionaryPage {
+                    buf,
+                    num_values: encoder.num_entries(),
+                    is_sorted: encoder.is_sorted(),
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
+        let (buf, encoding) = match &mut self.dict_encoder {
+            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
+            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
+        };
+
+        Ok(DataPageValues {
+            buf,
+            encoding,
+            num_values: std::mem::take(&mut self.num_values),
+            min_value: self.min_value.take(),
+            max_value: self.max_value.take(),
+        })
+    }
+}
+
+impl<T: ParquetValueType> ColumnValues for [T] {
+    type T = T;
+
+    fn len(&self) -> usize {
+        self.len()
+    }
+
+    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&T, &T)> {
+        let mut iter = self.iter();
+
+        let first = loop {
+            let next = iter.next()?;
+            if !is_nan(next) {
+                break next;
+            }
+        };
+
+        let mut min = first;
+        let mut max = first;
+        for val in iter {
+            if is_nan(val) {
+                continue;
+            }
+            if compare_greater(descr, min, val) {
+                min = val;
+            }
+            if compare_greater(descr, val, max) {
+                max = val;
+            }
+        }
+        Some((min, max))
+    }
+}
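
 The min_max implementation above skips NaNs when accumulating page statistics. Below is a
 standalone sketch of the same idea, specialized to f64 for clarity; the real code is generic
 over ParquetValueType and orders values via the column descriptor:

    // NaN-skipping min/max: a NaN never becomes a statistic, and an all-NaN
    // (or empty) slice yields no statistics at all.
    fn min_max(values: &[f64]) -> Option<(f64, f64)> {
        let mut iter = values.iter().copied().filter(|v| !v.is_nan());
        let first = iter.next()?; // None if empty or all-NaN
        let (mut min, mut max) = (first, first);
        for v in iter {
            if v < min {
                min = v;
            }
            if v > max {
                max = v;
            }
        }
        Some((min, max))
    }

    fn main() {
        assert_eq!(min_max(&[f64::NAN, 3.0, -1.0]), Some((-1.0, 3.0)));
        assert_eq!(min_max(&[f64::NAN]), None);
    }
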
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer/mod.rs
similarity index 81%
rename from parquet/src/column/writer.rs
rename to parquet/src/column/writer/mod.rs
index 1fc5207f6..ff6c09898 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -17,18 +17,17 @@
 
 //! Contains column writer API.
 use parquet_format::{ColumnIndex, OffsetIndex};
-use std::{collections::VecDeque, convert::TryFrom, marker::PhantomData};
+use std::collections::{BTreeSet, VecDeque};
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
+use crate::column::writer::encoder::{
+    ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues,
+};
 use crate::compression::{create_codec, Codec};
 use crate::data_type::private::ParquetValueType;
-use crate::data_type::AsBytes;
 use crate::data_type::*;
-use crate::encodings::{
-    encoding::{get_encoder, DictEncoder, Encoder},
-    levels::{max_buffer_size, LevelEncoder},
-};
+use crate::encodings::levels::{max_buffer_size, LevelEncoder};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::properties::EnabledStatistics;
@@ -38,9 +37,10 @@ use crate::file::{
     properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
 };
 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
-use crate::util::bit_util::FromBytes;
 use crate::util::memory::ByteBufferPtr;
 
+pub(crate) mod encoder;
+
 /// Column writer for a Parquet type.
 pub enum ColumnWriter<'a> {
     BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -58,26 +58,6 @@ pub enum Level {
     Column,
 }
 
-macro_rules! gen_stats_section {
-    ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{
-        let min = $min.as_ref().and_then(|v| {
-            Some(read_num_bytes!(
-                $physical_ty,
-                v.as_bytes().len(),
-                &v.as_bytes()
-            ))
-        });
-        let max = $max.as_ref().and_then(|v| {
-            Some(read_num_bytes!(
-                $physical_ty,
-                v.as_bytes().len(),
-                &v.as_bytes()
-            ))
-        });
-        Statistics::$stat_fn(min, max, $distinct, $nulls, false)
-    }};
-}
-
 /// Gets a specific column writer corresponding to column descriptor `descr`.
 pub fn get_column_writer<'a>(
     descr: ColumnDescPtr,
@@ -174,26 +154,27 @@ type ColumnCloseResult = (
 );
 
 /// Typed column writer for a primitive column.
-pub struct ColumnWriterImpl<'a, T: DataType> {
+pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
+
+pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     // Column writer properties
     descr: ColumnDescPtr,
     props: WriterPropertiesPtr,
     statistics_enabled: EnabledStatistics,
 
     page_writer: Box<dyn PageWriter + 'a>,
-    has_dictionary: bool,
-    dict_encoder: Option<DictEncoder<T>>,
-    encoder: Box<dyn Encoder<T>>,
     codec: Compression,
     compressor: Option<Box<dyn Codec>>,
+    encoder: E,
+
     // Metrics per page
+    /// The number of values including nulls in the in-progress data page
     num_buffered_values: u32,
-    num_buffered_encoded_values: u32,
+    /// The number of rows in the in-progress data page
     num_buffered_rows: u32,
-    min_page_value: Option<T::T>,
-    max_page_value: Option<T::T>,
+    /// The number of nulls in the in-progress data page
     num_page_nulls: u64,
-    page_distinct_count: Option<u64>,
+
     // Metrics per column writer
     total_bytes_written: u64,
     total_rows_written: u64,
@@ -202,21 +183,26 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
     total_num_values: u64,
     dictionary_page_offset: Option<u64>,
     data_page_offset: Option<u64>,
-    min_column_value: Option<T::T>,
-    max_column_value: Option<T::T>,
+    min_column_value: Option<E::T>,
+    max_column_value: Option<E::T>,
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
+
+    /// The order of encodings within the generated metadata does not impact its meaning,
+    /// but we use a BTreeSet so that the output is deterministic
+    encodings: BTreeSet<Encoding>,
+
     // Reused buffers
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
-    _phantom: PhantomData<T>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
 }
 
-impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
+impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     pub fn new(
         descr: ColumnDescPtr,
         props: WriterPropertiesPtr,
@@ -224,43 +210,25 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     ) -> Self {
         let codec = props.compression(descr.path());
         let compressor = create_codec(codec).unwrap();
-
-        // Optionally set dictionary encoder.
-        let dict_encoder = if props.dictionary_enabled(descr.path())
-            && has_dictionary_support(T::get_physical_type(), &props)
-        {
-            Some(DictEncoder::new(descr.clone()))
-        } else {
-            None
-        };
-
-        // Whether or not this column writer has a dictionary encoding.
-        let has_dictionary = dict_encoder.is_some();
-
-        // Set either main encoder or fallback encoder.
-        let fallback_encoder = get_encoder(
-            descr.clone(),
-            props
-                .encoding(descr.path())
-                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)),
-        )
-        .unwrap();
+        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
+        let mut encodings = BTreeSet::new();
+        // Used for level information
+        encodings.insert(Encoding::RLE);
+
         Self {
             descr,
             props,
             statistics_enabled,
             page_writer,
-            has_dictionary,
-            dict_encoder,
-            encoder: fallback_encoder,
             codec,
             compressor,
+            encoder,
             num_buffered_values: 0,
-            num_buffered_encoded_values: 0,
             num_buffered_rows: 0,
+            num_page_nulls: 0,
             total_bytes_written: 0,
             total_rows_written: 0,
             total_uncompressed_size: 0,
@@ -271,27 +239,23 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
-            min_page_value: None,
-            max_page_value: None,
-            num_page_nulls: 0,
-            page_distinct_count: None,
             min_column_value: None,
             max_column_value: None,
             num_column_nulls: 0,
             column_distinct_count: None,
-            _phantom: PhantomData,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
+            encodings,
         }
     }
 
     fn write_batch_internal(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
-        min: Option<&T::T>,
-        max: Option<&T::T>,
+        min: Option<&E::T>,
+        max: Option<&E::T>,
         distinct_count: Option<u64>,
     ) -> Result<usize> {
         // We check for DataPage limits only after we have inserted the values. If a user
@@ -304,18 +268,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // TODO: find out why we don't account for size of levels when we estimate page
         // size.
 
-        // Find out the minimal length to prevent index out of bound errors.
-        let mut min_len = values.len();
-        if let Some(levels) = def_levels {
-            min_len = min_len.min(levels.len());
-        }
-        if let Some(levels) = rep_levels {
-            min_len = min_len.min(levels.len());
-        }
+        let num_levels = match def_levels {
+            Some(def_levels) => def_levels.len(),
+            None => values.len(),
+        };
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
-        let num_batches = min_len / write_batch_size;
+        let num_batches = num_levels / write_batch_size;
 
         // If only computing chunk-level statistics compute them here, page-level statistics
         // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
@@ -323,23 +283,23 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         if self.statistics_enabled == EnabledStatistics::Chunk {
             match (min, max) {
                 (Some(min), Some(max)) => {
-                    Self::update_min(&self.descr, min, &mut self.min_column_value);
-                    Self::update_max(&self.descr, max, &mut self.max_column_value);
+                    update_min(&self.descr, min, &mut self.min_column_value);
+                    update_max(&self.descr, max, &mut self.max_column_value);
                 }
                 (None, Some(_)) | (Some(_), None) => {
                     panic!("min/max should be both set or both None")
                 }
                 (None, None) => {
-                    for val in values {
-                        Self::update_min(&self.descr, val, &mut self.min_column_value);
-                        Self::update_max(&self.descr, val, &mut self.max_column_value);
+                    if let Some((min, max)) = values.min_max(&self.descr) {
+                        update_min(&self.descr, min, &mut self.min_column_value);
+                        update_max(&self.descr, max, &mut self.max_column_value);
                     }
                 }
             };
         }
 
         // We can only set the distinct count if there are no other writes
-        if self.num_buffered_values == 0 && self.num_page_nulls == 0 {
+        if self.encoder.num_values() == 0 {
             self.column_distinct_count = distinct_count;
         } else {
             self.column_distinct_count = None;
@@ -349,7 +309,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         let mut levels_offset = 0;
         for _ in 0..num_batches {
             values_offset += self.write_mini_batch(
-                &values[values_offset..values_offset + write_batch_size],
+                values,
+                values_offset,
+                write_batch_size,
                 def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
                 rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
             )?;
@@ -357,7 +319,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         }
 
         values_offset += self.write_mini_batch(
-            &values[values_offset..],
+            values,
+            values_offset,
+            num_levels - levels_offset,
             def_levels.map(|lv| &lv[levels_offset..]),
             rep_levels.map(|lv| &lv[levels_offset..]),
         )?;
@@ -380,7 +344,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// non-nullable and/or non-repeated.
     pub fn write_batch(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
@@ -396,11 +360,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// computed page statistics
     pub fn write_batch_with_statistics(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
-        min: Option<&T::T>,
-        max: Option<&T::T>,
+        min: Option<&E::T>,
+        max: Option<&E::T>,
         distinct_count: Option<u64>,
     ) -> Result<usize> {
         self.write_batch_internal(
@@ -428,12 +392,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// Finalises writes and closes the column writer.
     /// Returns total bytes written, total rows written and column chunk metadata.
     pub fn close(mut self) -> Result<ColumnCloseResult> {
-        if self.dict_encoder.is_some() {
+        if self.num_buffered_values > 0 {
+            self.add_data_page()?;
+        }
+        if self.encoder.has_dictionary() {
             self.write_dictionary_page()?;
         }
         self.flush_data_pages()?;
         let metadata = self.write_column_metadata()?;
-        self.dict_encoder = None;
         self.page_writer.close()?;
 
         let (column_index, offset_index) = if self.column_index_builder.valid() {
@@ -459,12 +425,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// page size.
     fn write_mini_batch(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
+        values_offset: usize,
+        num_levels: usize,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
-        let mut values_to_write = 0;
-
         // Check if number of definition levels is the same as number of repetition
         // levels.
         if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
@@ -478,7 +444,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         }
 
         // Process definition levels and determine how many values to write.
-        let num_values = if self.descr.max_def_level() > 0 {
+        let values_to_write = if self.descr.max_def_level() > 0 {
             let levels = def_levels.ok_or_else(|| {
                 general_err!(
                     "Definition levels are required, because max definition 
level = {}",
@@ -486,6 +452,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                 )
             })?;
 
+            let mut values_to_write = 0;
             for &level in levels {
                 if level == self.descr.max_def_level() {
                     values_to_write += 1;
@@ -494,11 +461,10 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                 }
             }
 
-            self.write_definition_levels(levels);
-            u32::try_from(levels.len()).unwrap()
+            self.def_levels_sink.extend_from_slice(levels);
+            values_to_write
         } else {
-            values_to_write = values.len();
-            u32::try_from(values_to_write).unwrap()
+            num_levels
         };
 
         // Process repetition levels and determine how many rows we are about to process.
@@ -516,32 +482,15 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                 self.num_buffered_rows += (level == 0) as u32
             }
 
-            self.write_repetition_levels(levels);
+            self.rep_levels_sink.extend_from_slice(levels);
         } else {
             // Each value is exactly one row.
             // This equals the number of values, counting nulls as well.
-            self.num_buffered_rows += num_values;
+            self.num_buffered_rows += num_levels as u32;
         }
 
-        // Check that we have enough values to write.
-        let values_to_write = values.get(0..values_to_write).ok_or_else(|| {
-            general_err!(
-                "Expected to write {} values, but have only {}",
-                values_to_write,
-                values.len()
-            )
-        })?;
-
-        if self.statistics_enabled == EnabledStatistics::Page {
-            for val in values_to_write {
-                self.update_page_min_max(val);
-            }
-        }
-
-        self.write_values(values_to_write)?;
-
-        self.num_buffered_values += num_values;
-        self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap();
+        self.encoder.write(values, values_offset, values_to_write)?;
+        self.num_buffered_values += num_levels as u32;
 
         if self.should_add_data_page() {
             self.add_data_page()?;
@@ -551,25 +500,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             self.dict_fallback()?;
         }
 
-        Ok(values_to_write.len())
-    }
-
-    #[inline]
-    fn write_definition_levels(&mut self, def_levels: &[i16]) {
-        self.def_levels_sink.extend_from_slice(def_levels);
-    }
-
-    #[inline]
-    fn write_repetition_levels(&mut self, rep_levels: &[i16]) {
-        self.rep_levels_sink.extend_from_slice(rep_levels);
-    }
-
-    #[inline]
-    fn write_values(&mut self, values: &[T::T]) -> Result<()> {
-        match self.dict_encoder {
-            Some(ref mut encoder) => encoder.put(values),
-            None => self.encoder.put(values),
-        }
+        Ok(values_to_write)
     }
 
     /// Returns true if we need to fall back to non-dictionary encoding.
@@ -578,10 +509,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// size.
     #[inline]
     fn should_dict_fallback(&self) -> bool {
-        match self.dict_encoder {
-            Some(ref encoder) => {
-                encoder.dict_encoded_size() >= self.props.dictionary_pagesize_limit()
-            }
+        match self.encoder.estimated_dict_page_size() {
+            Some(size) => size >= self.props.dictionary_pagesize_limit(),
             None => false,
         }
     }
@@ -593,28 +522,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         //
         // In such a scenario the dictionary encoder may return an estimated encoded
         // size in excess of the page size limit, even when there are no buffered values
-        if self.num_buffered_values == 0 {
+        if self.encoder.num_values() == 0 {
             return false;
         }
 
-        match self.dict_encoder {
-            Some(ref encoder) => {
-                encoder.estimated_data_encoded_size() >= self.props.data_pagesize_limit()
-            }
-            None => {
-                self.encoder.estimated_data_encoded_size()
-                    >= self.props.data_pagesize_limit()
-            }
-        }
+        self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit()
     }
 
     /// Performs dictionary fallback.
     /// Prepares and writes dictionary and all data pages into page writer.
     fn dict_fallback(&mut self) -> Result<()> {
         // At this point we know that we need to fall back.
+        if self.num_buffered_values > 0 {
+            self.add_data_page()?;
+        }
         self.write_dictionary_page()?;
         self.flush_data_pages()?;
-        self.dict_encoder = None;
         Ok(())
     }
 
@@ -658,28 +581,24 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// Data page is either buffered in case of dictionary encoding or written directly.
     fn add_data_page(&mut self) -> Result<()> {
         // Extract encoded values
-        let value_bytes = match self.dict_encoder {
-            Some(ref mut encoder) => encoder.write_indices()?,
-            None => self.encoder.flush_buffer()?,
-        };
-
-        // Select encoding based on current encoder and writer version (v1 or v2).
-        let encoding = if self.dict_encoder.is_some() {
-            self.props.dictionary_data_page_encoding()
-        } else {
-            self.encoder.encoding()
-        };
+        let values_data = self.encoder.flush_data_page()?;
 
         let max_def_level = self.descr.max_def_level();
         let max_rep_level = self.descr.max_rep_level();
 
         self.num_column_nulls += self.num_page_nulls;
 
-        let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some();
-        let page_statistics = match self.statistics_enabled {
-            EnabledStatistics::Page if has_min_max => {
-                self.update_column_min_max();
-                Some(self.make_page_statistics())
+        let page_statistics = match (values_data.min_value, values_data.max_value) {
+            (Some(min), Some(max)) => {
+                update_min(&self.descr, &min, &mut self.min_column_value);
+                update_max(&self.descr, &max, &mut self.max_column_value);
+                Some(Statistics::new(
+                    Some(min),
+                    Some(max),
+                    None,
+                    self.num_page_nulls,
+                    false,
+                ))
             }
             _ => None,
         };
@@ -711,11 +630,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                     );
                 }
 
-                buffer.extend_from_slice(value_bytes.data());
+                buffer.extend_from_slice(values_data.buf.data());
                 let uncompressed_size = buffer.len();
 
                 if let Some(ref mut cmpr) = self.compressor {
-                    let mut compressed_buf = Vec::with_capacity(value_bytes.data().len());
+                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                     cmpr.compress(&buffer[..], &mut compressed_buf)?;
                     buffer = compressed_buf;
                 }
@@ -723,7 +642,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                 let data_page = Page::DataPage {
                     buf: ByteBufferPtr::new(buffer),
                     num_values: self.num_buffered_values,
-                    encoding,
+                    encoding: values_data.encoding,
                     def_level_encoding: Encoding::RLE,
                     rep_level_encoding: Encoding::RLE,
                     statistics: page_statistics,
@@ -751,20 +670,20 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                 }
 
                 let uncompressed_size =
-                    rep_levels_byte_len + def_levels_byte_len + value_bytes.len();
+                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
 
                 // Data Page v2 compresses values only.
                 match self.compressor {
                     Some(ref mut cmpr) => {
-                        cmpr.compress(value_bytes.data(), &mut buffer)?;
+                        cmpr.compress(values_data.buf.data(), &mut buffer)?;
                     }
-                    None => buffer.extend_from_slice(value_bytes.data()),
+                    None => buffer.extend_from_slice(values_data.buf.data()),
                 }
 
                 let data_page = Page::DataPageV2 {
                     buf: ByteBufferPtr::new(buffer),
                     num_values: self.num_buffered_values,
-                    encoding,
+                    encoding: values_data.encoding,
                     num_nulls: self.num_page_nulls as u32,
                     num_rows: self.num_buffered_rows,
                     def_levels_byte_len: def_levels_byte_len as u32,
@@ -778,7 +697,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         };
 
         // Check if we need to buffer data page or flush it to the sink directly.
-        if self.dict_encoder.is_some() {
+        if self.encoder.has_dictionary() {
             self.data_pages.push_back(compressed_page);
         } else {
             self.write_data_page(compressed_page)?;
@@ -791,12 +710,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         self.rep_levels_sink.clear();
         self.def_levels_sink.clear();
         self.num_buffered_values = 0;
-        self.num_buffered_encoded_values = 0;
         self.num_buffered_rows = 0;
-        self.min_page_value = None;
-        self.max_page_value = None;
         self.num_page_nulls = 0;
-        self.page_distinct_count = None;
 
         Ok(())
     }
@@ -826,30 +741,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // If data page offset is not set, then no pages have been written
         let data_page_offset = self.data_page_offset.unwrap_or(0) as i64;
 
-        let file_offset;
-        let mut encodings = Vec::new();
-
-        if self.has_dictionary {
-            assert!(dict_page_offset.is_some(), "Dictionary offset is not set");
-            file_offset = dict_page_offset.unwrap() + total_compressed_size;
-            // NOTE: This should be in sync with writing dictionary pages.
-            encodings.push(self.props.dictionary_page_encoding());
-            encodings.push(self.props.dictionary_data_page_encoding());
-            // Fallback to alternative encoding, add it to the list.
-            if self.dict_encoder.is_none() {
-                encodings.push(self.encoder.encoding());
-            }
-        } else {
-            file_offset = data_page_offset + total_compressed_size;
-            encodings.push(self.encoder.encoding());
-        }
-        // We use only RLE level encoding for data page v1 and data page v2.
-        encodings.push(Encoding::RLE);
+        let file_offset = match dict_page_offset {
+            Some(dict_offset) => dict_offset + total_compressed_size,
+            None => data_page_offset + total_compressed_size,
+        };
+
+        let statistics = Statistics::new(
+            self.min_column_value.clone(),
+            self.max_column_value.clone(),
+            self.column_distinct_count,
+            self.num_column_nulls,
+            false,
+        );
 
-        let statistics = self.make_column_statistics();
         let metadata = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
-            .set_encodings(encodings)
+            .set_encodings(self.encodings.iter().cloned().collect())
             .set_file_offset(file_offset)
             .set_total_compressed_size(total_compressed_size)
             .set_total_uncompressed_size(total_uncompressed_size)
@@ -891,6 +798,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     /// Writes compressed data page into underlying sink and updates global metrics.
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
+        self.encodings.insert(page.encoding());
         let page_spec = self.page_writer.write_page(page)?;
         // update offset index
         // compressed_size = header_size + compressed_data_size
@@ -906,31 +814,29 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     #[inline]
     fn write_dictionary_page(&mut self) -> Result<()> {
         let compressed_page = {
-            let encoder = self
-                .dict_encoder
-                .as_ref()
+            let mut page = self
+                .encoder
+                .flush_dict_page()?
                 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
 
-            let is_sorted = encoder.is_sorted();
-            let num_values = encoder.num_entries();
-            let mut values_buf = encoder.write_dict()?;
-            let uncompressed_size = values_buf.len();
+            let uncompressed_size = page.buf.len();
 
             if let Some(ref mut cmpr) = self.compressor {
                 let mut output_buf = Vec::with_capacity(uncompressed_size);
-                cmpr.compress(values_buf.data(), &mut output_buf)?;
-                values_buf = ByteBufferPtr::new(output_buf);
+                cmpr.compress(page.buf.data(), &mut output_buf)?;
+                page.buf = ByteBufferPtr::new(output_buf);
             }
 
             let dict_page = Page::DictionaryPage {
-                buf: values_buf,
-                num_values: num_values as u32,
+                buf: page.buf,
+                num_values: page.num_values as u32,
                 encoding: self.props.dictionary_page_encoding(),
-                is_sorted,
+                is_sorted: page.is_sorted,
             };
             CompressedPage::new(dict_page, uncompressed_size)
         };
 
+        self.encodings.insert(compressed_page.encoding());
         let page_spec = self.page_writer.write_page(compressed_page)?;
         self.update_metrics_for_page(page_spec);
         // For the dictionary page, we don't need to update the column/offset index.
@@ -967,149 +873,89 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     fn get_page_writer_ref(&self) -> &dyn PageWriter {
         self.page_writer.as_ref()
     }
+}
 
-    fn make_column_statistics(&self) -> Statistics {
-        self.make_typed_statistics(Level::Column)
-    }
-
-    fn make_page_statistics(&self) -> Statistics {
-        self.make_typed_statistics(Level::Page)
-    }
+fn update_min<T: ParquetValueType>(
+    descr: &ColumnDescriptor,
+    val: &T,
+    min: &mut Option<T>,
+) {
+    update_stat::<T, _>(val, min, |cur| compare_greater(descr, cur, val))
+}
 
-    pub fn make_typed_statistics(&self, level: Level) -> Statistics {
-        let (min, max, distinct, nulls) = match level {
-            Level::Page => (
-                self.min_page_value.as_ref(),
-                self.max_page_value.as_ref(),
-                self.page_distinct_count,
-                self.num_page_nulls,
-            ),
-            Level::Column => (
-                self.min_column_value.as_ref(),
-                self.max_column_value.as_ref(),
-                self.column_distinct_count,
-                self.num_column_nulls,
-            ),
-        };
-        match self.descr.physical_type() {
-            Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, nulls),
-            Type::BOOLEAN => gen_stats_section!(bool, boolean, min, max, distinct, nulls),
-            Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, nulls),
-            Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls),
-            Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls),
-            Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls),
-            Type::BYTE_ARRAY => {
-                let min = min.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec()));
-                let max = max.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec()));
-                Statistics::byte_array(min, max, distinct, nulls, false)
-            }
-            Type::FIXED_LEN_BYTE_ARRAY => {
-                let min = min
-                    .as_ref()
-                    .map(|v| ByteArray::from(v.as_bytes().to_vec()))
-                    .map(|ba| {
-                        let ba: FixedLenByteArray = ba.into();
-                        ba
-                    });
-                let max = max
-                    .as_ref()
-                    .map(|v| ByteArray::from(v.as_bytes().to_vec()))
-                    .map(|ba| {
-                        let ba: FixedLenByteArray = ba.into();
-                        ba
-                    });
-                Statistics::fixed_len_byte_array(min, max, distinct, nulls, false)
-            }
-        }
-    }
+fn update_max<T: ParquetValueType>(
+    descr: &ColumnDescriptor,
+    val: &T,
+    max: &mut Option<T>,
+) {
+    update_stat::<T, _>(val, max, |cur| compare_greater(descr, val, cur))
+}
 
-    fn update_page_min_max(&mut self, val: &T::T) {
-        Self::update_min(&self.descr, val, &mut self.min_page_value);
-        Self::update_max(&self.descr, val, &mut self.max_page_value);
+#[inline]
+#[allow(clippy::eq_op)]
+fn is_nan<T: ParquetValueType>(val: &T) -> bool {
+    match T::PHYSICAL_TYPE {
+        Type::FLOAT | Type::DOUBLE => val != val,
+        _ => false,
     }
+}
 
-    fn update_column_min_max(&mut self) {
-        let min = self.min_page_value.as_ref().unwrap();
-        Self::update_min(&self.descr, min, &mut self.min_column_value);
-
-        let max = self.max_page_value.as_ref().unwrap();
-        Self::update_max(&self.descr, max, &mut self.max_column_value);
-    }
+/// Perform a conditional update of `cur`, skipping any NaN values
+///
+/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
+/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
 
-    fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option<T::T>) {
-        Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val))
+fn update_stat<T: ParquetValueType, F>(val: &T, cur: &mut Option<T>, should_update: F)
+where
+    F: Fn(&T) -> bool,
+{
+    if is_nan(val) {
+        return;
     }
 
-    fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option<T::T>) {
-        Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur))
+    if cur.as_ref().map_or(true, should_update) {
+        *cur = Some(val.clone());
     }
+}
 
-    /// Perform a conditional update of `cur`, skipping any NaN values
-    ///
-    /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
-    /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
-    #[allow(clippy::eq_op)]
-    fn update_stat<F>(val: &T::T, cur: &mut Option<T::T>, should_update: F)
-    where
-        F: Fn(&T::T) -> bool,
-    {
-        if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() {
-            // Skip NaN values
-            if val != val {
-                return;
-            }
-        }
-
-        if cur.as_ref().map_or(true, should_update) {
-            *cur = Some(val.clone());
+/// Evaluate `a > b` according to underlying logical type.
+fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
+    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
+        if !is_signed {
+            // need to compare unsigned
+            return a.as_u64().unwrap() > b.as_u64().unwrap();
         }
     }
 
-    /// Evaluate `a > b` according to underlying logical type.
-    fn compare_greater(descr: &ColumnDescriptor, a: &T::T, b: &T::T) -> bool {
-        if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
-            if !is_signed {
-                // need to compare unsigned
-                return a.as_u64().unwrap() > b.as_u64().unwrap();
-            }
+    match descr.converted_type() {
+        ConvertedType::UINT_8
+        | ConvertedType::UINT_16
+        | ConvertedType::UINT_32
+        | ConvertedType::UINT_64 => {
+            return a.as_u64().unwrap() > b.as_u64().unwrap();
         }
+        _ => {}
+    };
 
-        match descr.converted_type() {
-            ConvertedType::UINT_8
-            | ConvertedType::UINT_16
-            | ConvertedType::UINT_32
-            | ConvertedType::UINT_64 => {
-                return a.as_u64().unwrap() > b.as_u64().unwrap();
+    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
+        match T::PHYSICAL_TYPE {
+            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
             }
             _ => {}
         };
+    }
 
-        if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
-            match T::get_physical_type() {
-                Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
-                    return compare_greater_byte_array_decimals(
-                        a.as_bytes(),
-                        b.as_bytes(),
-                    );
-                }
-                _ => {}
-            };
-        }
-
-        if descr.converted_type() == ConvertedType::DECIMAL {
-            match T::get_physical_type() {
-                Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
-                    return compare_greater_byte_array_decimals(
-                        a.as_bytes(),
-                        b.as_bytes(),
-                    );
-                }
-                _ => {}
-            };
+    if descr.converted_type() == ConvertedType::DECIMAL {
+        match T::PHYSICAL_TYPE {
+            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
+            }
+            _ => {}
         };
+    };
 
-        a > b
-    }
+    a > b
 }
 
 // ----------------------------------------------------------------------
@@ -1280,16 +1126,16 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(expected = "Dictionary offset is already set")]
     fn test_column_writer_write_only_one_dictionary_page() {
         let page_writer = get_test_page_writer();
         let props = Arc::new(WriterProperties::builder().build());
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
         writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
         // First page should be correctly written.
-        let res = writer.write_dictionary_page();
-        assert!(res.is_ok());
+        writer.add_data_page().unwrap();
         writer.write_dictionary_page().unwrap();
+        let err = writer.write_dictionary_page().unwrap_err().to_string();
+        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
     }
 
     #[test]
@@ -1302,14 +1148,8 @@ mod tests {
         );
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
         writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
-        let res = writer.write_dictionary_page();
-        assert!(res.is_err());
-        if let Err(err) = res {
-            assert_eq!(
-                format!("{}", err),
-                "Parquet error: Dictionary encoder is not set"
-            );
-        }
+        let err = writer.write_dictionary_page().unwrap_err().to_string();
+        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
     }
 
     #[test]
@@ -1356,14 +1196,14 @@ mod tests {
             true,
             &[true, false],
             None,
-            &[Encoding::RLE, Encoding::RLE],
+            &[Encoding::RLE],
         );
         check_encoding_write_support::<BoolType>(
             WriterVersion::PARQUET_2_0,
             false,
             &[true, false],
             None,
-            &[Encoding::RLE, Encoding::RLE],
+            &[Encoding::RLE],
         );
     }
 
@@ -1374,7 +1214,7 @@ mod tests {
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int32Type>(
             WriterVersion::PARQUET_1_0,
@@ -1388,14 +1228,14 @@ mod tests {
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int32Type>(
             WriterVersion::PARQUET_2_0,
             false,
             &[1, 2],
             None,
-            &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
         );
     }
 
@@ -1406,7 +1246,7 @@ mod tests {
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int64Type>(
             WriterVersion::PARQUET_1_0,
@@ -1420,14 +1260,14 @@ mod tests {
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int64Type>(
             WriterVersion::PARQUET_2_0,
             false,
             &[1, 2],
             None,
-            &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
         );
     }
 
@@ -1438,7 +1278,7 @@ mod tests {
             true,
             &[Int96::from(vec![1, 2, 3])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int96Type>(
             WriterVersion::PARQUET_1_0,
@@ -1452,7 +1292,7 @@ mod tests {
             true,
             &[Int96::from(vec![1, 2, 3])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int96Type>(
             WriterVersion::PARQUET_2_0,
@@ -1470,7 +1310,7 @@ mod tests {
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<FloatType>(
             WriterVersion::PARQUET_1_0,
@@ -1484,7 +1324,7 @@ mod tests {
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<FloatType>(
             WriterVersion::PARQUET_2_0,
@@ -1502,7 +1342,7 @@ mod tests {
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<DoubleType>(
             WriterVersion::PARQUET_1_0,
@@ -1516,7 +1356,7 @@ mod tests {
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<DoubleType>(
             WriterVersion::PARQUET_2_0,
@@ -1534,7 +1374,7 @@ mod tests {
             true,
             &[ByteArray::from(vec![1u8])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<ByteArrayType>(
             WriterVersion::PARQUET_1_0,
@@ -1548,14 +1388,14 @@ mod tests {
             true,
             &[ByteArray::from(vec![1u8])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<ByteArrayType>(
             WriterVersion::PARQUET_2_0,
             false,
             &[ByteArray::from(vec![1u8])],
             None,
-            &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
         );
     }
 
@@ -1580,14 +1420,14 @@ mod tests {
             true,
             &[ByteArray::from(vec![1u8]).into()],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<FixedLenByteArrayType>(
             WriterVersion::PARQUET_2_0,
             false,
             &[ByteArray::from(vec![1u8]).into()],
             None,
-            &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
         );
     }
 
@@ -1603,7 +1443,7 @@ mod tests {
         assert_eq!(rows_written, 4);
         assert_eq!(
             metadata.encodings(),
-            &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
         );
         assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
         assert_eq!(metadata.compressed_size(), 20);
@@ -1728,7 +1568,7 @@ mod tests {
         assert_eq!(rows_written, 4);
         assert_eq!(
             metadata.encodings(),
-            &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
         );
         assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
         assert_eq!(metadata.compressed_size(), 20);
@@ -1808,40 +1648,19 @@ mod tests {
     #[test]
     fn test_column_writer_non_nullable_values_roundtrip() {
         let props = WriterProperties::builder().build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            0,
-            0,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
     }
 
     #[test]
     fn test_column_writer_nullable_non_repeated_values_roundtrip() {
         let props = WriterProperties::builder().build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            0,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
     }
 
     #[test]
     fn test_column_writer_nullable_repeated_values_roundtrip() {
         let props = WriterProperties::builder().build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1850,14 +1669,7 @@ mod tests {
             .set_dictionary_pagesize_limit(32)
             .set_data_pagesize_limit(32)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1865,14 +1677,7 @@ mod tests {
         for i in &[1usize, 2, 5, 10, 11, 1023] {
            let props = WriterProperties::builder().set_write_batch_size(*i).build();
 
-            column_roundtrip_random::<Int32Type>(
-                props,
-                1024,
-                std::i32::MIN,
-                std::i32::MAX,
-                10,
-                10,
-            );
+            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
         }
     }
 
@@ -1882,14 +1687,7 @@ mod tests {
             .set_writer_version(WriterVersion::PARQUET_1_0)
             .set_dictionary_enabled(false)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1898,14 +1696,7 @@ mod tests {
             .set_writer_version(WriterVersion::PARQUET_2_0)
             .set_dictionary_enabled(false)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1914,14 +1705,7 @@ mod tests {
             .set_writer_version(WriterVersion::PARQUET_1_0)
             .set_compression(Compression::SNAPPY)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            2048,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1930,14 +1714,7 @@ mod tests {
             .set_writer_version(WriterVersion::PARQUET_2_0)
             .set_compression(Compression::SNAPPY)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            2048,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 7b6fb04a7..1d0b5b231 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -568,6 +568,7 @@ pub(crate) mod private {
     use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
     use crate::util::memory::ByteBufferPtr;
 
+    use crate::basic::Type;
     use byteorder::ByteOrder;
     use std::convert::TryInto;
 
@@ -581,18 +582,21 @@ pub(crate) mod private {
     /// crate, and thus hint to the type system (and end user) traits are public for the contract
     /// and not for extension.
     pub trait ParquetValueType:
-        std::cmp::PartialEq
+        PartialEq
         + std::fmt::Debug
         + std::fmt::Display
-        + std::default::Default
-        + std::clone::Clone
+        + Default
+        + Clone
         + super::AsBytes
         + super::FromBytes
-        + super::SliceAsBytes
+        + SliceAsBytes
         + PartialOrd
         + Send
         + crate::encodings::decoding::private::GetDecoder
+        + crate::file::statistics::private::MakeStatistics
     {
+        const PHYSICAL_TYPE: Type;
+
         /// Encode the value directly from a higher level encoder
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -646,6 +650,8 @@ pub(crate) mod private {
     }
 
     impl ParquetValueType for bool {
+        const PHYSICAL_TYPE: Type = Type::BOOLEAN;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -730,8 +736,10 @@ pub(crate) mod private {
     }
 
     macro_rules! impl_from_raw {
-        ($ty: ty, $self: ident => $as_i64: block) => {
+        ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
             impl ParquetValueType for $ty {
+                const PHYSICAL_TYPE: Type = $physical_ty;
+
                 #[inline]
                 fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
                     let raw = unsafe {
@@ -809,12 +817,14 @@ pub(crate) mod private {
         }
     }
 
-    impl_from_raw!(i32, self => { Ok(*self as i64) });
-    impl_from_raw!(i64, self => { Ok(*self) });
-    impl_from_raw!(f32, self => { Err(general_err!("Type cannot be converted to i64")) });
-    impl_from_raw!(f64, self => { Err(general_err!("Type cannot be converted to i64")) });
+    impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
+    impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
+    impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
+    impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
 
     impl ParquetValueType for super::Int96 {
+        const PHYSICAL_TYPE: Type = Type::INT96;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -925,6 +935,8 @@ pub(crate) mod private {
     }
 
     impl ParquetValueType for super::ByteArray {
+        const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -1016,6 +1028,8 @@ pub(crate) mod private {
     }
 
     impl ParquetValueType for super::FixedLenByteArray {
+        const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -1113,7 +1127,9 @@ pub trait DataType: 'static + Send {
     type T: private::ParquetValueType;
 
     /// Returns Parquet physical type.
-    fn get_physical_type() -> Type;
+    fn get_physical_type() -> Type {
+        <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
+    }
 
     /// Returns size in bytes for Rust representation of the physical type.
     fn get_type_size() -> usize;
@@ -1156,17 +1172,13 @@ where
 }
 
 macro_rules! make_type {
-    ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
+    ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
         #[derive(Clone)]
         pub struct $name {}
 
         impl DataType for $name {
             type T = $native_ty;
 
-            fn get_physical_type() -> Type {
-                $physical_ty
-            }
-
             fn get_type_size() -> usize {
                 $size
             }
@@ -1212,57 +1224,20 @@ macro_rules! make_type {
 
 // Generate struct definitions for all physical types
 
-make_type!(
-    BoolType,
-    Type::BOOLEAN,
-    BoolColumnReader,
-    BoolColumnWriter,
-    bool,
-    1
-);
-make_type!(
-    Int32Type,
-    Type::INT32,
-    Int32ColumnReader,
-    Int32ColumnWriter,
-    i32,
-    4
-);
-make_type!(
-    Int64Type,
-    Type::INT64,
-    Int64ColumnReader,
-    Int64ColumnWriter,
-    i64,
-    8
-);
+make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
+make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
+make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
 make_type!(
     Int96Type,
-    Type::INT96,
     Int96ColumnReader,
     Int96ColumnWriter,
     Int96,
     mem::size_of::<Int96>()
 );
-make_type!(
-    FloatType,
-    Type::FLOAT,
-    FloatColumnReader,
-    FloatColumnWriter,
-    f32,
-    4
-);
-make_type!(
-    DoubleType,
-    Type::DOUBLE,
-    DoubleColumnReader,
-    DoubleColumnWriter,
-    f64,
-    8
-);
+make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
+make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
 make_type!(
     ByteArrayType,
-    Type::BYTE_ARRAY,
     ByteArrayColumnReader,
     ByteArrayColumnWriter,
     ByteArray,
@@ -1270,7 +1245,6 @@ make_type!(
 );
 make_type!(
     FixedLenByteArrayType,
-    Type::FIXED_LEN_BYTE_ARRAY,
     FixedLenByteArrayColumnReader,
     FixedLenByteArrayColumnWriter,
     FixedLenByteArray,
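
With the physical type carried by the new associated constant
`PHYSICAL_TYPE` on `ParquetValueType`, the default `get_physical_type`
shown above covers every `DataType` impl, which is why the `$physical_ty`
argument drops out of `make_type!`. A hedged sketch of the unchanged
public behaviour (external `parquet` crate paths assumed):

    use parquet::basic::Type;
    use parquet::data_type::{DataType, DoubleType, Int32Type};

    fn main() {
        // Both calls hit the new default implementation, which reads the
        // associated constant from the value type instead of a per-type body.
        assert_eq!(Int32Type::get_physical_type(), Type::INT32);
        assert_eq!(DoubleType::get_physical_type(), Type::DOUBLE);
    }
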
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 40db3c101..5d1a01df8 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -37,15 +37,48 @@
 //! }
 //! ```
 
-use std::{cmp, fmt};
+use std::fmt;
 
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format::Statistics as TStatistics;
 
 use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::util::bit_util::from_ne_slice;
 
+pub(crate) mod private {
+    use super::*;
+
+    pub trait MakeStatistics {
+        fn make_statistics(statistics: ValueStatistics<Self>) -> Statistics
+        where
+            Self: Sized;
+    }
+
+    macro_rules! gen_make_statistics {
+        ($value_ty:ty, $stat:ident) => {
+            impl MakeStatistics for $value_ty {
+                fn make_statistics(statistics: ValueStatistics<Self>) -> Statistics
+                where
+                    Self: Sized,
+                {
+                    Statistics::$stat(statistics)
+                }
+            }
+        };
+    }
+
+    gen_make_statistics!(bool, Boolean);
+    gen_make_statistics!(i32, Int32);
+    gen_make_statistics!(i64, Int64);
+    gen_make_statistics!(Int96, Int96);
+    gen_make_statistics!(f32, Float);
+    gen_make_statistics!(f64, Double);
+    gen_make_statistics!(ByteArray, ByteArray);
+    gen_make_statistics!(FixedLenByteArray, FixedLenByteArray);
+}
+
 // Macro to generate methods create Statistics.
 macro_rules! statistics_new_func {
     ($func:ident, $vtype:ty, $stat:ident) => {
@@ -56,7 +89,7 @@ macro_rules! statistics_new_func {
             nulls: u64,
             is_deprecated: bool,
         ) -> Self {
-            Statistics::$stat(TypedStatistics::new(
+            Statistics::$stat(ValueStatistics::new(
                 min,
                 max,
                 distinct,
@@ -234,17 +267,39 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
 /// Statistics for a column chunk and data page.
 #[derive(Debug, Clone, PartialEq)]
 pub enum Statistics {
-    Boolean(TypedStatistics<BoolType>),
-    Int32(TypedStatistics<Int32Type>),
-    Int64(TypedStatistics<Int64Type>),
-    Int96(TypedStatistics<Int96Type>),
-    Float(TypedStatistics<FloatType>),
-    Double(TypedStatistics<DoubleType>),
-    ByteArray(TypedStatistics<ByteArrayType>),
-    FixedLenByteArray(TypedStatistics<FixedLenByteArrayType>),
+    Boolean(ValueStatistics<bool>),
+    Int32(ValueStatistics<i32>),
+    Int64(ValueStatistics<i64>),
+    Int96(ValueStatistics<Int96>),
+    Float(ValueStatistics<f32>),
+    Double(ValueStatistics<f64>),
+    ByteArray(ValueStatistics<ByteArray>),
+    FixedLenByteArray(ValueStatistics<FixedLenByteArray>),
+}
+
+impl<T: ParquetValueType> From<ValueStatistics<T>> for Statistics {
+    fn from(t: ValueStatistics<T>) -> Self {
+        T::make_statistics(t)
+    }
 }
 
 impl Statistics {
+    pub fn new<T: ParquetValueType>(
+        min: Option<T>,
+        max: Option<T>,
+        distinct_count: Option<u64>,
+        null_count: u64,
+        is_deprecated: bool,
+    ) -> Self {
+        Self::from(ValueStatistics::new(
+            min,
+            max,
+            distinct_count,
+            null_count,
+            is_deprecated,
+        ))
+    }
+
     statistics_new_func![boolean, Option<bool>, Boolean];
 
     statistics_new_func![int32, Option<i32>, Int32];
@@ -341,21 +396,24 @@ impl fmt::Display for Statistics {
 }
 
 /// Typed implementation for [`Statistics`].
-#[derive(Clone)]
-pub struct TypedStatistics<T: DataType> {
-    min: Option<T::T>,
-    max: Option<T::T>,
+pub type TypedStatistics<T> = ValueStatistics<<T as DataType>::T>;
+
+/// Statistics for a particular `ParquetValueType`
+#[derive(Clone, Eq, PartialEq)]
+pub struct ValueStatistics<T> {
+    min: Option<T>,
+    max: Option<T>,
     // Distinct count could be omitted in some cases
     distinct_count: Option<u64>,
     null_count: u64,
     is_min_max_deprecated: bool,
 }
 
-impl<T: DataType> TypedStatistics<T> {
+impl<T: ParquetValueType> ValueStatistics<T> {
     /// Creates new typed statistics.
     pub fn new(
-        min: Option<T::T>,
-        max: Option<T::T>,
+        min: Option<T>,
+        max: Option<T>,
         distinct_count: Option<u64>,
         null_count: u64,
         is_min_max_deprecated: bool,
@@ -373,7 +431,7 @@ impl<T: DataType> TypedStatistics<T> {
     ///
     /// Panics if min value is not set, e.g. all values are `null`.
     /// Use `has_min_max_set` method to check that.
-    pub fn min(&self) -> &T::T {
+    pub fn min(&self) -> &T {
         self.min.as_ref().unwrap()
     }
 
@@ -381,7 +439,7 @@ impl<T: DataType> TypedStatistics<T> {
     ///
     /// Panics if max value is not set, e.g. all values are `null`.
     /// Use `has_min_max_set` method to check that.
-    pub fn max(&self) -> &T::T {
+    pub fn max(&self) -> &T {
         self.max.as_ref().unwrap()
     }
 
@@ -423,7 +481,7 @@ impl<T: DataType> TypedStatistics<T> {
     }
 }
 
-impl<T: DataType> fmt::Display for TypedStatistics<T> {
+impl<T: ParquetValueType> fmt::Display for ValueStatistics<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "{{")?;
         write!(f, "min: ")?;
@@ -447,7 +505,7 @@ impl<T: DataType> fmt::Display for TypedStatistics<T> {
     }
 }
 
-impl<T: DataType> fmt::Debug for TypedStatistics<T> {
+impl<T: ParquetValueType> fmt::Debug for ValueStatistics<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(
             f,
@@ -462,16 +520,6 @@ impl<T: DataType> fmt::Debug for TypedStatistics<T> {
     }
 }
 
-impl<T: DataType> cmp::PartialEq for TypedStatistics<T> {
-    fn eq(&self, other: &TypedStatistics<T>) -> bool {
-        self.min == other.min
-            && self.max == other.max
-            && self.distinct_count == other.distinct_count
-            && self.null_count == other.null_count
-            && self.is_min_max_deprecated == other.is_min_max_deprecated
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
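
Taken together: `TypedStatistics<T>` survives as an alias for
`ValueStatistics<<T as DataType>::T>`, so existing callers keep compiling,
while the sealed `MakeStatistics` trait lets the new `From` impl and the
generic `Statistics::new` select the matching variant from the value type
alone. A minimal usage sketch (external `parquet` crate paths assumed):

    use parquet::file::statistics::{Statistics, ValueStatistics};

    fn main() {
        // Typed statistics for an i32 column: min, max, distinct count,
        // null count, and the deprecated min/max flag.
        let typed = ValueStatistics::new(Some(1i32), Some(42), None, 0, false);

        // From<ValueStatistics<T>> dispatches to Statistics::Int32 through
        // the sealed MakeStatistics helper.
        let stats: Statistics = typed.into();

        // The generic constructor reaches the same variant in one step.
        let stats2 = Statistics::new(Some(1i32), Some(42), None, 0, false);
        assert_eq!(stats, stats2);
    }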
