tustvold commented on a change in pull request #1054:
URL: https://github.com/apache/arrow-rs/pull/1054#discussion_r776411732



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,379 @@
+use std::ops::Range;
+
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+    ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsWriter,
+};
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{RecordBuffer, TypedBuffer},
+    MIN_BATCH_SIZE,
+};
+
/// The internal state of [`DefinitionLevelBuffer`]
enum BufferInner {
    /// Compute levels and null mask
    Full {
        /// The decoded definition levels
        levels: TypedBuffer<i16>,
        /// One bit per decoded level: set when the level equals `max_level`
        /// (i.e. the corresponding value is non-null)
        nulls: BooleanBufferBuilder,
        /// The maximum definition level for the column
        max_level: i16,
    },
    /// Only compute null bitmask - requires max level to be 1
    Mask {
        /// One bit per decoded level: set when the value is non-null
        nulls: BooleanBufferBuilder,
    },
}
+
/// A buffer of decoded definition levels, together with the derived null bitmask
pub struct DefinitionLevelBuffer {
    inner: BufferInner,

    /// The length of this buffer
    ///
    /// Note: `buffer` and `builder` may contain more elements
    len: usize,
}
+
impl DefinitionLevelBuffer {
    /// Creates a new [`DefinitionLevelBuffer`] for the column described by `desc`
    ///
    /// If `null_mask_only` is true only the null bitmask is computed, which
    /// requires a maximum definition level of 1 and a maximum repetition
    /// level of 0 (enforced by the assertions below)
    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
        let inner = match null_mask_only {
            true => {
                assert_eq!(
                    desc.max_def_level(),
                    1,
                    "max definition level must be 1 to only compute null bitmask"
                );

                assert_eq!(
                    desc.max_rep_level(),
                    0,
                    "max repetition level must be 0 to only compute null bitmask"
                );

                BufferInner::Mask {
                    nulls: BooleanBufferBuilder::new(0),
                }
            }
            false => BufferInner::Full {
                levels: TypedBuffer::new(),
                nulls: BooleanBufferBuilder::new(0),
                max_level: desc.max_def_level(),
            },
        };

        Self { inner, len: 0 }
    }

    /// Splits the first `len` decoded definition levels out of this buffer,
    /// returning `None` when only the null bitmask is being computed
    pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
        match &mut self.inner {
            BufferInner::Full { levels, .. } => {
                let out = levels.split(len);
                self.len = levels.len();
                Some(out)
            }
            BufferInner::Mask { .. } => None,
        }
    }

    /// Sets the length of this buffer to `len`
    pub fn commit(&mut self, len: usize) {
        self.len = len;
    }

    /// Split `len` levels out of `self`
    pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
        let builder = match &mut self.inner {
            BufferInner::Full { nulls, .. } => nulls,
            BufferInner::Mask { nulls } => nulls,
        };

        // Swap in a fresh builder sized for the bits remaining after the split
        let old_len = builder.len();
        let num_left_values = old_len - len;
        let new_bitmap_builder =
            BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));

        let old_bitmap = std::mem::replace(builder, new_bitmap_builder).finish();
        let old_bitmap = Bitmap::from(old_bitmap);

        // Carry the bits beyond `len` over into the replacement builder.
        // NOTE(review): the returned bitmap still contains `old_len` bits;
        // callers presumably only consume the first `len` — confirm
        for i in len..old_len {
            builder.append(old_bitmap.is_set(i));
        }

        self.len = builder.len();
        old_bitmap
    }

    /// Returns an iterator of the indices of set bits in the null bitmask
    ///
    /// NOTE(review): only `range.start` is validated here — `range.end` is
    /// unused, and the whole bitmask slice is passed to `iter_set_bits_rev`
    /// (defined elsewhere); the iteration order/extent depends on that helper
    pub fn valid_position_iter(
        &self,
        range: Range<usize>,
    ) -> impl Iterator<Item = usize> + '_ {
        assert_eq!(range.start, self.len);
        iter_set_bits_rev(self.nulls().as_slice())
    }

    /// Returns the null bitmask builder for either variant of [`BufferInner`]
    fn nulls(&self) -> &BooleanBufferBuilder {
        match &self.inner {
            BufferInner::Full { nulls, .. } => nulls,
            BufferInner::Mask { nulls } => nulls,
        }
    }
}
+
+impl LevelsWriter for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        match &self.inner {
+            BufferInner::Full { levels, .. } => levels.as_slice()[self.len + 
idx],
+            BufferInner::Mask { nulls } => nulls.get_bit(self.len + idx) as 
i16,
+        }
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - self.nulls().count_in_range(range)
+    }
+}
+
/// A [`ColumnLevelDecoder`] for definition levels
pub struct DefinitionLevelDecoder {
    /// The maximum definition level for the column
    max_level: i16,
    /// The encoding of the level data
    encoding: Encoding,
    /// The raw level data; taken (set to `None`) on the first call to `read`
    data: Option<ByteBufferPtr>,
    /// Lazily created decoder used for [`BufferInner::Full`] writers
    column_decoder: Option<ColumnLevelDecoderImpl>,
    /// Lazily created decoder used for [`BufferInner::Mask`] writers
    packed_decoder: Option<PackedDecoder>,
}
+
impl ColumnLevelDecoder for DefinitionLevelDecoder {
    type Writer = DefinitionLevelBuffer;

    /// Creates a decoder for level `data` with the provided `max_level`
    /// and `encoding`; the concrete decoder is created lazily on first read
    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
        Self {
            max_level,
            encoding,
            data: Some(data),
            column_decoder: None,
            packed_decoder: None,
        }
    }

    /// Reads levels for `range` into `writer`, returning the number of
    /// levels read
    ///
    /// The writer's variant selects which decoder is used, and is expected
    /// to be the same on every call (see the `expect` messages below)
    fn read(
        &mut self,
        writer: &mut Self::Writer,
        range: Range<usize>,
    ) -> crate::errors::Result<usize> {
        match &mut writer.inner {
            BufferInner::Full {
                levels,
                nulls,
                max_level,
            } => {
                assert_eq!(self.max_level, *max_level);
                // The bitmask must already cover everything before `range`
                assert_eq!(range.start + writer.len, nulls.len());

                // Lazily create the level decoder on the first call,
                // consuming the page data
                let decoder = match self.data.take() {
                    Some(data) => {
                        self.column_decoder.insert(ColumnLevelDecoderImpl::create(
                            self.max_level,
                            self.encoding,
                            data,
                        ))
                    }
                    None => self
                        .column_decoder
                        .as_mut()
                        .expect("consistent null_mask_only"),
                };

                levels.resize(range.end + writer.len);

                // Decode into the uncommitted tail of the levels buffer
                let slice = &mut levels.as_slice_mut()[writer.len..];
                let levels_read = decoder.read(slice, range.clone())?;

                // A value is non-null iff its level equals the maximum
                nulls.reserve(levels_read);
                for i in &slice[range.start..range.start + levels_read] {
                    nulls.append(i == max_level)
                }

                Ok(levels_read)
            }
            BufferInner::Mask { nulls } => {
                // Mask-only mode is constructed with max_def_level == 1
                assert_eq!(self.max_level, 1);
                assert_eq!(range.start + writer.len, nulls.len());

                // Lazily create the packed decoder on the first call,
                // consuming the page data
                let decoder = match self.data.take() {
                    Some(data) => self
                        .packed_decoder
                        .insert(PackedDecoder::new(self.encoding, data)),
                    None => self
                        .packed_decoder
                        .as_mut()
                        .expect("consistent null_mask_only"),
                };

                decoder.read(nulls, range.end - range.start)
            }
        }
    }
}
+
/// Decodes the parquet RLE/bit-packed hybrid level encoding directly into
/// a boolean null mask (valid only when the maximum level is 1)
struct PackedDecoder {
    /// The raw encoded level data
    data: ByteBufferPtr,
    /// Current byte offset into `data`
    data_offset: usize,
    /// Number of values remaining in the current RLE run
    rle_left: usize,
    /// The repeated value of the current RLE run
    rle_value: bool,
    /// Total number of bits in the current bit-packed run
    packed_count: usize,
    /// Number of bits already consumed from the current bit-packed run
    packed_offset: usize,
}
+
+impl PackedDecoder {
+    fn next_rle_block(&mut self) -> Result<()> {
+        let indicator_value = self.decode_header()?;
+        if indicator_value & 1 == 1 {
+            let len = (indicator_value >> 1) as usize;
+            self.packed_count = len * 8;
+            self.packed_offset = 0;
+        } else {
+            self.rle_left = (indicator_value >> 1) as usize;
+            let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| 
{
+                ParquetError::EOF(
+                    "unexpected end of file whilst decoding definition levels 
rle value"
+                        .into(),
+                )
+            })?;
+
+            self.data_offset += 1;
+            self.rle_value = byte != 0;
+        }
+        Ok(())
+    }
+
+    /// Decodes a VLQ encoded little endian integer and returns it
+    fn decode_header(&mut self) -> Result<i64> {
+        let mut offset = 0;
+        let mut v: i64 = 0;
+        while offset < 10 {
+            let byte = *self
+                .data
+                .as_ref()
+                .get(self.data_offset + offset)
+                .ok_or_else(|| {
+                    ParquetError::EOF(
+                        "unexpected end of file whilst decoding definition 
levels rle header"
+                            .into(),
+                    )
+                })?;
+
+            v |= ((byte & 0x7F) as i64) << (offset * 7);
+            offset += 1;
+            if byte & 0x80 == 0 {
+                self.data_offset += offset;
+                return Ok(v);
+            }
+        }
+        Err(general_err!("too many bytes for VLQ"))
+    }
+}
+
+impl PackedDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
+        match encoding {
+            Encoding::RLE => Self {
+                data,
+                data_offset: 0,
+                rle_left: 0,
+                rle_value: false,
+                packed_count: 0,
+                packed_offset: 0,
+            },
+            Encoding::BIT_PACKED => Self {
+                data_offset: 0,
+                rle_left: 0,
+                rle_value: false,
+                packed_count: data.len() * 8,
+                packed_offset: 0,
+                data,
+            },
+            _ => unreachable!("invalid level encoding: {}", encoding),
+        }
+    }
+
+    fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> 
Result<usize> {
+        let mut read = 0;
+        while read != len {
+            if self.rle_left != 0 {
+                let to_read = self.rle_left.min(len - read);
+                buffer.append_n(to_read, self.rle_value);
+                self.rle_left -= to_read;
+                read += to_read;
+            } else if self.packed_count != self.packed_offset {
+                let to_read = (self.packed_count - self.packed_offset).min(len 
- read);
+                let offset = self.data_offset * 8 + self.packed_offset;
+                buffer.append_packed_range(offset..offset + to_read, 
self.data.as_ref());

Review comment:
       > plus bit-packing of the def levels
   
   The logic within `RleEncoder` uses run-length encoding if the repetition 
count is greater than 8, otherwise it uses the bit-packed version. Therefore 
how common bit-packed sequences are depends on the distribution of nulls within 
the data.
   
    _To be clear: what parquet calls RLE encoding is actually a hybrid encoding — a page isn't entirely bit-packed or run-length encoded, but contains blocks of either._
   
   > but probably not as fast as directly copying the bit-packed values
   
   I'm not sure I agree with this, copying the bit-packed values is actually 
potentially more expensive, as it requires shifting and masking the source 
data. However, a run of nulls is simply a case of incrementing the length of 
the buffer (as everything is 0-initialized), whereas setting sequences of valid 
bits can be done at the byte level (or possibly larger).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to