This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a9fa1b496 Simplify null mask preservation in parquet reader (#2116)
a9fa1b496 is described below

commit a9fa1b496fff8d879424ad7644b9581855503b39
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Jul 22 11:06:17 2022 -0400

    Simplify null mask preservation in parquet reader (#2116)
    
    * Simplify null mask preservation
    
    * Fix benchmarks
    
    * Remove PackedDecoder Option
    
    * Use match expression
    
    * Remove inline from GenericColumnReader::read_batch
---
 parquet/benches/arrow_reader.rs                    |   9 +-
 parquet/src/arrow/array_reader/builder.rs          |  18 +---
 parquet/src/arrow/array_reader/byte_array.rs       |  45 ++------
 .../arrow/array_reader/byte_array_dictionary.rs    |  10 +-
 parquet/src/arrow/array_reader/primitive_array.rs  |  14 +--
 .../src/arrow/record_reader/definition_levels.rs   | 119 ++++++++++-----------
 parquet/src/arrow/record_reader/mod.rs             |  55 +++++-----
 parquet/src/column/reader.rs                       |  69 +++++++-----
 parquet/src/column/reader/decoder.rs               |  37 ++++---
 9 files changed, 173 insertions(+), 203 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 647a8dc6f..dc2ed8355 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -307,21 +307,19 @@ fn create_primitive_array_reader(
     use parquet::arrow::array_reader::PrimitiveArrayReader;
     match column_desc.physical_type() {
         Type::INT32 => {
-            let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
+            let reader = PrimitiveArrayReader::<Int32Type>::new(
                 Box::new(page_iterator),
                 column_desc,
                 None,
-                true,
             )
             .unwrap();
             Box::new(reader)
         }
         Type::INT64 => {
-            let reader = PrimitiveArrayReader::<Int64Type>::new_with_options(
+            let reader = PrimitiveArrayReader::<Int64Type>::new(
                 Box::new(page_iterator),
                 column_desc,
                 None,
-                true,
             )
             .unwrap();
             Box::new(reader)
@@ -335,7 +333,7 @@ fn create_string_byte_array_reader(
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
     use parquet::arrow::array_reader::make_byte_array_reader;
-    make_byte_array_reader(Box::new(page_iterator), column_desc, None, 
true).unwrap()
+    make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
 }
 
 fn create_string_byte_array_dictionary_reader(
@@ -350,7 +348,6 @@ fn create_string_byte_array_dictionary_reader(
         Box::new(page_iterator),
         column_desc,
         Some(arrow_type),
-        true,
     )
     .unwrap()
 }
diff --git a/parquet/src/arrow/array_reader/builder.rs 
b/parquet/src/arrow/array_reader/builder.rs
index e8c22f95a..f7954a91a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -160,16 +160,14 @@ fn build_primitive_reader(
     ));
 
     let page_iterator = row_groups.column_chunks(col_idx)?;
-    let null_mask_only = field.def_level == 1 && field.nullable;
     let arrow_type = Some(field.arrow_type.clone());
 
     match physical_type {
         PhysicalType::BOOLEAN => Ok(Box::new(
-            PrimitiveArrayReader::<BoolType>::new_with_options(
+            PrimitiveArrayReader::<BoolType>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
-                null_mask_only,
             )?,
         )),
         PhysicalType::INT32 => {
@@ -180,21 +178,19 @@ fn build_primitive_reader(
                 )?))
             } else {
                 Ok(Box::new(
-                    PrimitiveArrayReader::<Int32Type>::new_with_options(
+                    PrimitiveArrayReader::<Int32Type>::new(
                         page_iterator,
                         column_desc,
                         arrow_type,
-                        null_mask_only,
                     )?,
                 ))
             }
         }
         PhysicalType::INT64 => Ok(Box::new(
-            PrimitiveArrayReader::<Int64Type>::new_with_options(
+            PrimitiveArrayReader::<Int64Type>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
-                null_mask_only,
             )?,
         )),
         PhysicalType::INT96 => {
@@ -218,19 +214,17 @@ fn build_primitive_reader(
             )?))
         }
         PhysicalType::FLOAT => Ok(Box::new(
-            PrimitiveArrayReader::<FloatType>::new_with_options(
+            PrimitiveArrayReader::<FloatType>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
-                null_mask_only,
             )?,
         )),
         PhysicalType::DOUBLE => Ok(Box::new(
-            PrimitiveArrayReader::<DoubleType>::new_with_options(
+            PrimitiveArrayReader::<DoubleType>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
-                null_mask_only,
             )?,
         )),
         PhysicalType::BYTE_ARRAY => match arrow_type {
@@ -238,13 +232,11 @@ fn build_primitive_reader(
                 page_iterator,
                 column_desc,
                 arrow_type,
-                null_mask_only,
             ),
             _ => make_byte_array_reader(
                 page_iterator,
                 column_desc,
                 arrow_type,
-                null_mask_only,
             ),
         },
         PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
index 96fe02d2d..60489a26b 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -42,7 +42,6 @@ pub fn make_byte_array_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
     arrow_type: Option<ArrowType>,
-    null_mask_only: bool,
 ) -> Result<Box<dyn ArrayReader>> {
     // Check if Arrow type is specified, else create it from Parquet type
     let data_type = match arrow_type {
@@ -54,15 +53,13 @@ pub fn make_byte_array_reader(
 
     match data_type {
         ArrowType::Binary | ArrowType::Utf8 => {
-            let reader =
-                GenericRecordReader::new_with_options(column_desc, 
null_mask_only);
+            let reader = GenericRecordReader::new(column_desc);
             Ok(Box::new(ByteArrayReader::<i32>::new(
                 pages, data_type, reader,
             )))
         }
         ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
-            let reader =
-                GenericRecordReader::new_with_options(column_desc, 
null_mask_only);
+            let reader = GenericRecordReader::new(column_desc);
             Ok(Box::new(ByteArrayReader::<i64>::new(
                 pages, data_type, reader,
             )))
@@ -127,15 +124,11 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for 
ByteArrayReader<I> {
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer
-            .as_ref()
-            .map(|buf| buf.typed_data())
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer
-            .as_ref()
-            .map(|buf| buf.typed_data())
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
     }
 }
 
@@ -388,11 +381,8 @@ impl ByteArrayDecoderPlain {
         Ok(to_read)
     }
 
-    pub fn skip(
-        &mut self,
-        to_skip: usize,
-    ) -> Result<usize> {
-        let to_skip = to_skip.min( self.max_remaining_values);
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let to_skip = to_skip.min(self.max_remaining_values);
         let mut skip = 0;
         let buf = self.buf.as_ref();
 
@@ -478,10 +468,7 @@ impl ByteArrayDecoderDeltaLength {
         Ok(to_read)
     }
 
-    fn skip(
-        &mut self,
-        to_skip: usize,
-    ) -> Result<usize> {
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
         let remain_values = self.lengths.len() - self.length_offset;
         let to_skip = remain_values.min(to_skip);
 
@@ -583,10 +570,7 @@ impl ByteArrayDecoderDelta {
         Ok(to_read)
     }
 
-    fn skip(
-        &mut self,
-        to_skip: usize,
-    ) -> Result<usize> {
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
         let to_skip = to_skip.min(self.prefix_lengths.len() - 
self.length_offset);
 
         let length_range = self.length_offset..self.length_offset + to_skip;
@@ -704,8 +688,8 @@ impl ByteArrayDecoderDictionary {
                 self.index_offset = 0;
             }
 
-            let skip = (to_skip - values_skip)
-                .min(self.index_buf_len - self.index_offset);
+            let skip =
+                (to_skip - values_skip).min(self.index_buf_len - 
self.index_offset);
 
             self.index_offset += skip;
             self.max_remaining_values -= skip;
@@ -816,14 +800,7 @@ mod tests {
 
             assert_eq!(
                 strings.iter().collect::<Vec<_>>(),
-                vec![
-                    None,
-                    None,
-                    Some("hello"),
-                    Some("b"),
-                    None,
-                    None,
-                ]
+                vec![None, None, Some("hello"), Some("b"), None, None,]
             );
         }
     }
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs 
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 39d920ef1..486dfe211 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -44,17 +44,14 @@ use crate::util::memory::ByteBufferPtr;
 /// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
 macro_rules! make_reader {
     (
-        ($pages:expr, $column_desc:expr, $data_type:expr, 
$null_mask_only:expr) => match ($k:expr, $v:expr) {
+        ($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, 
$v:expr) {
             $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, 
$value_type:ty),)+
         }
     ) => {
         match (($k, $v)) {
             $(
                 ($key_arrow, $value_arrow) => {
-                    let reader = GenericRecordReader::new_with_options(
-                        $column_desc,
-                        $null_mask_only,
-                    );
+                    let reader = GenericRecordReader::new($column_desc);
                     Ok(Box::new(ByteArrayDictionaryReader::<$key_type, 
$value_type>::new(
                         $pages, $data_type, reader,
                     )))
@@ -84,7 +81,6 @@ pub fn make_byte_array_dictionary_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
     arrow_type: Option<ArrowType>,
-    null_mask_only: bool,
 ) -> Result<Box<dyn ArrayReader>> {
     // Check if Arrow type is specified, else create it from Parquet type
     let data_type = match arrow_type {
@@ -97,7 +93,7 @@ pub fn make_byte_array_dictionary_reader(
     match &data_type {
         ArrowType::Dictionary(key_type, value_type) => {
             make_reader! {
-                (pages, column_desc, data_type, null_mask_only) => match 
(key_type.as_ref(), value_type.as_ref()) {
+                (pages, column_desc, data_type) => match (key_type.as_ref(), 
value_type.as_ref()) {
                     (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => 
(u8, i32),
                     (ArrowType::UInt8, ArrowType::LargeBinary | 
ArrowType::LargeUtf8) => (u8, i64),
                     (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => 
(i8, i32),
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs 
b/parquet/src/arrow/array_reader/primitive_array.rs
index 6f6644b5d..0488e254c 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -58,17 +58,6 @@ where
         pages: Box<dyn PageIterator>,
         column_desc: ColumnDescPtr,
         arrow_type: Option<ArrowType>,
-    ) -> Result<Self> {
-        Self::new_with_options(pages, column_desc, arrow_type, false)
-    }
-
-    /// Construct primitive array reader with ability to only compute null 
mask and not
-    /// buffer level data
-    pub fn new_with_options(
-        pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        arrow_type: Option<ArrowType>,
-        null_mask_only: bool,
     ) -> Result<Self> {
         // Check if Arrow type is specified, else create it from Parquet type
         let data_type = match arrow_type {
@@ -78,8 +67,7 @@ where
                 .clone(),
         };
 
-        let record_reader =
-            RecordReader::<T>::new_with_options(column_desc.clone(), 
null_mask_only);
+        let record_reader = RecordReader::<T>::new(column_desc.clone());
 
         Ok(Self {
             data_type,
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index 21526f21f..a12772af0 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -146,51 +146,50 @@ impl LevelsBufferSlice for DefinitionLevelBuffer {
     }
 }
 
+enum MaybePacked {
+    Packed(PackedDecoder),
+    Fallback(ColumnLevelDecoderImpl),
+}
+
 pub struct DefinitionLevelBufferDecoder {
     max_level: i16,
-    encoding: Encoding,
-    data: Option<ByteBufferPtr>,
-    column_decoder: Option<ColumnLevelDecoderImpl>,
-    packed_decoder: Option<PackedDecoder>,
+    decoder: MaybePacked,
+}
+
+impl DefinitionLevelBufferDecoder {
+    pub fn new(max_level: i16, packed: bool) -> Self {
+        let decoder = match packed {
+            true => MaybePacked::Packed(PackedDecoder::new()),
+            false => 
MaybePacked::Fallback(ColumnLevelDecoderImpl::new(max_level)),
+        };
+
+        Self { max_level, decoder }
+    }
 }
 
 impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
     type Slice = DefinitionLevelBuffer;
 
-    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
-        Self {
-            max_level,
-            encoding,
-            data: Some(data),
-            column_decoder: None,
-            packed_decoder: None,
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        match &mut self.decoder {
+            MaybePacked::Packed(d) => d.set_data(encoding, data),
+            MaybePacked::Fallback(d) => d.set_data(encoding, data),
         }
     }
 
-    fn read(
-        &mut self,
-        writer: &mut Self::Slice,
-        range: Range<usize>,
-    ) -> crate::errors::Result<usize> {
-        match &mut writer.inner {
-            BufferInner::Full {
-                levels,
-                nulls,
-                max_level,
-            } => {
+    fn read(&mut self, writer: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
+        match (&mut writer.inner, &mut self.decoder) {
+            (
+                BufferInner::Full {
+                    levels,
+                    nulls,
+                    max_level,
+                },
+                MaybePacked::Fallback(decoder),
+            ) => {
                 assert_eq!(self.max_level, *max_level);
                 assert_eq!(range.start + writer.len, nulls.len());
 
-                let decoder = match self.data.take() {
-                    Some(data) => self.column_decoder.insert(
-                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
-                    ),
-                    None => self
-                        .column_decoder
-                        .as_mut()
-                        .expect("consistent null_mask_only"),
-                };
-
                 levels.resize(range.end + writer.len);
 
                 let slice = &mut levels.as_slice_mut()[writer.len..];
@@ -203,22 +202,13 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
 
                 Ok(levels_read)
             }
-            BufferInner::Mask { nulls } => {
+            (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
                 assert_eq!(self.max_level, 1);
                 assert_eq!(range.start + writer.len, nulls.len());
 
-                let decoder = match self.data.take() {
-                    Some(data) => self
-                        .packed_decoder
-                        .insert(PackedDecoder::new(self.encoding, data)),
-                    None => self
-                        .packed_decoder
-                        .as_mut()
-                        .expect("consistent null_mask_only"),
-                };
-
                 decoder.read(nulls, range.end - range.start)
             }
+            _ => unreachable!("inconsistent null mask"),
         }
     }
 }
@@ -306,28 +296,30 @@ impl PackedDecoder {
 }
 
 impl PackedDecoder {
-    fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
-        match encoding {
-            Encoding::RLE => Self {
-                data,
-                data_offset: 0,
-                rle_left: 0,
-                rle_value: false,
-                packed_count: 0,
-                packed_offset: 0,
-            },
-            Encoding::BIT_PACKED => Self {
-                data_offset: 0,
-                rle_left: 0,
-                rle_value: false,
-                packed_count: data.len() * 8,
-                packed_offset: 0,
-                data,
-            },
-            _ => unreachable!("invalid level encoding: {}", encoding),
+    fn new() -> Self {
+        Self {
+            data: ByteBufferPtr::new(vec![]),
+            data_offset: 0,
+            rle_left: 0,
+            rle_value: false,
+            packed_count: 0,
+            packed_offset: 0,
         }
     }
 
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.rle_left = 0;
+        self.rle_value = false;
+        self.packed_offset = 0;
+        self.packed_count = match encoding {
+            Encoding::RLE => 0,
+            Encoding::BIT_PACKED => data.len() * 8,
+            _ => unreachable!("invalid level encoding: {}", encoding),
+        };
+        self.data = data;
+        self.data_offset = 0;
+    }
+
     fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> 
Result<usize> {
         let mut read = 0;
         while read != len {
@@ -381,7 +373,8 @@ mod tests {
         assert_eq!(expected.len(), len);
 
         let encoded = encoder.consume().unwrap();
-        let mut decoder = PackedDecoder::new(Encoding::RLE, 
ByteBufferPtr::new(encoded));
+        let mut decoder = PackedDecoder::new();
+        decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
 
         // Decode data in random length intervals
         let mut decoded = BooleanBufferBuilder::new(len);
diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index a7cd38d3c..04499997e 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -62,6 +62,7 @@ pub struct GenericRecordReader<V, CV> {
 
     /// Number of records accumulated in records
     num_records: usize,
+
     /// Number of values `num_records` contains.
     num_values: usize,
 
@@ -76,33 +77,8 @@ where
 {
     /// Create a new [`GenericRecordReader`]
     pub fn new(desc: ColumnDescPtr) -> Self {
-        Self::new_with_options(desc, false)
-    }
-
-    /// Create a new [`GenericRecordReader`] with the ability to only generate 
the bitmask
-    ///
-    /// If `null_mask_only` is true only the null bitmask will be generated and
-    /// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will 
always return `None`
-    ///
-    /// It is insufficient to solely check that that the max definition level 
is 1 as we
-    /// need there to be no nullable parent array that will required decoded 
definition levels
-    ///
-    /// In particular consider the case of:
-    ///
-    /// ```ignore
-    /// message nested {
-    ///   OPTIONAL Group group {
-    ///     REQUIRED INT32 leaf;
-    ///   }
-    /// }
-    /// ```
-    ///
-    /// The maximum definition level of leaf is 1, however, we still need to 
decode the
-    /// definition levels so that the parent group can be constructed correctly
-    ///
-    pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) 
-> Self {
         let def_levels = (desc.max_def_level() > 0)
-            .then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));
+            .then(|| DefinitionLevelBuffer::new(&desc, 
packed_null_mask(&desc)));
 
         let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
 
@@ -120,9 +96,25 @@ where
 
     /// Set the current page reader.
     pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> 
Result<()> {
-        self.column_reader = Some(GenericColumnReader::new(
+        let descr = &self.column_desc;
+        let values_decoder = CV::new(descr);
+
+        let def_level_decoder = (descr.max_def_level() != 0).then(|| {
+            DefinitionLevelBufferDecoder::new(
+                descr.max_def_level(),
+                packed_null_mask(descr),
+            )
+        });
+
+        let rep_level_decoder = (descr.max_rep_level() != 0)
+            .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+
+        self.column_reader = Some(GenericColumnReader::new_with_decoders(
             self.column_desc.clone(),
             page_reader,
+            values_decoder,
+            def_level_decoder,
+            rep_level_decoder,
         ));
         Ok(())
     }
@@ -392,6 +384,15 @@ where
     }
 }
 
+/// Returns true if we do not need to unpack the nullability for this column, 
this is
+/// only possible if the max definition level is 1, and corresponds to nulls at 
the
+/// leaf level, as opposed to a nullable parent nested type
+fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
+    descr.max_def_level() == 1
+        && descr.max_rep_level() == 0
+        && descr.self_type().is_optional()
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index cddd45730..ea00bcf1b 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -22,8 +22,8 @@ use std::cmp::min;
 use super::page::{Page, PageReader};
 use crate::basic::*;
 use crate::column::reader::decoder::{
-    ColumnValueDecoder, DefinitionLevelDecoder, LevelsBufferSlice,
-    RepetitionLevelDecoder, ValuesBufferSlice,
+    ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl,
+    DefinitionLevelDecoder, LevelsBufferSlice, RepetitionLevelDecoder, 
ValuesBufferSlice,
 };
 use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
@@ -103,9 +103,9 @@ pub fn get_typed_column_reader<T: DataType>(
 
 /// Typed value reader for a particular primitive column.
 pub type ColumnReaderImpl<T> = GenericColumnReader<
-    decoder::ColumnLevelDecoderImpl,
-    decoder::ColumnLevelDecoderImpl,
-    decoder::ColumnValueDecoderImpl<T>,
+    ColumnLevelDecoderImpl,
+    ColumnLevelDecoderImpl,
+    ColumnValueDecoderImpl<T>,
 >;
 
 /// Reads data for a given column chunk, using the provided decoders:
@@ -135,27 +135,47 @@ pub struct GenericColumnReader<R, D, V> {
     values_decoder: V,
 }
 
-impl<R, D, V> GenericColumnReader<R, D, V>
+impl<V> GenericColumnReader<ColumnLevelDecoderImpl, ColumnLevelDecoderImpl, V>
 where
-    R: RepetitionLevelDecoder,
-    D: DefinitionLevelDecoder,
     V: ColumnValueDecoder,
 {
     /// Creates new column reader based on column descriptor and page reader.
     pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self 
{
         let values_decoder = V::new(&descr);
-        Self::new_with_decoder(descr, page_reader, values_decoder)
+
+        let def_level_decoder = (descr.max_def_level() != 0)
+            .then(|| ColumnLevelDecoderImpl::new(descr.max_def_level()));
+
+        let rep_level_decoder = (descr.max_rep_level() != 0)
+            .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+
+        Self::new_with_decoders(
+            descr,
+            page_reader,
+            values_decoder,
+            def_level_decoder,
+            rep_level_decoder,
+        )
     }
+}
 
-    fn new_with_decoder(
+impl<R, D, V> GenericColumnReader<R, D, V>
+where
+    R: RepetitionLevelDecoder,
+    D: DefinitionLevelDecoder,
+    V: ColumnValueDecoder,
+{
+    pub(crate) fn new_with_decoders(
         descr: ColumnDescPtr,
         page_reader: Box<dyn PageReader>,
         values_decoder: V,
+        def_level_decoder: Option<D>,
+        rep_level_decoder: Option<R>,
     ) -> Self {
         Self {
             descr,
-            def_level_decoder: None,
-            rep_level_decoder: None,
+            def_level_decoder,
+            rep_level_decoder,
             page_reader,
             num_buffered_values: 0,
             num_decoded_values: 0,
@@ -175,7 +195,6 @@ where
     ///
     /// `values` will be contiguously populated with the non-null values. Note 
that if the column
     /// is not required, this may be less than either `batch_size` or the 
number of levels read
-    #[inline]
     pub fn read_batch(
         &mut self,
         batch_size: usize,
@@ -383,10 +402,10 @@ where
                                 )?;
                                 offset += bytes_read;
 
-                                let decoder =
-                                    R::new(max_rep_level, rep_level_encoding, 
level_data);
-
-                                self.rep_level_decoder = Some(decoder);
+                                self.rep_level_decoder
+                                    .as_mut()
+                                    .unwrap()
+                                    .set_data(rep_level_encoding, level_data);
                             }
 
                             if max_def_level > 0 {
@@ -398,10 +417,10 @@ where
                                 )?;
                                 offset += bytes_read;
 
-                                let decoder =
-                                    D::new(max_def_level, def_level_encoding, 
level_data);
-
-                                self.def_level_decoder = Some(decoder);
+                                self.def_level_decoder
+                                    .as_mut()
+                                    .unwrap()
+                                    .set_data(def_level_encoding, level_data);
                             }
 
                             self.values_decoder.set_data(
@@ -434,26 +453,22 @@ where
                             // DataPage v2 only supports RLE encoding for 
repetition
                             // levels
                             if self.descr.max_rep_level() > 0 {
-                                let decoder = R::new(
-                                    self.descr.max_rep_level(),
+                                
self.rep_level_decoder.as_mut().unwrap().set_data(
                                     Encoding::RLE,
                                     buf.range(0, rep_levels_byte_len as usize),
                                 );
-                                self.rep_level_decoder = Some(decoder);
                             }
 
                             // DataPage v2 only supports RLE encoding for 
definition
                             // levels
                             if self.descr.max_def_level() > 0 {
-                                let decoder = D::new(
-                                    self.descr.max_def_level(),
+                                
self.def_level_decoder.as_mut().unwrap().set_data(
                                     Encoding::RLE,
                                     buf.range(
                                         rep_levels_byte_len as usize,
                                         def_levels_byte_len as usize,
                                     ),
                                 );
-                                self.def_level_decoder = Some(decoder);
                             }
 
                             self.values_decoder.set_data(
diff --git a/parquet/src/column/reader/decoder.rs 
b/parquet/src/column/reader/decoder.rs
index 53f7e2943..5879c6180 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -66,8 +66,8 @@ impl<T> ValuesBufferSlice for [T] {
 pub trait ColumnLevelDecoder {
     type Slice: LevelsBufferSlice + ?Sized;
 
-    /// Create a new [`ColumnLevelDecoder`]
-    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+    /// Set data for this [`ColumnLevelDecoder`]
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr);
 
     /// Read level data into `out[range]` returning the number of levels read
     ///
@@ -266,7 +266,18 @@ impl<T: DataType> ColumnValueDecoder for 
ColumnValueDecoderImpl<T> {
 
 /// An implementation of [`ColumnLevelDecoder`] for `[i16]`
 pub struct ColumnLevelDecoderImpl {
-    inner: LevelDecoderInner,
+    decoder: Option<LevelDecoderInner>,
+    bit_width: u8,
+}
+
+impl ColumnLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+        }
+    }
 }
 
 enum LevelDecoderInner {
@@ -277,25 +288,25 @@ enum LevelDecoderInner {
 impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
     type Slice = [i16];
 
-    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
-        let bit_width = num_required_bits(max_level as u64);
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
         match encoding {
             Encoding::RLE => {
-                let mut decoder = RleDecoder::new(bit_width);
+                let mut decoder = RleDecoder::new(self.bit_width);
                 decoder.set_data(data);
-                Self {
-                    inner: LevelDecoderInner::Rle(decoder),
-                }
+                self.decoder = Some(LevelDecoderInner::Rle(decoder));
+            }
+            Encoding::BIT_PACKED => {
+                self.decoder = Some(LevelDecoderInner::Packed(
+                    BitReader::new(data),
+                    self.bit_width,
+                ));
             }
-            Encoding::BIT_PACKED => Self {
-                inner: LevelDecoderInner::Packed(BitReader::new(data), 
bit_width),
-            },
             _ => unreachable!("invalid level encoding: {}", encoding),
         }
     }
 
     fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
-        match &mut self.inner {
+        match self.decoder.as_mut().unwrap() {
             LevelDecoderInner::Packed(reader, bit_width) => {
                 Ok(reader.get_batch::<i16>(&mut out[range], *bit_width as 
usize))
             }

Reply via email to