yordan-pavlov commented on a change in pull request #1054: URL: https://github.com/apache/arrow-rs/pull/1054#discussion_r775294515
########## File path: parquet/src/arrow/record_reader/definition_levels.rs ########## @@ -0,0 +1,379 @@ +use std::ops::Range; + +use arrow::array::BooleanBufferBuilder; +use arrow::bitmap::Bitmap; +use arrow::buffer::Buffer; + +use crate::basic::Encoding; +use crate::column::reader::decoder::{ + ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsWriter, +}; +use crate::errors::{ParquetError, Result}; +use crate::memory::ByteBufferPtr; +use crate::schema::types::ColumnDescPtr; + +use super::{ + buffer::{RecordBuffer, TypedBuffer}, + MIN_BATCH_SIZE, +}; + +enum BufferInner { + /// Compute levels and null mask + Full { + levels: TypedBuffer<i16>, + nulls: BooleanBufferBuilder, + max_level: i16, + }, + /// Only compute null bitmask - requires max level to be 1 + Mask { nulls: BooleanBufferBuilder }, +} + +pub struct DefinitionLevelBuffer { + inner: BufferInner, + + /// The length of this buffer + /// + /// Note: `buffer` and `builder` may contain more elements + len: usize, +} + +impl DefinitionLevelBuffer { + pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self { + let inner = match null_mask_only { + true => { + assert_eq!( + desc.max_def_level(), + 1, + "max definition level must be 1 to only compute null bitmask" + ); + + assert_eq!( + desc.max_rep_level(), + 0, + "max repetition level must be 0 to only compute null bitmask" + ); + + BufferInner::Mask { + nulls: BooleanBufferBuilder::new(0), + } + } + false => BufferInner::Full { + levels: TypedBuffer::new(), + nulls: BooleanBufferBuilder::new(0), + max_level: desc.max_def_level(), + }, + }; + + Self { inner, len: 0 } + } + + pub fn split_levels(&mut self, len: usize) -> Option<Buffer> { + match &mut self.inner { + BufferInner::Full { levels, .. } => { + let out = levels.split(len); + self.len = levels.len(); + Some(out) + } + BufferInner::Mask { .. 
} => None, + } + } + + pub fn commit(&mut self, len: usize) { + self.len = len; + } + + /// Split `len` levels out of `self` + pub fn split_bitmask(&mut self, len: usize) -> Bitmap { + let builder = match &mut self.inner { + BufferInner::Full { nulls, .. } => nulls, + BufferInner::Mask { nulls } => nulls, + }; + + let old_len = builder.len(); + let num_left_values = old_len - len; + let new_bitmap_builder = + BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values)); + + let old_bitmap = std::mem::replace(builder, new_bitmap_builder).finish(); + let old_bitmap = Bitmap::from(old_bitmap); + + for i in len..old_len { + builder.append(old_bitmap.is_set(i)); + } + + self.len = builder.len(); + old_bitmap + } + + pub fn valid_position_iter( + &self, + range: Range<usize>, + ) -> impl Iterator<Item = usize> + '_ { + assert_eq!(range.start, self.len); + iter_set_bits_rev(self.nulls().as_slice()) + } + + fn nulls(&self) -> &BooleanBufferBuilder { + match &self.inner { + BufferInner::Full { nulls, .. } => nulls, + BufferInner::Mask { nulls } => nulls, + } + } +} + +impl LevelsWriter for DefinitionLevelBuffer { + fn capacity(&self) -> usize { + usize::MAX + } + + fn get(&self, idx: usize) -> i16 { + match &self.inner { + BufferInner::Full { levels, .. 
} => levels.as_slice()[self.len + idx], + BufferInner::Mask { nulls } => nulls.get_bit(self.len + idx) as i16, + } + } + + fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize { + let total_count = range.end - range.start; + let range = range.start + self.len..range.end + self.len; + total_count - self.nulls().count_in_range(range) + } +} + +pub struct DefinitionLevelDecoder { + max_level: i16, + encoding: Encoding, + data: Option<ByteBufferPtr>, + column_decoder: Option<ColumnLevelDecoderImpl>, + packed_decoder: Option<PackedDecoder>, +} + +impl ColumnLevelDecoder for DefinitionLevelDecoder { + type Writer = DefinitionLevelBuffer; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { + Self { + max_level, + encoding, + data: Some(data), + column_decoder: None, + packed_decoder: None, + } + } + + fn read( + &mut self, + writer: &mut Self::Writer, + range: Range<usize>, + ) -> crate::errors::Result<usize> { + match &mut writer.inner { + BufferInner::Full { + levels, + nulls, + max_level, + } => { + assert_eq!(self.max_level, *max_level); + assert_eq!(range.start+writer.len, nulls.len()); + + let decoder = match self.data.take() { + Some(data) => { + self.column_decoder.insert(ColumnLevelDecoderImpl::create( + self.max_level, + self.encoding, + data, + )) + } + None => self + .column_decoder + .as_mut() + .expect("consistent null_mask_only"), + }; + + levels.resize(range.end + writer.len); + + let slice = &mut levels.as_slice_mut()[writer.len..]; + let levels_read = decoder.read(slice, range.clone())?; + + nulls.reserve(levels_read); + for i in &slice[range.start..range.start + levels_read] { + nulls.append(i == max_level) + } + + Ok(levels_read) + } + BufferInner::Mask { nulls } => { + assert_eq!(self.max_level, 1); + assert_eq!(range.start+writer.len, nulls.len()); + + let decoder = match self.data.take() { + Some(data) => self + .packed_decoder + .insert(PackedDecoder::new(self.encoding, data)), + None => self + 
.packed_decoder + .as_mut() + .expect("consistent null_mask_only"), + }; + + decoder.read(nulls, range.end - range.start) + } + } + } +} + +struct PackedDecoder { + data: ByteBufferPtr, + data_offset: usize, + rle_left: usize, + rle_value: bool, + packed_count: usize, + packed_offset: usize, +} + +impl PackedDecoder { + fn next_rle_block(&mut self) -> Result<()> { + let indicator_value = self.decode_header()?; + if indicator_value & 1 == 1 { + let len = (indicator_value >> 1) as usize; + self.packed_count = len * 8; + self.packed_offset = 0; + } else { + self.rle_left = (indicator_value >> 1) as usize; + let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| { + ParquetError::EOF( + "unexpected end of file whilst decoding definition levels rle value" + .into(), + ) + })?; + + self.data_offset += 1; + self.rle_value = byte != 0; + } + Ok(()) + } + + /// Decodes a VLQ encoded little endian integer and returns it + fn decode_header(&mut self) -> Result<i64> { + let mut offset = 0; + let mut v: i64 = 0; + while offset < 10 { + let byte = *self + .data + .as_ref() + .get(self.data_offset + offset) + .ok_or_else(|| { + ParquetError::EOF( + "unexpected end of file whilst decoding definition levels rle header" + .into(), + ) + })?; + + v |= ((byte & 0x7F) as i64) << (offset * 7); + offset += 1; + if byte & 0x80 == 0 { + self.data_offset += offset; + return Ok(v); + } + } + Err(general_err!("too many bytes for VLQ")) + } +} + +impl PackedDecoder { + fn new(encoding: Encoding, data: ByteBufferPtr) -> Self { + match encoding { + Encoding::RLE => Self { + data, + data_offset: 0, + rle_left: 0, + rle_value: false, + packed_count: 0, + packed_offset: 0, + }, + Encoding::BIT_PACKED => Self { + data_offset: 0, + rle_left: 0, + rle_value: false, + packed_count: data.len() * 8, + packed_offset: 0, + data, + }, + _ => unreachable!("invalid level encoding: {}", encoding), + } + } + + fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> { + let 
mut read = 0; + while read != len { + if self.rle_left != 0 { + let to_read = self.rle_left.min(len - read); + buffer.append_n(to_read, self.rle_value); + self.rle_left -= to_read; + read += to_read; + } else if self.packed_count != self.packed_offset { + let to_read = (self.packed_count - self.packed_offset).min(len - read); + let offset = self.data_offset * 8 + self.packed_offset; + buffer.append_packed_range(offset..offset + to_read, self.data.as_ref()); Review comment: looks like this is the main change in this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org