tustvold commented on a change in pull request #1054:
URL: https://github.com/apache/arrow-rs/pull/1054#discussion_r776330335
##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,379 @@
+use std::ops::Range;
+
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+
+use crate::basic::Encoding;
+use crate::column::reader::decoder::{
+ ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsWriter,
+};
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+ buffer::{RecordBuffer, TypedBuffer},
+ MIN_BATCH_SIZE,
+};
+
+enum BufferInner {
+ /// Compute levels and null mask
+ Full {
+ levels: TypedBuffer<i16>,
+ nulls: BooleanBufferBuilder,
+ max_level: i16,
+ },
+ /// Only compute null bitmask - requires max level to be 1
+ Mask { nulls: BooleanBufferBuilder },
+}
+
+pub struct DefinitionLevelBuffer {
+ inner: BufferInner,
+
+ /// The length of this buffer
+ ///
+ /// Note: `buffer` and `builder` may contain more elements
+ len: usize,
+}
+
+impl DefinitionLevelBuffer {
+ pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
+ let inner = match null_mask_only {
+ true => {
+ assert_eq!(
+ desc.max_def_level(),
+ 1,
+ "max definition level must be 1 to only compute null
bitmask"
+ );
+
+ assert_eq!(
+ desc.max_rep_level(),
+ 0,
+ "max repetition level must be 0 to only compute null
bitmask"
+ );
+
+ BufferInner::Mask {
+ nulls: BooleanBufferBuilder::new(0),
+ }
+ }
+ false => BufferInner::Full {
+ levels: TypedBuffer::new(),
+ nulls: BooleanBufferBuilder::new(0),
+ max_level: desc.max_def_level(),
+ },
+ };
+
+ Self { inner, len: 0 }
+ }
+
+ pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
+ match &mut self.inner {
+ BufferInner::Full { levels, .. } => {
+ let out = levels.split(len);
+ self.len = levels.len();
+ Some(out)
+ }
+ BufferInner::Mask { .. } => None,
+ }
+ }
+
+ pub fn commit(&mut self, len: usize) {
+ self.len = len;
+ }
+
+ /// Split `len` levels out of `self`
+ pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
+ let builder = match &mut self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ };
+
+ let old_len = builder.len();
+ let num_left_values = old_len - len;
+ let new_bitmap_builder =
+ BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
+
+ let old_bitmap = std::mem::replace(builder, new_bitmap_builder).finish();
+ let old_bitmap = Bitmap::from(old_bitmap);
+
+ for i in len..old_len {
+ builder.append(old_bitmap.is_set(i));
+ }
+
+ self.len = builder.len();
+ old_bitmap
+ }
+
+ pub fn valid_position_iter(
+ &self,
+ range: Range<usize>,
+ ) -> impl Iterator<Item = usize> + '_ {
+ assert_eq!(range.start, self.len);
+ iter_set_bits_rev(self.nulls().as_slice())
+ }
+
+ fn nulls(&self) -> &BooleanBufferBuilder {
+ match &self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ }
+ }
+}
+
+impl LevelsWriter for DefinitionLevelBuffer {
+ fn capacity(&self) -> usize {
+ usize::MAX
+ }
+
+ fn get(&self, idx: usize) -> i16 {
+ match &self.inner {
+ BufferInner::Full { levels, .. } => levels.as_slice()[self.len + idx],
+ BufferInner::Mask { nulls } => nulls.get_bit(self.len + idx) as i16,
+ }
+ }
+
+ fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+ let total_count = range.end - range.start;
+ let range = range.start + self.len..range.end + self.len;
+ total_count - self.nulls().count_in_range(range)
+ }
+}
+
+pub struct DefinitionLevelDecoder {
+ max_level: i16,
+ encoding: Encoding,
+ data: Option<ByteBufferPtr>,
+ column_decoder: Option<ColumnLevelDecoderImpl>,
+ packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+ type Writer = DefinitionLevelBuffer;
+
+ fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+ Self {
+ max_level,
+ encoding,
+ data: Some(data),
+ column_decoder: None,
+ packed_decoder: None,
+ }
+ }
+
+ fn read(
+ &mut self,
+ writer: &mut Self::Writer,
+ range: Range<usize>,
+ ) -> crate::errors::Result<usize> {
+ match &mut writer.inner {
+ BufferInner::Full {
+ levels,
+ nulls,
+ max_level,
+ } => {
+ assert_eq!(self.max_level, *max_level);
+ assert_eq!(range.start + writer.len, nulls.len());
+
+ let decoder = match self.data.take() {
+ Some(data) => {
+ self.column_decoder.insert(ColumnLevelDecoderImpl::create(
+ self.max_level,
+ self.encoding,
+ data,
+ ))
+ }
+ None => self
+ .column_decoder
+ .as_mut()
+ .expect("consistent null_mask_only"),
+ };
+
+ levels.resize(range.end + writer.len);
+
+ let slice = &mut levels.as_slice_mut()[writer.len..];
+ let levels_read = decoder.read(slice, range.clone())?;
+
+ nulls.reserve(levels_read);
+ for i in &slice[range.start..range.start + levels_read] {
+ nulls.append(i == max_level)
+ }
+
+ Ok(levels_read)
+ }
+ BufferInner::Mask { nulls } => {
+ assert_eq!(self.max_level, 1);
+ assert_eq!(range.start + writer.len, nulls.len());
+
+ let decoder = match self.data.take() {
+ Some(data) => self
+ .packed_decoder
+ .insert(PackedDecoder::new(self.encoding, data)),
+ None => self
+ .packed_decoder
+ .as_mut()
+ .expect("consistent null_mask_only"),
+ };
+
+ decoder.read(nulls, range.end - range.start)
+ }
+ }
+ }
+}
+
+struct PackedDecoder {
+ data: ByteBufferPtr,
+ data_offset: usize,
+ rle_left: usize,
+ rle_value: bool,
+ packed_count: usize,
+ packed_offset: usize,
+}
+
+impl PackedDecoder {
+ fn next_rle_block(&mut self) -> Result<()> {
+ let indicator_value = self.decode_header()?;
+ if indicator_value & 1 == 1 {
+ let len = (indicator_value >> 1) as usize;
+ self.packed_count = len * 8;
+ self.packed_offset = 0;
+ } else {
+ self.rle_left = (indicator_value >> 1) as usize;
+ let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
+ ParquetError::EOF(
+ "unexpected end of file whilst decoding definition levels
rle value"
+ .into(),
+ )
+ })?;
+
+ self.data_offset += 1;
+ self.rle_value = byte != 0;
+ }
+ Ok(())
+ }
+
+ /// Decodes a VLQ encoded little endian integer and returns it
+ fn decode_header(&mut self) -> Result<i64> {
+ let mut offset = 0;
+ let mut v: i64 = 0;
+ while offset < 10 {
+ let byte = *self
+ .data
+ .as_ref()
+ .get(self.data_offset + offset)
+ .ok_or_else(|| {
+ ParquetError::EOF(
+ "unexpected end of file whilst decoding definition
levels rle header"
+ .into(),
+ )
+ })?;
+
+ v |= ((byte & 0x7F) as i64) << (offset * 7);
+ offset += 1;
+ if byte & 0x80 == 0 {
+ self.data_offset += offset;
+ return Ok(v);
+ }
+ }
+ Err(general_err!("too many bytes for VLQ"))
+ }
+}
+
+impl PackedDecoder {
+ fn new(encoding: Encoding, data: ByteBufferPtr) -> Self {
+ match encoding {
+ Encoding::RLE => Self {
+ data,
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: 0,
+ packed_offset: 0,
+ },
+ Encoding::BIT_PACKED => Self {
+ data_offset: 0,
+ rle_left: 0,
+ rle_value: false,
+ packed_count: data.len() * 8,
+ packed_offset: 0,
+ data,
+ },
+ _ => unreachable!("invalid level encoding: {}", encoding),
+ }
+ }
+
+ fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
+ let mut read = 0;
+ while read != len {
+ if self.rle_left != 0 {
+ let to_read = self.rle_left.min(len - read);
+ buffer.append_n(to_read, self.rle_value);
+ self.rle_left -= to_read;
+ read += to_read;
+ } else if self.packed_count != self.packed_offset {
+ let to_read = (self.packed_count - self.packed_offset).min(len - read);
+ let offset = self.data_offset * 8 + self.packed_offset;
+ buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
Review comment:
> how often does this case happen for def levels in practice
This depends on what you mean by "this" :sweat_smile:
The major change in this PR is to skip decoding definition levels for columns
without nested nullability, i.e. max_def_level == 1, and instead decode
directly to the null bitmask. This case is very common; almost all parquet data
I've come across is flat.
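To make that concrete, here is a standalone sketch (not the PR's code; the
values are made up): for a flat nullable column each definition level is either
1 (present) or 0 (null), so the decoded levels carry exactly the same
information as the validity bitmap:
```rust
fn main() {
    // Hypothetical decoded definition levels for a column with
    // max_def_level == 1: level 1 = value present, level 0 = null.
    let levels: Vec<i16> = vec![1, 0, 1, 1];

    // The null mask derived from the levels is the same information
    // bit-for-bit, so materialising the i16 buffer first buys nothing.
    let mask: Vec<bool> = levels.iter().map(|&l| l == 1).collect();
    assert_eq!(mask, vec![true, false, true, true]);
}
```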
_My personal experience with projects trying to use nested data in parquet
is that it eventually becomes too much of a pain due to the patchy ecosystem
support, and the schema ends up just getting flattened_
Previously the code would allocate i16 buffers, populate them with the
decoded data, and then deduce a null bitmask from those i16 buffers. The code
now decodes directly to the null bitmask when max_def_level == 1, avoiding the
allocations along with the costs of decoding and bitmask reconstruction.
Additionally, decoding directly lets us exploit the inherent properties of the
hybrid encoding to improve performance: the bit-packed representation is
already a bitmask, and the RLE representation allows operating on runs of bits.
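For illustration, a minimal sketch of those two fast paths using arrow's
BooleanBufferBuilder (the run length and packed byte are made up; this is not
the PR's decoder):
```rust
use arrow::array::BooleanBufferBuilder;

fn main() {
    let mut builder = BooleanBufferBuilder::new(108);

    // An RLE run of 100 definition levels equal to 1 (all values present)
    // becomes a single run append - no per-value work.
    builder.append_n(100, true);

    // A bit-packed group of levels is, for max_def_level == 1, already a
    // little-endian bitmask, so its bytes can be copied bit-for-bit.
    let packed: &[u8] = &[0b0101_0101];
    builder.append_packed_range(0..8, packed);

    assert_eq!(builder.len(), 108);
}
```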