alamb commented on a change in pull request #1054:
URL: https://github.com/apache/arrow-rs/pull/1054#discussion_r783151026



##########
File path: parquet/src/arrow/record_reader.rs
##########
@@ -73,9 +73,19 @@ where
     V: ValuesBuffer + Default,
     CV: ColumnValueDecoder<Slice = V::Slice>,
 {
+    /// Create a new [`GenericRecordReader`]
     pub fn new(desc: ColumnDescPtr) -> Self {
-        let def_levels =
-            (desc.max_def_level() > 0).then(|| 
DefinitionLevelBuffer::new(&desc));
+        Self::new_with_options(desc, false)
+    }
+
+    /// Create a new [`GenericRecordReader`] with the ability to only generate 
the bitmask
+    ///
+    /// If `null_mask_only` is true only the null bitmask will be generated and
+    /// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will 
always return `None`
+    ///
+    pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) 
-> Self {
+        let def_levels = (desc.max_def_level() > 0)

Review comment:
       I don't understand the use of `null_mask_only` here -- I thought 
`null_mask_only` would be set only if `max_def_level() == 1`
   
   AKA  
https://github.com/apache/arrow-rs/pull/1054/files#diff-0d6bed48d78c5a2472b7680a8185cabdc0bd259d6484e184439ed7830060661fR1374

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),

Review comment:
       TIL: `Option::insert` 👍 

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -15,74 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ops::Range;
+
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
-use std::ops::Range;
+use arrow::util::bit_chunk_iterator::BitChunks;
 
-use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::arrow::record_reader::buffer::BufferQueue;
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+    ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice,
+};
+use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
 
-use super::{
-    buffer::{BufferQueue, ScalarBuffer},
-    MIN_BATCH_SIZE,
-};
+use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE};
+
+enum BufferInner {
+    /// Compute levels and null mask
+    Full {
+        levels: ScalarBuffer<i16>,
+        nulls: BooleanBufferBuilder,
+        max_level: i16,
+    },
+    /// Only compute null bitmask - requires max level to be 1

Review comment:
       ```suggestion
       /// Only compute null bitmask - requires max level to be 1
       /// this is an optimization for the common case of a scalar column
       /// the repetition and definition decoding is required when decoding
       /// nested structures. 
   ```

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -15,74 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ops::Range;
+
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
-use std::ops::Range;
+use arrow::util::bit_chunk_iterator::BitChunks;
 
-use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::arrow::record_reader::buffer::BufferQueue;
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+    ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice,
+};
+use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
 
-use super::{
-    buffer::{BufferQueue, ScalarBuffer},
-    MIN_BATCH_SIZE,
-};
+use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE};
+
+enum BufferInner {
+    /// Compute levels and null mask
+    Full {
+        levels: ScalarBuffer<i16>,
+        nulls: BooleanBufferBuilder,
+        max_level: i16,
+    },
+    /// Only compute null bitmask - requires max level to be 1
+    Mask { nulls: BooleanBufferBuilder },
+}
 
 pub struct DefinitionLevelBuffer {
-    buffer: ScalarBuffer<i16>,
-    builder: BooleanBufferBuilder,
-    max_level: i16,
+    inner: BufferInner,
+
+    /// The length of this buffer
+    ///
+    /// Note: `buffer` and `builder` may contain more elements
+    len: usize,
 }
 
-impl BufferQueue for DefinitionLevelBuffer {
-    type Output = Buffer;
-    type Slice = [i16];
+impl DefinitionLevelBuffer {
+    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
+        let inner = match null_mask_only {

Review comment:
       I wonder why `null_mask_only` is passed down all the way here only to be 
rechecked / `assert!`ed. 
   
   Would it be possible / feasible to decide here in 
`DefinitionLevelBuffer::new` to use `BufferInner::Mask` if `max_def_level()` 
is 1 and `max_rep_levels()` is 0, and thus avoid plumbing the argument 
around?

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+struct PackedDecoder {
+    data: ByteBufferPtr,
+    data_offset: usize,
+    rle_left: usize,
+    rle_value: bool,
+    packed_count: usize,
+    packed_offset: usize,
+}
+
+impl PackedDecoder {
+    fn next_rle_block(&mut self) -> Result<()> {
+        let indicator_value = self.decode_header()?;
+        if indicator_value & 1 == 1 {
+            let len = (indicator_value >> 1) as usize;
+            self.packed_count = len * 8;
+            self.packed_offset = 0;
+        } else {
+            self.rle_left = (indicator_value >> 1) as usize;
+            let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| 
{
+                ParquetError::EOF(
+                    "unexpected end of file whilst decoding definition levels 
rle value"
+                        .into(),
+                )
+            })?;
+
+            self.data_offset += 1;
+            self.rle_value = byte != 0;
+        }
+        Ok(())
+    }
+
+    /// Decodes a VLQ encoded little endian integer and returns it
+    fn decode_header(&mut self) -> Result<i64> {
+        let mut offset = 0;
+        let mut v: i64 = 0;
+        while offset < 10 {
+            let byte = *self
+                .data
+                .as_ref()
+                .get(self.data_offset + offset)
+                .ok_or_else(|| {
+                    ParquetError::EOF(
+                        "unexpected end of file whilst decoding definition 
levels rle header"
+                            .into(),
+                    )
+                })?;
+
+            v |= ((byte & 0x7F) as i64) << (offset * 7);
+            offset += 1;
+            if byte & 0x80 == 0 {
+                self.data_offset += offset;
+                return Ok(v);
+            }
+        }
+        Err(general_err!("too many bytes for VLQ"))
+    }
+}
+
+impl PackedDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
+        match encoding {
+            Encoding::RLE => Self {
+                data,
+                data_offset: 0,
+                rle_left: 0,
+                rle_value: false,
+                packed_count: 0,
+                packed_offset: 0,
+            },
+            Encoding::BIT_PACKED => Self {
+                data_offset: 0,
+                rle_left: 0,
+                rle_value: false,
+                packed_count: data.len() * 8,
+                packed_offset: 0,
+                data,
+            },
+            _ => unreachable!("invalid level encoding: {}", encoding),
+        }
+    }
+
+    fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> 
Result<usize> {
+        let mut read = 0;
+        while read != len {
+            if self.rle_left != 0 {
+                let to_read = self.rle_left.min(len - read);
+                buffer.append_n(to_read, self.rle_value);
+                self.rle_left -= to_read;
+                read += to_read;
+            } else if self.packed_count != self.packed_offset {
+                let to_read = (self.packed_count - self.packed_offset).min(len 
- read);
+                let offset = self.data_offset * 8 + self.packed_offset;
+                buffer.append_packed_range(offset..offset + to_read, 
self.data.as_ref());
+                self.packed_offset += to_read;
+                read += to_read;
+
+                if self.packed_offset == self.packed_count {
+                    self.data_offset += self.packed_count / 8;
+                }
+            } else if self.data_offset == self.data.len() {
+                break;
+            } else {
+                self.next_rle_block()?
+            }
+        }
+        Ok(read)
+    }
+}
+
+/// Counts the number of set bits in the provided range
+pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
+    let mut count = 0_usize;
+    let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
+    chunks.iter().for_each(|chunk| {
+        count += chunk.count_ones() as usize;
+    });
+    count += chunks.remainder_bits().count_ones() as usize;
+    count
+}
+
+fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
+    let (mut byte_idx, mut in_progress) = match bytes.len() {
+        0 => (0, 0),
+        len => (len - 1, bytes[len - 1]),
+    };
+
+    std::iter::from_fn(move || loop {
+        if in_progress != 0 {
+            let bit_pos = 7 - in_progress.leading_zeros();
+            in_progress ^= 1 << bit_pos;
+            return Some((byte_idx << 3) + (bit_pos as usize));
+        }
+
+        if byte_idx == 0 {
+            return None;
+        }
+
+        byte_idx -= 1;
+        in_progress = bytes[byte_idx];
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use rand::{thread_rng, Rng, RngCore};
+
+    #[test]

Review comment:
       I know there is now significant coverage of this code using the fuzz 
tests -- https://github.com/apache/arrow-rs/pull/1156 and friends. 
   
   Do you think that is sufficient coverage for `PackedDecoder`? Or would some 
more targeted unit tests be valuable too?

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+struct PackedDecoder {

Review comment:
       ```suggestion
   /// Specialized decoder for bitpacked hybrid format (TODO link) that contains
   /// only 0 and 1 (for example, definition levels in a non-nested column)
   /// that directly decodes into a bitmask in the fastest possible way
   struct PackedDecoder {
   ```

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+struct PackedDecoder {

Review comment:
       This code looks quite similar to `BitReader ` 
https://github.com/tustvold/arrow-rs/blob/bitmask-preservation/parquet/src/util/bit_util.rs#L501
   
   I wonder if you looked at possibly reusing that implementation?

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+struct PackedDecoder {
+    data: ByteBufferPtr,
+    data_offset: usize,
+    rle_left: usize,
+    rle_value: bool,
+    packed_count: usize,
+    packed_offset: usize,
+}
+
+impl PackedDecoder {

Review comment:
       I don't understand the details of the parquet format sufficiently to 
truly evaluate the correctness of this code; Perhaps some additional test 
coverage would help, but the fuzz testing may be good enough.
   
   

##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, 
self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+struct PackedDecoder {

Review comment:
       I am trying to leave breadcrumbs for the next person to look at this 
code. Is this a correct description of what this structure implements?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to