yordan-pavlov commented on a change in pull request #1054:
URL: https://github.com/apache/arrow-rs/pull/1054#discussion_r776407433
##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,379 @@
+use std::ops::Range;
+
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+ ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsWriter,
+};
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+ buffer::{RecordBuffer, TypedBuffer},
+ MIN_BATCH_SIZE,
+};
+
/// Storage strategy for decoded definition levels.
enum BufferInner {
    /// Compute levels and null mask
    Full {
        // Raw definition levels, one i16 per decoded slot.
        levels: TypedBuffer<i16>,
        // Validity bitmap; a bit is set when its level equals `max_level`.
        nulls: BooleanBufferBuilder,
        // Maximum definition level for the column (from the schema descriptor).
        max_level: i16,
    },
    /// Only compute null bitmask - requires max level to be 1
    Mask { nulls: BooleanBufferBuilder },
}
+
/// Buffer accumulating decoded definition levels and/or a null bitmask,
/// depending on which [`BufferInner`] strategy was selected at construction.
pub struct DefinitionLevelBuffer {
    inner: BufferInner,

    /// The length of this buffer
    ///
    /// Note: `buffer` and `builder` may contain more elements
    len: usize,
}
+
+impl DefinitionLevelBuffer {
+ pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
+ let inner = match null_mask_only {
+ true => {
+ assert_eq!(
+ desc.max_def_level(),
+ 1,
+ "max definition level must be 1 to only compute null
bitmask"
+ );
+
+ assert_eq!(
+ desc.max_rep_level(),
+ 0,
+ "max repetition level must be 0 to only compute null
bitmask"
+ );
+
+ BufferInner::Mask {
+ nulls: BooleanBufferBuilder::new(0),
+ }
+ }
+ false => BufferInner::Full {
+ levels: TypedBuffer::new(),
+ nulls: BooleanBufferBuilder::new(0),
+ max_level: desc.max_def_level(),
+ },
+ };
+
+ Self { inner, len: 0 }
+ }
+
+ pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
+ match &mut self.inner {
+ BufferInner::Full { levels, .. } => {
+ let out = levels.split(len);
+ self.len = levels.len();
+ Some(out)
+ }
+ BufferInner::Mask { .. } => None,
+ }
+ }
+
+ pub fn commit(&mut self, len: usize) {
+ self.len = len;
+ }
+
+ /// Split `len` levels out of `self`
+ pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
+ let builder = match &mut self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ };
+
+ let old_len = builder.len();
+ let num_left_values = old_len - len;
+ let new_bitmap_builder =
+ BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
+
+ let old_bitmap = std::mem::replace(builder,
new_bitmap_builder).finish();
+ let old_bitmap = Bitmap::from(old_bitmap);
+
+ for i in len..old_len {
+ builder.append(old_bitmap.is_set(i));
+ }
+
+ self.len = builder.len();
+ old_bitmap
+ }
+
+ pub fn valid_position_iter(
+ &self,
+ range: Range<usize>,
+ ) -> impl Iterator<Item = usize> + '_ {
+ assert_eq!(range.start, self.len);
+ iter_set_bits_rev(self.nulls().as_slice())
+ }
+
+ fn nulls(&self) -> &BooleanBufferBuilder {
+ match &self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ }
+ }
+}
+
+impl LevelsWriter for DefinitionLevelBuffer {
+ fn capacity(&self) -> usize {
+ usize::MAX
+ }
+
+ fn get(&self, idx: usize) -> i16 {
+ match &self.inner {
+ BufferInner::Full { levels, .. } => levels.as_slice()[self.len +
idx],
+ BufferInner::Mask { nulls } => nulls.get_bit(self.len + idx) as
i16,
+ }
+ }
+
+ fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+ let total_count = range.end - range.start;
+ let range = range.start + self.len..range.end + self.len;
+ total_count - self.nulls().count_in_range(range)
+ }
+}
+
/// Decoder for definition levels that can either materialise the levels
/// themselves or (for `max_def_level == 1`) decode straight into a null mask.
pub struct DefinitionLevelDecoder {
    // Maximum definition level of the column being decoded.
    max_level: i16,
    // Level encoding — PackedDecoder::new accepts RLE or BIT_PACKED.
    encoding: Encoding,
    // Raw level data; `take()`n when the backing decoder is first created.
    data: Option<ByteBufferPtr>,
    // Lazily-created decoder used for the `BufferInner::Full` path.
    column_decoder: Option<ColumnLevelDecoderImpl>,
    // Lazily-created decoder used for the `BufferInner::Mask` path.
    packed_decoder: Option<PackedDecoder>,
}
+
impl ColumnLevelDecoder for DefinitionLevelDecoder {
    type Writer = DefinitionLevelBuffer;

    /// Creates a decoder over `data`.
    ///
    /// The backing decoder is not built here: it is constructed lazily on the
    /// first [`Self::read`], which knows the writer's storage mode.
    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
        Self {
            max_level,
            encoding,
            data: Some(data),
            column_decoder: None,
            packed_decoder: None,
        }
    }

    /// Reads levels for `range` into `writer`, dispatching on the writer's
    /// storage mode, and returns the number of levels read.
    ///
    /// All calls must target the same writer mode — otherwise the
    /// `expect("consistent null_mask_only")` below fires.
    fn read(
        &mut self,
        writer: &mut Self::Writer,
        range: Range<usize>,
    ) -> crate::errors::Result<usize> {
        match &mut writer.inner {
            BufferInner::Full {
                levels,
                nulls,
                max_level,
            } => {
                assert_eq!(self.max_level, *max_level);
                // The null mask must already cover everything up to range.start
                assert_eq!(range.start+writer.len, nulls.len());

                let decoder = match self.data.take() {
                    // First call: build the level decoder from the raw data
                    Some(data) => {
                        self.column_decoder.insert(ColumnLevelDecoderImpl::create(
                            self.max_level,
                            self.encoding,
                            data,
                        ))
                    }
                    None => self
                        .column_decoder
                        .as_mut()
                        .expect("consistent null_mask_only"),
                };

                // Make room for the requested range, offset by the levels the
                // writer has already committed
                levels.resize(range.end + writer.len);

                let slice = &mut levels.as_slice_mut()[writer.len..];
                let levels_read = decoder.read(slice, range.clone())?;

                // Mirror decoded levels into the null mask: a slot is valid
                // iff its definition level equals the column's max level
                nulls.reserve(levels_read);
                for i in &slice[range.start..range.start + levels_read] {
                    nulls.append(i == max_level)
                }

                Ok(levels_read)
            }
            BufferInner::Mask { nulls } => {
                // Mask-only writers are only constructed for max_def_level == 1
                assert_eq!(self.max_level, 1);
                assert_eq!(range.start+writer.len, nulls.len());

                let decoder = match self.data.take() {
                    // First call: build the packed decoder from the raw data
                    Some(data) => self
                        .packed_decoder
                        .insert(PackedDecoder::new(self.encoding, data)),
                    None => self
                        .packed_decoder
                        .as_mut()
                        .expect("consistent null_mask_only"),
                };

                // Decode directly into the bitmask, never materialising levels
                decoder.read(nulls, range.end - range.start)
            }
        }
    }
}
+
/// Decoder that reads a hybrid RLE / bit-packed definition-level stream
/// directly into a boolean null mask (valid only for levels 0/1).
struct PackedDecoder {
    // Raw encoded level data.
    data: ByteBufferPtr,
    // Byte offset of the next unread position in `data`.
    data_offset: usize,
    // Number of values remaining in the current RLE run.
    rle_left: usize,
    // The boolean value repeated by the current RLE run.
    rle_value: bool,
    // Total number of bits in the current bit-packed run.
    packed_count: usize,
    // Bits of the current bit-packed run consumed so far.
    packed_offset: usize,
}
+
+impl PackedDecoder {
+ fn next_rle_block(&mut self) -> Result<()> {
+ let indicator_value = self.decode_header()?;
+ if indicator_value & 1 == 1 {
+ let len = (indicator_value >> 1) as usize;
+ self.packed_count = len * 8;
+ self.packed_offset = 0;
+ } else {
+ self.rle_left = (indicator_value >> 1) as usize;
+ let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(||
{
+ ParquetError::EOF(
+ "unexpected end of file whilst decoding definition levels
rle value"
+ .into(),
+ )
+ })?;
+
+ self.data_offset += 1;
+ self.rle_value = byte != 0;
+ }
+ Ok(())
+ }
+
+ /// Decodes a VLQ encoded little endian integer and returns it
+ fn decode_header(&mut self) -> Result<i64> {
+ let mut offset = 0;
+ let mut v: i64 = 0;
+ while offset < 10 {
+ let byte = *self
+ .data
+ .as_ref()
+ .get(self.data_offset + offset)
+ .ok_or_else(|| {
+ ParquetError::EOF(
+ "unexpected end of file whilst decoding definition
levels rle header"
+ .into(),
+ )
+ })?;
+
+ v |= ((byte & 0x7F) as i64) << (offset * 7);
+ offset += 1;
+ if byte & 0x80 == 0 {
+ self.data_offset += offset;
+ return Ok(v);
+ }
+ }
+ Err(general_err!("too many bytes for VLQ"))
+ }
+}
+
+impl PackedDecoder {
+ fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
+ match encoding {
+ Encoding::RLE => Self {
+ data,
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: 0,
+ packed_offset: 0,
+ },
+ Encoding::BIT_PACKED => Self {
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: data.len() * 8,
+ packed_offset: 0,
+ data,
+ },
+ _ => unreachable!("invalid level encoding: {}", encoding),
+ }
+ }
+
+ fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) ->
Result<usize> {
+ let mut read = 0;
+ while read != len {
+ if self.rle_left != 0 {
+ let to_read = self.rle_left.min(len - read);
+ buffer.append_n(to_read, self.rle_value);
+ self.rle_left -= to_read;
+ read += to_read;
+ } else if self.packed_count != self.packed_offset {
+ let to_read = (self.packed_count - self.packed_offset).min(len
- read);
+ let offset = self.data_offset * 8 + self.packed_offset;
+ buffer.append_packed_range(offset..offset + to_read,
self.data.as_ref());
Review comment:
Apologies, I should have been more explicit; what I meant is: how common is it
in practice to have max_def_level == 1 plus bit-packing of the def levels?
That is where the biggest optimization is, isn't it? RLE-encoded def level
reading would still be better than before (as there is no intermediate
translation into integers), and that's great, but probably not as fast as
directly copying the bit-packed values. I do agree on flat parquet files
being common, though; most parquet files I have seen have been flat as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]