This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new a9fa1b496 Simplify null mask preservation in parquet reader (#2116)
a9fa1b496 is described below
commit a9fa1b496fff8d879424ad7644b9581855503b39
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Jul 22 11:06:17 2022 -0400
Simplify null mask preservation in parquet reader (#2116)
* Simplify null mask preservation
* Fix benchmarks
* Remove PackedDecoder Option
* Use match expression
* Remove inline from GenericColumnReader::read_batch
---
parquet/benches/arrow_reader.rs | 9 +-
parquet/src/arrow/array_reader/builder.rs | 18 +---
parquet/src/arrow/array_reader/byte_array.rs | 45 ++------
.../arrow/array_reader/byte_array_dictionary.rs | 10 +-
parquet/src/arrow/array_reader/primitive_array.rs | 14 +--
.../src/arrow/record_reader/definition_levels.rs | 119 ++++++++++-----------
parquet/src/arrow/record_reader/mod.rs | 55 +++++-----
parquet/src/column/reader.rs | 69 +++++++-----
parquet/src/column/reader/decoder.rs | 37 ++++---
9 files changed, 173 insertions(+), 203 deletions(-)
diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 647a8dc6f..dc2ed8355 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -307,21 +307,19 @@ fn create_primitive_array_reader(
use parquet::arrow::array_reader::PrimitiveArrayReader;
match column_desc.physical_type() {
Type::INT32 => {
- let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
+ let reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
- true,
)
.unwrap();
Box::new(reader)
}
Type::INT64 => {
- let reader = PrimitiveArrayReader::<Int64Type>::new_with_options(
+ let reader = PrimitiveArrayReader::<Int64Type>::new(
Box::new(page_iterator),
column_desc,
None,
- true,
)
.unwrap();
Box::new(reader)
@@ -335,7 +333,7 @@ fn create_string_byte_array_reader(
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_reader;
- make_byte_array_reader(Box::new(page_iterator), column_desc, None,
true).unwrap()
+ make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
fn create_string_byte_array_dictionary_reader(
@@ -350,7 +348,6 @@ fn create_string_byte_array_dictionary_reader(
Box::new(page_iterator),
column_desc,
Some(arrow_type),
- true,
)
.unwrap()
}
diff --git a/parquet/src/arrow/array_reader/builder.rs
b/parquet/src/arrow/array_reader/builder.rs
index e8c22f95a..f7954a91a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -160,16 +160,14 @@ fn build_primitive_reader(
));
let page_iterator = row_groups.column_chunks(col_idx)?;
- let null_mask_only = field.def_level == 1 && field.nullable;
let arrow_type = Some(field.arrow_type.clone());
match physical_type {
PhysicalType::BOOLEAN => Ok(Box::new(
- PrimitiveArrayReader::<BoolType>::new_with_options(
+ PrimitiveArrayReader::<BoolType>::new(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
)?,
)),
PhysicalType::INT32 => {
@@ -180,21 +178,19 @@ fn build_primitive_reader(
)?))
} else {
Ok(Box::new(
- PrimitiveArrayReader::<Int32Type>::new_with_options(
+ PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
)?,
))
}
}
PhysicalType::INT64 => Ok(Box::new(
- PrimitiveArrayReader::<Int64Type>::new_with_options(
+ PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
)?,
)),
PhysicalType::INT96 => {
@@ -218,19 +214,17 @@ fn build_primitive_reader(
)?))
}
PhysicalType::FLOAT => Ok(Box::new(
- PrimitiveArrayReader::<FloatType>::new_with_options(
+ PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
)?,
)),
PhysicalType::DOUBLE => Ok(Box::new(
- PrimitiveArrayReader::<DoubleType>::new_with_options(
+ PrimitiveArrayReader::<DoubleType>::new(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
@@ -238,13 +232,11 @@ fn build_primitive_reader(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
arrow_type,
- null_mask_only,
),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index 96fe02d2d..60489a26b 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -42,7 +42,6 @@ pub fn make_byte_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
- null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -54,15 +53,13 @@ pub fn make_byte_array_reader(
match data_type {
ArrowType::Binary | ArrowType::Utf8 => {
- let reader =
- GenericRecordReader::new_with_options(column_desc,
null_mask_only);
+ let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
)))
}
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
- let reader =
- GenericRecordReader::new_with_options(column_desc,
null_mask_only);
+ let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i64>::new(
pages, data_type, reader,
)))
@@ -127,15 +124,11 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for
ByteArrayReader<I> {
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels_buffer
- .as_ref()
- .map(|buf| buf.typed_data())
+ self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels_buffer
- .as_ref()
- .map(|buf| buf.typed_data())
+ self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
}
@@ -388,11 +381,8 @@ impl ByteArrayDecoderPlain {
Ok(to_read)
}
- pub fn skip(
- &mut self,
- to_skip: usize,
- ) -> Result<usize> {
- let to_skip = to_skip.min( self.max_remaining_values);
+ pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+ let to_skip = to_skip.min(self.max_remaining_values);
let mut skip = 0;
let buf = self.buf.as_ref();
@@ -478,10 +468,7 @@ impl ByteArrayDecoderDeltaLength {
Ok(to_read)
}
- fn skip(
- &mut self,
- to_skip: usize,
- ) -> Result<usize> {
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
let remain_values = self.lengths.len() - self.length_offset;
let to_skip = remain_values.min(to_skip);
@@ -583,10 +570,7 @@ impl ByteArrayDecoderDelta {
Ok(to_read)
}
- fn skip(
- &mut self,
- to_skip: usize,
- ) -> Result<usize> {
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.prefix_lengths.len() -
self.length_offset);
let length_range = self.length_offset..self.length_offset + to_skip;
@@ -704,8 +688,8 @@ impl ByteArrayDecoderDictionary {
self.index_offset = 0;
}
- let skip = (to_skip - values_skip)
- .min(self.index_buf_len - self.index_offset);
+ let skip =
+ (to_skip - values_skip).min(self.index_buf_len -
self.index_offset);
self.index_offset += skip;
self.max_remaining_values -= skip;
@@ -816,14 +800,7 @@ mod tests {
assert_eq!(
strings.iter().collect::<Vec<_>>(),
- vec![
- None,
- None,
- Some("hello"),
- Some("b"),
- None,
- None,
- ]
+ vec![None, None, Some("hello"), Some("b"), None, None,]
);
}
}
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 39d920ef1..486dfe211 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -44,17 +44,14 @@ use crate::util::memory::ByteBufferPtr;
/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
macro_rules! make_reader {
(
- ($pages:expr, $column_desc:expr, $data_type:expr,
$null_mask_only:expr) => match ($k:expr, $v:expr) {
+ ($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr,
$v:expr) {
$(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty,
$value_type:ty),)+
}
) => {
match (($k, $v)) {
$(
($key_arrow, $value_arrow) => {
- let reader = GenericRecordReader::new_with_options(
- $column_desc,
- $null_mask_only,
- );
+ let reader = GenericRecordReader::new($column_desc);
Ok(Box::new(ByteArrayDictionaryReader::<$key_type,
$value_type>::new(
$pages, $data_type, reader,
)))
@@ -84,7 +81,6 @@ pub fn make_byte_array_dictionary_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
- null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -97,7 +93,7 @@ pub fn make_byte_array_dictionary_reader(
match &data_type {
ArrowType::Dictionary(key_type, value_type) => {
make_reader! {
- (pages, column_desc, data_type, null_mask_only) => match
(key_type.as_ref(), value_type.as_ref()) {
+ (pages, column_desc, data_type) => match (key_type.as_ref(),
value_type.as_ref()) {
(ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) =>
(u8, i32),
(ArrowType::UInt8, ArrowType::LargeBinary |
ArrowType::LargeUtf8) => (u8, i64),
(ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) =>
(i8, i32),
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index 6f6644b5d..0488e254c 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -58,17 +58,6 @@ where
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
- ) -> Result<Self> {
- Self::new_with_options(pages, column_desc, arrow_type, false)
- }
-
- /// Construct primitive array reader with ability to only compute null
mask and not
- /// buffer level data
- pub fn new_with_options(
- pages: Box<dyn PageIterator>,
- column_desc: ColumnDescPtr,
- arrow_type: Option<ArrowType>,
- null_mask_only: bool,
) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -78,8 +67,7 @@ where
.clone(),
};
- let record_reader =
- RecordReader::<T>::new_with_options(column_desc.clone(),
null_mask_only);
+ let record_reader = RecordReader::<T>::new(column_desc.clone());
Ok(Self {
data_type,
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs
b/parquet/src/arrow/record_reader/definition_levels.rs
index 21526f21f..a12772af0 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -146,51 +146,50 @@ impl LevelsBufferSlice for DefinitionLevelBuffer {
}
}
+enum MaybePacked {
+ Packed(PackedDecoder),
+ Fallback(ColumnLevelDecoderImpl),
+}
+
pub struct DefinitionLevelBufferDecoder {
max_level: i16,
- encoding: Encoding,
- data: Option<ByteBufferPtr>,
- column_decoder: Option<ColumnLevelDecoderImpl>,
- packed_decoder: Option<PackedDecoder>,
+ decoder: MaybePacked,
+}
+
+impl DefinitionLevelBufferDecoder {
+ pub fn new(max_level: i16, packed: bool) -> Self {
+ let decoder = match packed {
+ true => MaybePacked::Packed(PackedDecoder::new()),
+ false =>
MaybePacked::Fallback(ColumnLevelDecoderImpl::new(max_level)),
+ };
+
+ Self { max_level, decoder }
+ }
}
impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
type Slice = DefinitionLevelBuffer;
- fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
- Self {
- max_level,
- encoding,
- data: Some(data),
- column_decoder: None,
- packed_decoder: None,
+ fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+ match &mut self.decoder {
+ MaybePacked::Packed(d) => d.set_data(encoding, data),
+ MaybePacked::Fallback(d) => d.set_data(encoding, data),
}
}
- fn read(
- &mut self,
- writer: &mut Self::Slice,
- range: Range<usize>,
- ) -> crate::errors::Result<usize> {
- match &mut writer.inner {
- BufferInner::Full {
- levels,
- nulls,
- max_level,
- } => {
+ fn read(&mut self, writer: &mut Self::Slice, range: Range<usize>) ->
Result<usize> {
+ match (&mut writer.inner, &mut self.decoder) {
+ (
+ BufferInner::Full {
+ levels,
+ nulls,
+ max_level,
+ },
+ MaybePacked::Fallback(decoder),
+ ) => {
assert_eq!(self.max_level, *max_level);
assert_eq!(range.start + writer.len, nulls.len());
- let decoder = match self.data.take() {
- Some(data) => self.column_decoder.insert(
- ColumnLevelDecoderImpl::new(self.max_level,
self.encoding, data),
- ),
- None => self
- .column_decoder
- .as_mut()
- .expect("consistent null_mask_only"),
- };
-
levels.resize(range.end + writer.len);
let slice = &mut levels.as_slice_mut()[writer.len..];
@@ -203,22 +202,13 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
Ok(levels_read)
}
- BufferInner::Mask { nulls } => {
+ (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
assert_eq!(self.max_level, 1);
assert_eq!(range.start + writer.len, nulls.len());
- let decoder = match self.data.take() {
- Some(data) => self
- .packed_decoder
- .insert(PackedDecoder::new(self.encoding, data)),
- None => self
- .packed_decoder
- .as_mut()
- .expect("consistent null_mask_only"),
- };
-
decoder.read(nulls, range.end - range.start)
}
+ _ => unreachable!("inconsistent null mask"),
}
}
}
@@ -306,28 +296,30 @@ impl PackedDecoder {
}
impl PackedDecoder {
- fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
- match encoding {
- Encoding::RLE => Self {
- data,
- data_offset: 0,
- rle_left: 0,
- rle_value: false,
- packed_count: 0,
- packed_offset: 0,
- },
- Encoding::BIT_PACKED => Self {
- data_offset: 0,
- rle_left: 0,
- rle_value: false,
- packed_count: data.len() * 8,
- packed_offset: 0,
- data,
- },
- _ => unreachable!("invalid level encoding: {}", encoding),
+ fn new() -> Self {
+ Self {
+ data: ByteBufferPtr::new(vec![]),
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: 0,
+ packed_offset: 0,
}
}
+ fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+ self.rle_left = 0;
+ self.rle_value = false;
+ self.packed_offset = 0;
+ self.packed_count = match encoding {
+ Encoding::RLE => 0,
+ Encoding::BIT_PACKED => data.len() * 8,
+ _ => unreachable!("invalid level encoding: {}", encoding),
+ };
+ self.data = data;
+ self.data_offset = 0;
+ }
+
fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) ->
Result<usize> {
let mut read = 0;
while read != len {
@@ -381,7 +373,8 @@ mod tests {
assert_eq!(expected.len(), len);
let encoded = encoder.consume().unwrap();
- let mut decoder = PackedDecoder::new(Encoding::RLE,
ByteBufferPtr::new(encoded));
+ let mut decoder = PackedDecoder::new();
+ decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
// Decode data in random length intervals
let mut decoded = BooleanBufferBuilder::new(len);
diff --git a/parquet/src/arrow/record_reader/mod.rs
b/parquet/src/arrow/record_reader/mod.rs
index a7cd38d3c..04499997e 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -62,6 +62,7 @@ pub struct GenericRecordReader<V, CV> {
/// Number of records accumulated in records
num_records: usize,
+
/// Number of values `num_records` contains.
num_values: usize,
@@ -76,33 +77,8 @@ where
{
/// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
- Self::new_with_options(desc, false)
- }
-
- /// Create a new [`GenericRecordReader`] with the ability to only generate
the bitmask
- ///
- /// If `null_mask_only` is true only the null bitmask will be generated and
- /// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will
always return `None`
- ///
- /// It is insufficient to solely check that that the max definition level
is 1 as we
- /// need there to be no nullable parent array that will required decoded
definition levels
- ///
- /// In particular consider the case of:
- ///
- /// ```ignore
- /// message nested {
- /// OPTIONAL Group group {
- /// REQUIRED INT32 leaf;
- /// }
- /// }
- /// ```
- ///
- /// The maximum definition level of leaf is 1, however, we still need to
decode the
- /// definition levels so that the parent group can be constructed correctly
- ///
- pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool)
-> Self {
let def_levels = (desc.max_def_level() > 0)
- .then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));
+ .then(|| DefinitionLevelBuffer::new(&desc,
packed_null_mask(&desc)));
let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
@@ -120,9 +96,25 @@ where
/// Set the current page reader.
pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) ->
Result<()> {
- self.column_reader = Some(GenericColumnReader::new(
+ let descr = &self.column_desc;
+ let values_decoder = CV::new(descr);
+
+ let def_level_decoder = (descr.max_def_level() != 0).then(|| {
+ DefinitionLevelBufferDecoder::new(
+ descr.max_def_level(),
+ packed_null_mask(descr),
+ )
+ });
+
+ let rep_level_decoder = (descr.max_rep_level() != 0)
+ .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+
+ self.column_reader = Some(GenericColumnReader::new_with_decoders(
self.column_desc.clone(),
page_reader,
+ values_decoder,
+ def_level_decoder,
+ rep_level_decoder,
));
Ok(())
}
@@ -392,6 +384,15 @@ where
}
}
+/// Returns true if we do not need to unpack the nullability for this column,
this is
+/// only possible if the max definition level is 1, and corresponds to nulls at
the
+/// leaf level, as opposed to a nullable parent nested type
+fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
+ descr.max_def_level() == 1
+ && descr.max_rep_level() == 0
+ && descr.self_type().is_optional()
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index cddd45730..ea00bcf1b 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -22,8 +22,8 @@ use std::cmp::min;
use super::page::{Page, PageReader};
use crate::basic::*;
use crate::column::reader::decoder::{
- ColumnValueDecoder, DefinitionLevelDecoder, LevelsBufferSlice,
- RepetitionLevelDecoder, ValuesBufferSlice,
+ ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl,
+ DefinitionLevelDecoder, LevelsBufferSlice, RepetitionLevelDecoder,
ValuesBufferSlice,
};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
@@ -103,9 +103,9 @@ pub fn get_typed_column_reader<T: DataType>(
/// Typed value reader for a particular primitive column.
pub type ColumnReaderImpl<T> = GenericColumnReader<
- decoder::ColumnLevelDecoderImpl,
- decoder::ColumnLevelDecoderImpl,
- decoder::ColumnValueDecoderImpl<T>,
+ ColumnLevelDecoderImpl,
+ ColumnLevelDecoderImpl,
+ ColumnValueDecoderImpl<T>,
>;
/// Reads data for a given column chunk, using the provided decoders:
@@ -135,27 +135,47 @@ pub struct GenericColumnReader<R, D, V> {
values_decoder: V,
}
-impl<R, D, V> GenericColumnReader<R, D, V>
+impl<V> GenericColumnReader<ColumnLevelDecoderImpl, ColumnLevelDecoderImpl, V>
where
- R: RepetitionLevelDecoder,
- D: DefinitionLevelDecoder,
V: ColumnValueDecoder,
{
/// Creates new column reader based on column descriptor and page reader.
pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self
{
let values_decoder = V::new(&descr);
- Self::new_with_decoder(descr, page_reader, values_decoder)
+
+ let def_level_decoder = (descr.max_def_level() != 0)
+ .then(|| ColumnLevelDecoderImpl::new(descr.max_def_level()));
+
+ let rep_level_decoder = (descr.max_rep_level() != 0)
+ .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+
+ Self::new_with_decoders(
+ descr,
+ page_reader,
+ values_decoder,
+ def_level_decoder,
+ rep_level_decoder,
+ )
}
+}
- fn new_with_decoder(
+impl<R, D, V> GenericColumnReader<R, D, V>
+where
+ R: RepetitionLevelDecoder,
+ D: DefinitionLevelDecoder,
+ V: ColumnValueDecoder,
+{
+ pub(crate) fn new_with_decoders(
descr: ColumnDescPtr,
page_reader: Box<dyn PageReader>,
values_decoder: V,
+ def_level_decoder: Option<D>,
+ rep_level_decoder: Option<R>,
) -> Self {
Self {
descr,
- def_level_decoder: None,
- rep_level_decoder: None,
+ def_level_decoder,
+ rep_level_decoder,
page_reader,
num_buffered_values: 0,
num_decoded_values: 0,
@@ -175,7 +195,6 @@ where
///
/// `values` will be contiguously populated with the non-null values. Note
that if the column
/// is not required, this may be less than either `batch_size` or the
number of levels read
- #[inline]
pub fn read_batch(
&mut self,
batch_size: usize,
@@ -383,10 +402,10 @@ where
)?;
offset += bytes_read;
- let decoder =
- R::new(max_rep_level, rep_level_encoding,
level_data);
-
- self.rep_level_decoder = Some(decoder);
+ self.rep_level_decoder
+ .as_mut()
+ .unwrap()
+ .set_data(rep_level_encoding, level_data);
}
if max_def_level > 0 {
@@ -398,10 +417,10 @@ where
)?;
offset += bytes_read;
- let decoder =
- D::new(max_def_level, def_level_encoding,
level_data);
-
- self.def_level_decoder = Some(decoder);
+ self.def_level_decoder
+ .as_mut()
+ .unwrap()
+ .set_data(def_level_encoding, level_data);
}
self.values_decoder.set_data(
@@ -434,26 +453,22 @@ where
// DataPage v2 only supports RLE encoding for
repetition
// levels
if self.descr.max_rep_level() > 0 {
- let decoder = R::new(
- self.descr.max_rep_level(),
+
self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.range(0, rep_levels_byte_len as usize),
);
- self.rep_level_decoder = Some(decoder);
}
// DataPage v2 only supports RLE encoding for
definition
// levels
if self.descr.max_def_level() > 0 {
- let decoder = D::new(
- self.descr.max_def_level(),
+
self.def_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.range(
rep_levels_byte_len as usize,
def_levels_byte_len as usize,
),
);
- self.def_level_decoder = Some(decoder);
}
self.values_decoder.set_data(
diff --git a/parquet/src/column/reader/decoder.rs
b/parquet/src/column/reader/decoder.rs
index 53f7e2943..5879c6180 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -66,8 +66,8 @@ impl<T> ValuesBufferSlice for [T] {
pub trait ColumnLevelDecoder {
type Slice: LevelsBufferSlice + ?Sized;
- /// Create a new [`ColumnLevelDecoder`]
- fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+ /// Set data for this [`ColumnLevelDecoder`]
+ fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr);
/// Read level data into `out[range]` returning the number of levels read
///
@@ -266,7 +266,18 @@ impl<T: DataType> ColumnValueDecoder for
ColumnValueDecoderImpl<T> {
/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
pub struct ColumnLevelDecoderImpl {
- inner: LevelDecoderInner,
+ decoder: Option<LevelDecoderInner>,
+ bit_width: u8,
+}
+
+impl ColumnLevelDecoderImpl {
+ pub fn new(max_level: i16) -> Self {
+ let bit_width = num_required_bits(max_level as u64);
+ Self {
+ decoder: None,
+ bit_width,
+ }
+ }
}
enum LevelDecoderInner {
@@ -277,25 +288,25 @@ enum LevelDecoderInner {
impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
type Slice = [i16];
- fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
- let bit_width = num_required_bits(max_level as u64);
+ fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
match encoding {
Encoding::RLE => {
- let mut decoder = RleDecoder::new(bit_width);
+ let mut decoder = RleDecoder::new(self.bit_width);
decoder.set_data(data);
- Self {
- inner: LevelDecoderInner::Rle(decoder),
- }
+ self.decoder = Some(LevelDecoderInner::Rle(decoder));
+ }
+ Encoding::BIT_PACKED => {
+ self.decoder = Some(LevelDecoderInner::Packed(
+ BitReader::new(data),
+ self.bit_width,
+ ));
}
- Encoding::BIT_PACKED => Self {
- inner: LevelDecoderInner::Packed(BitReader::new(data),
bit_width),
- },
_ => unreachable!("invalid level encoding: {}", encoding),
}
}
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) ->
Result<usize> {
- match &mut self.inner {
+ match self.decoder.as_mut().unwrap() {
LevelDecoderInner::Packed(reader, bit_width) => {
Ok(reader.get_batch::<i16>(&mut out[range], *bit_width as
usize))
}