This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 231cf78  Improve parquet reading performance for columns with nulls by 
preserving bitmask when possible (#1037) (#1054)
231cf78 is described below

commit 231cf788879f12b3cfbc6da776e2360117567d17
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jan 13 15:18:55 2022 +0000

    Improve parquet reading performance for columns with nulls by preserving 
bitmask when possible (#1037) (#1054)
    
    * Preserve bitmask (#1037)
    
    * Remove now unnecessary box (#1061)
    
    * Fix handling of empty bitmasks
    
    * More docs
    
    * Add nested nullability test case
    
    * Add packed decoder test
---
 arrow/src/array/builder.rs                         |   5 +
 parquet/benches/arrow_array_reader.rs              |   9 +-
 parquet/src/arrow/array_reader.rs                  |  92 +++--
 parquet/src/arrow/arrow_reader.rs                  |  55 ++-
 parquet/src/arrow/record_reader.rs                 |  37 +-
 parquet/src/arrow/record_reader/buffer.rs          |   5 +
 .../src/arrow/record_reader/definition_levels.rs   | 449 +++++++++++++++++++--
 parquet/src/column/reader/decoder.rs               |   5 +-
 8 files changed, 571 insertions(+), 86 deletions(-)

diff --git a/arrow/src/array/builder.rs b/arrow/src/array/builder.rs
index 1f7e91f..85c013c 100644
--- a/arrow/src/array/builder.rs
+++ b/arrow/src/array/builder.rs
@@ -419,6 +419,11 @@ impl BooleanBufferBuilder {
         );
     }
 
+    /// Returns the packed bits
+    pub fn as_slice(&self) -> &[u8] {
+        self.buffer.as_slice()
+    }
+
     #[inline]
     pub fn finish(&mut self) -> Buffer {
         let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0));
diff --git a/parquet/benches/arrow_array_reader.rs 
b/parquet/benches/arrow_array_reader.rs
index acc5141..5587b52 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_array_reader.rs
@@ -301,8 +301,13 @@ fn create_int32_primitive_array_reader(
     column_desc: ColumnDescPtr,
 ) -> impl ArrayReader {
     use parquet::arrow::array_reader::PrimitiveArrayReader;
-    PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), 
column_desc, None)
-        .unwrap()
+    PrimitiveArrayReader::<Int32Type>::new_with_options(
+        Box::new(page_iterator),
+        column_desc,
+        None,
+        true,
+    )
+    .unwrap()
 }
 
 fn create_string_arrow_array_reader(
diff --git a/parquet/src/arrow/array_reader.rs 
b/parquet/src/arrow/array_reader.rs
index d166be7..6ba08f9 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -248,6 +248,17 @@ where
         column_desc: ColumnDescPtr,
         arrow_type: Option<ArrowType>,
     ) -> Result<Self> {
+        Self::new_with_options(pages, column_desc, arrow_type, false)
+    }
+
+    /// Construct primitive array reader with ability to only compute null 
mask and not
+    /// buffer level data
+    pub fn new_with_options(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+        null_mask_only: bool,
+    ) -> Result<Self> {
         // Check if Arrow type is specified, else create it from Parquet type
         let data_type = match arrow_type {
             Some(t) => t,
@@ -256,7 +267,7 @@ where
                 .clone(),
         };
 
-        let record_reader = RecordReader::<T>::new(column_desc.clone());
+        let record_reader = 
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
 
         Ok(Self {
             data_type,
@@ -1350,19 +1361,26 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a 
ArrayReaderBuilderContext
             let mut new_context = context.clone();
             new_context.path.append(vec![cur_type.name().to_string()]);
 
-            match cur_type.get_basic_info().repetition() {
+            let null_mask_only = match cur_type.get_basic_info().repetition() {
                 Repetition::REPEATED => {
                     new_context.def_level += 1;
                     new_context.rep_level += 1;
+                    false
                 }
                 Repetition::OPTIONAL => {
                     new_context.def_level += 1;
+
+                    // Can just compute null mask if no parent
+                    context.def_level == 0 && context.rep_level == 0
                 }
-                _ => (),
-            }
+                _ => false,
+            };
 
-            let reader =
-                self.build_for_primitive_type_inner(cur_type.clone(), 
&new_context)?;
+            let reader = self.build_for_primitive_type_inner(
+                cur_type.clone(),
+                &new_context,
+                null_mask_only,
+            )?;
 
             if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
                 Err(ArrowError(
@@ -1641,6 +1659,7 @@ impl<'a> ArrayReaderBuilder {
         &self,
         cur_type: TypePtr,
         context: &'a ArrayReaderBuilderContext,
+        null_mask_only: bool,
     ) -> Result<Box<dyn ArrayReader>> {
         let column_desc = Arc::new(ColumnDescriptor::new(
             cur_type.clone(),
@@ -1658,11 +1677,14 @@ impl<'a> ArrayReaderBuilder {
             .map(|f| f.data_type().clone());
 
         match cur_type.get_physical_type() {
-            PhysicalType::BOOLEAN => 
Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
-                page_iterator,
-                column_desc,
-                arrow_type,
-            )?)),
+            PhysicalType::BOOLEAN => Ok(Box::new(
+                PrimitiveArrayReader::<BoolType>::new_with_options(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    null_mask_only,
+                )?,
+            )),
             PhysicalType::INT32 => {
                 if let Some(ArrowType::Null) = arrow_type {
                     Ok(Box::new(NullArrayReader::<Int32Type>::new(
@@ -1670,18 +1692,24 @@ impl<'a> ArrayReaderBuilder {
                         column_desc,
                     )?))
                 } else {
-                    Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
-                        page_iterator,
-                        column_desc,
-                        arrow_type,
-                    )?))
+                    Ok(Box::new(
+                        PrimitiveArrayReader::<Int32Type>::new_with_options(
+                            page_iterator,
+                            column_desc,
+                            arrow_type,
+                            null_mask_only,
+                        )?,
+                    ))
                 }
             }
-            PhysicalType::INT64 => 
Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
-                page_iterator,
-                column_desc,
-                arrow_type,
-            )?)),
+            PhysicalType::INT64 => Ok(Box::new(
+                PrimitiveArrayReader::<Int64Type>::new_with_options(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    null_mask_only,
+                )?,
+            )),
             PhysicalType::INT96 => {
                 // get the optional timezone information from arrow type
                 let timezone = arrow_type
@@ -1705,18 +1733,22 @@ impl<'a> ArrayReaderBuilder {
                     arrow_type,
                 )?))
             }
-            PhysicalType::FLOAT => 
Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
-                page_iterator,
-                column_desc,
-                arrow_type,
-            )?)),
-            PhysicalType::DOUBLE => {
-                Ok(Box::new(PrimitiveArrayReader::<DoubleType>::new(
+            PhysicalType::FLOAT => Ok(Box::new(
+                PrimitiveArrayReader::<FloatType>::new_with_options(
                     page_iterator,
                     column_desc,
                     arrow_type,
-                )?))
-            }
+                    null_mask_only,
+                )?,
+            )),
+            PhysicalType::DOUBLE => Ok(Box::new(
+                PrimitiveArrayReader::<DoubleType>::new_with_options(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    null_mask_only,
+                )?,
+            )),
             PhysicalType::BYTE_ARRAY => {
                 if cur_type.get_basic_info().converted_type() == 
ConvertedType::UTF8 {
                     if let Some(ArrowType::LargeUtf8) = arrow_type {
diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index fd09aa9..066f86c 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -249,10 +249,11 @@ mod tests {
     use crate::file::properties::{WriterProperties, WriterVersion};
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::file::writer::{FileWriter, SerializedFileWriter};
+    use crate::schema::parser::parse_message_type;
     use crate::schema::types::{Type, TypePtr};
-    use crate::util::test_common::{get_temp_filename, RandGen};
+    use crate::util::test_common::{get_temp_file, get_temp_filename, RandGen};
     use arrow::array::*;
-    use arrow::datatypes::DataType as ArrowDataType;
+    use arrow::datatypes::{DataType as ArrowDataType, Field};
     use arrow::record_batch::RecordBatchReader;
     use rand::{thread_rng, RngCore};
     use serde_json::json;
@@ -916,4 +917,54 @@ mod tests {
             batch.unwrap();
         }
     }
+
+    #[test]
+    fn test_nested_nullability() {
+        let message_type = "message nested {
+          OPTIONAL Group group {
+            REQUIRED INT32 leaf;
+          }
+        }";
+
+        let file = get_temp_file("nested_nullability.parquet", &[]);
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+
+        {
+            // Write using low-level parquet API (#1167)
+            let writer_props = Arc::new(WriterProperties::builder().build());
+            let mut writer = SerializedFileWriter::new(
+                file.try_clone().unwrap(),
+                schema,
+                writer_props,
+            )
+            .unwrap();
+
+            let mut row_group_writer = writer.next_row_group().unwrap();
+            let mut column_writer = 
row_group_writer.next_column().unwrap().unwrap();
+
+            get_typed_column_writer_mut::<Int32Type>(&mut column_writer)
+                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
+                .unwrap();
+
+            row_group_writer.close_column(column_writer).unwrap();
+            writer.close_row_group(row_group_writer).unwrap();
+
+            writer.close().unwrap();
+        }
+
+        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
+        let mut batch = ParquetFileArrowReader::new(file_reader);
+        let reader = batch.get_record_reader_by_columns(vec![0], 
1024).unwrap();
+
+        let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
+            "group",
+            ArrowDataType::Struct(vec![Field::new("leaf", 
ArrowDataType::Int32, false)]),
+            true,
+        )]);
+
+        let batch = reader.into_iter().next().unwrap().unwrap();
+        assert_eq!(batch.schema().as_ref(), &expected_schema);
+        assert_eq!(batch.num_rows(), 4);
+        assert_eq!(batch.column(0).data().null_count(), 2);
+    }
 }
diff --git a/parquet/src/arrow/record_reader.rs 
b/parquet/src/arrow/record_reader.rs
index 4913e14..df93ebf 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -73,9 +73,35 @@ where
     V: ValuesBuffer + Default,
     CV: ColumnValueDecoder<Slice = V::Slice>,
 {
+    /// Create a new [`GenericRecordReader`]
     pub fn new(desc: ColumnDescPtr) -> Self {
-        let def_levels =
-            (desc.max_def_level() > 0).then(|| 
DefinitionLevelBuffer::new(&desc));
+        Self::new_with_options(desc, false)
+    }
+
+    /// Create a new [`GenericRecordReader`] with the ability to only generate 
the bitmask
+    ///
+    /// If `null_mask_only` is true only the null bitmask will be generated and
+    /// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will 
always return `None`
+    ///
+    /// It is insufficient to solely check that the max definition level 
is 1 as we
+    /// need there to be no nullable parent array that will require decoded 
definition levels
+    ///
+    /// In particular consider the case of:
+    ///
+    /// ```ignore
+    /// message nested {
+    ///   OPTIONAL Group group {
+    ///     REQUIRED INT32 leaf;
+    ///   }
+    /// }
+    /// ```
+    ///
+    /// The maximum definition level of leaf is 1, however, we still need to 
decode the
+    /// definition levels so that the parent group can be constructed correctly
+    ///
+    pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) 
-> Self {
+        let def_levels = (desc.max_def_level() > 0)
+            .then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));
 
         let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
 
@@ -171,7 +197,7 @@ where
     /// as record values, e.g. those from `self.num_values` to 
`self.values_written`.
     pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
         Ok(match self.def_levels.as_mut() {
-            Some(x) => Some(x.split_off(self.num_values)),
+            Some(x) => x.split_levels(self.num_values),
             None => None,
         })
     }
@@ -221,10 +247,7 @@ where
             .as_mut()
             .map(|levels| levels.spare_capacity_mut(batch_size));
 
-        let def_levels = self
-            .def_levels
-            .as_mut()
-            .map(|levels| levels.spare_capacity_mut(batch_size));
+        let def_levels = self.def_levels.as_mut();
 
         let values = self.records.spare_capacity_mut(batch_size);
 
diff --git a/parquet/src/arrow/record_reader/buffer.rs 
b/parquet/src/arrow/record_reader/buffer.rs
index 7dbf2d1..29e6110 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -114,6 +114,11 @@ impl<T: ScalarValue> ScalarBuffer<T> {
         self.len == 0
     }
 
+    pub fn resize(&mut self, len: usize) {
+        self.buffer.resize(len * std::mem::size_of::<T>(), 0);
+        self.len = len;
+    }
+
     #[inline]
     pub fn as_slice(&self) -> &[T] {
         let (prefix, buf, suffix) = unsafe { 
self.buffer.as_slice().align_to::<T>() };
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index 86c089f..c38505a 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -15,74 +15,114 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ops::Range;
+
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
-use std::ops::Range;
+use arrow::util::bit_chunk_iterator::BitChunks;
 
-use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::arrow::record_reader::buffer::BufferQueue;
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+    ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice,
+};
+use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
 
-use super::{
-    buffer::{BufferQueue, ScalarBuffer},
-    MIN_BATCH_SIZE,
-};
+use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE};
+
+enum BufferInner {
+    /// Compute levels and null mask
+    Full {
+        levels: ScalarBuffer<i16>,
+        nulls: BooleanBufferBuilder,
+        max_level: i16,
+    },
+    /// Only compute null bitmask - requires max level to be 1
+    ///
+    /// This is an optimisation for the common case of a nullable scalar 
column, as decoding
+    /// the definition level data is only required when decoding nested 
structures
+    ///
+    Mask { nulls: BooleanBufferBuilder },
+}
 
 pub struct DefinitionLevelBuffer {
-    buffer: ScalarBuffer<i16>,
-    builder: BooleanBufferBuilder,
-    max_level: i16,
+    inner: BufferInner,
+
+    /// The length of this buffer
+    ///
+    /// Note: `buffer` and `builder` may contain more elements
+    len: usize,
 }
 
-impl BufferQueue for DefinitionLevelBuffer {
-    type Output = Buffer;
-    type Slice = [i16];
+impl DefinitionLevelBuffer {
+    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
+        let inner = match null_mask_only {
+            true => {
+                assert_eq!(
+                    desc.max_def_level(),
+                    1,
+                    "max definition level must be 1 to only compute null 
bitmask"
+                );
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        self.buffer.split_off(len)
-    }
+                assert_eq!(
+                    desc.max_rep_level(),
+                    0,
+                    "max repetition level must be 0 to only compute null 
bitmask"
+                );
 
-    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
-        assert_eq!(self.buffer.len(), self.builder.len());
-        self.buffer.spare_capacity_mut(batch_size)
-    }
+                BufferInner::Mask {
+                    nulls: BooleanBufferBuilder::new(0),
+                }
+            }
+            false => BufferInner::Full {
+                levels: ScalarBuffer::new(),
+                nulls: BooleanBufferBuilder::new(0),
+                max_level: desc.max_def_level(),
+            },
+        };
 
-    fn set_len(&mut self, len: usize) {
-        self.buffer.set_len(len);
-        let buf = self.buffer.as_slice();
+        Self { inner, len: 0 }
+    }
 
-        let range = self.builder.len()..len;
-        self.builder.reserve(range.end - range.start);
-        for i in &buf[range] {
-            self.builder.append(*i == self.max_level)
+    pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
+        match &mut self.inner {
+            BufferInner::Full { levels, .. } => {
+                let out = levels.split_off(len);
+                self.len = levels.len();
+                Some(out)
+            }
+            BufferInner::Mask { .. } => None,
         }
     }
-}
 
-impl DefinitionLevelBuffer {
-    pub fn new(desc: &ColumnDescPtr) -> Self {
-        Self {
-            buffer: ScalarBuffer::new(),
-            builder: BooleanBufferBuilder::new(0),
-            max_level: desc.max_def_level(),
-        }
+    pub fn set_len(&mut self, len: usize) {
+        assert_eq!(self.nulls().len(), len);
+        self.len = len;
     }
 
     /// Split `len` levels out of `self`
     pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
-        let old_len = self.builder.len();
+        let builder = match &mut self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        };
+
+        let old_len = builder.len();
         let num_left_values = old_len - len;
         let new_bitmap_builder =
             BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
 
-        let old_bitmap =
-            std::mem::replace(&mut self.builder, new_bitmap_builder).finish();
+        let old_bitmap = std::mem::replace(builder, 
new_bitmap_builder).finish();
         let old_bitmap = Bitmap::from(old_bitmap);
 
         for i in len..old_len {
-            self.builder.append(old_bitmap.is_set(i));
+            builder.append(old_bitmap.is_set(i));
         }
 
+        self.len = builder.len();
         old_bitmap
     }
 
@@ -91,10 +131,335 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
+    }
+}
+
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit 
width of 1
+/// directly into a bitmask
+///
+/// This is significantly faster than decoding the data into `[i16]` and then 
computing
+/// a bitmask from this, as not only can it skip this buffer allocation and 
construction,
+/// but it can exploit properties of the encoded data to reduce work further
+///
+/// In particular:
+///
+/// * Packed runs are already bitmask encoded and can simply be appended
+/// * Runs of 1 or 0 bits can be efficiently appended with byte (or larger) 
operations
+///
+/// [RLE]: 
https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
+/// [BIT_PACKED]: 
https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
+struct PackedDecoder {
+    data: ByteBufferPtr,
+    data_offset: usize,
+    rle_left: usize,
+    rle_value: bool,
+    packed_count: usize,
+    packed_offset: usize,
+}
+
+impl PackedDecoder {
+    fn next_rle_block(&mut self) -> Result<()> {
+        let indicator_value = self.decode_header()?;
+        if indicator_value & 1 == 1 {
+            let len = (indicator_value >> 1) as usize;
+            self.packed_count = len * 8;
+            self.packed_offset = 0;
+        } else {
+            self.rle_left = (indicator_value >> 1) as usize;
+            let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| 
{
+                ParquetError::EOF(
+                    "unexpected end of file whilst decoding definition levels 
rle value"
+                        .into(),
+                )
+            })?;
+
+            self.data_offset += 1;
+            self.rle_value = byte != 0;
+        }
+        Ok(())
+    }
+
+    /// Decodes a VLQ encoded little endian integer and returns it
+    fn decode_header(&mut self) -> Result<i64> {
+        let mut offset = 0;
+        let mut v: i64 = 0;
+        while offset < 10 {
+            let byte = *self
+                .data
+                .as_ref()
+                .get(self.data_offset + offset)
+                .ok_or_else(|| {
+                    ParquetError::EOF(
+                        "unexpected end of file whilst decoding definition 
levels rle header"
+                            .into(),
+                    )
+                })?;
+
+            v |= ((byte & 0x7F) as i64) << (offset * 7);
+            offset += 1;
+            if byte & 0x80 == 0 {
+                self.data_offset += offset;
+                return Ok(v);
+            }
+        }
+        Err(general_err!("too many bytes for VLQ"))
+    }
+}
+
+impl PackedDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
+        match encoding {
+            Encoding::RLE => Self {
+                data,
+                data_offset: 0,
+                rle_left: 0,
+                rle_value: false,
+                packed_count: 0,
+                packed_offset: 0,
+            },
+            Encoding::BIT_PACKED => Self {
+                data_offset: 0,
+                rle_left: 0,
+                rle_value: false,
+                packed_count: data.len() * 8,
+                packed_offset: 0,
+                data,
+            },
+            _ => unreachable!("invalid level encoding: {}", encoding),
+        }
+    }
+
+    fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> 
Result<usize> {
+        let mut read = 0;
+        while read != len {
+            if self.rle_left != 0 {
+                let to_read = self.rle_left.min(len - read);
+                buffer.append_n(to_read, self.rle_value);
+                self.rle_left -= to_read;
+                read += to_read;
+            } else if self.packed_count != self.packed_offset {
+                let to_read = (self.packed_count - self.packed_offset).min(len 
- read);
+                let offset = self.data_offset * 8 + self.packed_offset;
+                buffer.append_packed_range(offset..offset + to_read, 
self.data.as_ref());
+                self.packed_offset += to_read;
+                read += to_read;
+
+                if self.packed_offset == self.packed_count {
+                    self.data_offset += self.packed_count / 8;
+                }
+            } else if self.data_offset == self.data.len() {
+                break;
+            } else {
+                self.next_rle_block()?
+            }
+        }
+        Ok(read)
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+/// Counts the number of set bits in the provided range
+pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
+    let mut count = 0_usize;
+    let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
+    chunks.iter().for_each(|chunk| {
+        count += chunk.count_ones() as usize;
+    });
+    count += chunks.remainder_bits().count_ones() as usize;
+    count
+}
+
+fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
+    let (mut byte_idx, mut in_progress) = match bytes.len() {
+        0 => (0, 0),
+        len => (len - 1, bytes[len - 1]),
+    };
+
+    std::iter::from_fn(move || loop {
+        if in_progress != 0 {
+            let bit_pos = 7 - in_progress.leading_zeros();
+            in_progress ^= 1 << bit_pos;
+            return Some((byte_idx << 3) + (bit_pos as usize));
+        }
+
+        if byte_idx == 0 {
+            return None;
+        }
+
+        byte_idx -= 1;
+        in_progress = bytes[byte_idx];
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::encodings::rle::RleEncoder;
+    use rand::{thread_rng, Rng, RngCore};
+
+    #[test]
+    fn test_packed_decoder() {
+        let mut rng = thread_rng();
+        let len: usize = rng.gen_range(512..1024);
+
+        let mut expected = BooleanBufferBuilder::new(len);
+        let mut encoder = RleEncoder::new(1, 1024);
+        for _ in 0..len {
+            let bool = rng.gen_bool(0.8);
+            assert!(encoder.put(bool as u64).unwrap());
+            expected.append(bool);
+        }
+        assert_eq!(expected.len(), len);
+
+        let encoded = encoder.consume().unwrap();
+        let mut decoder = PackedDecoder::new(Encoding::RLE, 
ByteBufferPtr::new(encoded));
+
+        // Decode data in random length intervals
+        let mut decoded = BooleanBufferBuilder::new(len);
+        loop {
+            let remaining = len - decoded.len();
+            if remaining == 0 {
+                break;
+            }
+
+            let to_read = rng.gen_range(1..=remaining);
+            decoder.read(&mut decoded, to_read).unwrap();
+        }
+
+        assert_eq!(decoded.len(), len);
+        assert_eq!(decoded.as_slice(), expected.as_slice());
+    }
+
+    #[test]
+    fn test_bit_fns() {
+        let mut rng = thread_rng();
+        let mask_length = rng.gen_range(1..20);
+        let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 
0))
+            .take(mask_length)
+            .collect();
+
+        let mut nulls = BooleanBufferBuilder::new(mask_length);
+        bools.iter().for_each(|b| nulls.append(*b));
+
+        let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
+        let expected: Vec<_> = bools
+            .iter()
+            .enumerate()
+            .rev()
+            .filter_map(|(x, y)| y.then(|| x))
+            .collect();
+        assert_eq!(actual, expected);
+
+        assert_eq!(iter_set_bits_rev(&[]).count(), 0);
+        assert_eq!(count_set_bits(&[], 0..0), 0);
+        assert_eq!(count_set_bits(&[0xFF], 1..1), 0);
+
+        for _ in 0..20 {
+            let start = rng.gen_range(0..bools.len());
+            let end = rng.gen_range(start..bools.len());
+
+            let actual = count_set_bits(nulls.as_slice(), start..end);
+            let expected = bools[start..end].iter().filter(|x| **x).count();
+
+            assert_eq!(actual, expected);
+        }
+    }
+}
diff --git a/parquet/src/column/reader/decoder.rs 
b/parquet/src/column/reader/decoder.rs
index 52caba2..a9221d9 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -219,8 +219,7 @@ pub struct ColumnLevelDecoderImpl {
 
 enum LevelDecoderInner {
     Packed(BitReader, u8),
-    /// Boxed as `RleDecoder` contains an inline buffer
-    Rle(Box<RleDecoder>),
+    Rle(RleDecoder),
 }
 
 impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
@@ -230,7 +229,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
         let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as 
u8;
         match encoding {
             Encoding::RLE => {
-                let mut decoder = Box::new(RleDecoder::new(bit_width));
+                let mut decoder = RleDecoder::new(bit_width);
                 decoder.set_data(data);
                 Self {
                     inner: LevelDecoderInner::Rle(decoder),

Reply via email to