tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1223324759
##########
parquet/src/column/reader/decoder.rs:
##########
@@ -270,182 +297,227 @@ impl<T: DataType> ColumnValueDecoder for
ColumnValueDecoderImpl<T> {
const SKIP_BUFFER_SIZE: usize = 1024;
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
- decoder: Option<LevelDecoderInner>,
- /// Temporary buffer populated when skipping values
- buffer: Vec<i16>,
- bit_width: u8,
+enum LevelDecoder {
+ Packed(BitReader, u8),
+ Rle(RleDecoder),
}
-impl ColumnLevelDecoderImpl {
- pub fn new(max_level: i16) -> Self {
- let bit_width = num_required_bits(max_level as u64);
- Self {
- decoder: None,
- buffer: vec![],
- bit_width,
+impl LevelDecoder {
+ fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+ match encoding {
+ Encoding::RLE => {
+ let mut decoder = RleDecoder::new(bit_width);
+ decoder.set_data(data);
+ Self::Rle(decoder)
+ }
+ Encoding::BIT_PACKED => Self::Packed(BitReader::new(data),
bit_width),
+ _ => unreachable!("invalid level encoding: {}", encoding),
}
}
- /// Drops the first `len` values from the internal buffer
- fn split_off_buffer(&mut self, len: usize) {
- match self.buffer.len() == len {
- true => self.buffer.clear(),
- false => {
- // Move to_read elements to end of slice
- self.buffer.rotate_left(len);
- // Truncate buffer
- self.buffer.truncate(self.buffer.len() - len);
+ fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+ match self {
+ Self::Packed(reader, bit_width) => {
+ Ok(reader.get_batch::<i16>(out, *bit_width as usize))
}
+ Self::Rle(reader) => Ok(reader.get_batch(out)?),
}
}
+}
- /// Reads up to `to_read` values to the internal buffer
- fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
- let mut buf = std::mem::take(&mut self.buffer);
-
- // Repopulate buffer
- buf.resize(to_read, 0);
- let actual = self.read(&mut buf, 0..to_read)?;
- buf.truncate(actual);
-
- self.buffer = buf;
- Ok(())
- }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+ decoder: Option<LevelDecoder>,
+ bit_width: u8,
}
-enum LevelDecoderInner {
- Packed(BitReader, u8),
- Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+ pub fn new(max_level: i16) -> Self {
+ let bit_width = num_required_bits(max_level as u64);
+ Self {
+ decoder: None,
+ bit_width,
+ }
+ }
}
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
type Slice = [i16];
fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
- self.buffer.clear();
- match encoding {
- Encoding::RLE => {
- let mut decoder = RleDecoder::new(self.bit_width);
- decoder.set_data(data);
- self.decoder = Some(LevelDecoderInner::Rle(decoder));
- }
- Encoding::BIT_PACKED => {
- self.decoder = Some(LevelDecoderInner::Packed(
- BitReader::new(data),
- self.bit_width,
- ));
- }
- _ => unreachable!("invalid level encoding: {}", encoding),
- }
+ self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
}
+}
- fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) ->
Result<usize> {
- let read_from_buffer = match self.buffer.is_empty() {
- true => 0,
- false => {
- let read_from_buffer = self.buffer.len().min(range.end -
range.start);
- out[range.start..range.start + read_from_buffer]
- .copy_from_slice(&self.buffer[0..read_from_buffer]);
- self.split_off_buffer(read_from_buffer);
- read_from_buffer
- }
- };
- range.start += read_from_buffer;
-
- match self.decoder.as_mut().unwrap() {
- LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
- + reader.get_batch::<i16>(&mut out[range], *bit_width as
usize)),
- LevelDecoderInner::Rle(reader) => {
- Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
- }
- }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+ fn read_def_levels(
+ &mut self,
+ out: &mut Self::Slice,
+ range: Range<usize>,
+ ) -> Result<usize> {
+ self.decoder.as_mut().unwrap().read(&mut out[range])
}
-}
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_def_levels(
&mut self,
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
let mut level_skip = 0;
let mut value_skip = 0;
+ let mut buf: Vec<i16> = vec![];
while level_skip < num_levels {
let remaining_levels = num_levels - level_skip;
- if self.buffer.is_empty() {
- // Only read number of needed values
- self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
- if self.buffer.is_empty() {
- // Reached end of page
- break;
- }
+ let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+ buf.resize(to_read, 0);
+ let read = self.read_def_levels(&mut buf, 0..to_read)?;
+ if read == 0 {
+ // Reached end of page
+ break;
}
- let to_read = self.buffer.len().min(remaining_levels);
- level_skip += to_read;
- value_skip += self.buffer[..to_read]
- .iter()
- .filter(|x| **x == max_def_level)
- .count();
-
- self.split_off_buffer(to_read)
+ level_skip += read;
+ value_skip += buf[..read].iter().filter(|x| **x ==
max_def_level).count();
}
Ok((value_skip, level_skip))
}
}
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
- fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+ decoder: Option<LevelDecoder>,
+ bit_width: u8,
+ buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+ buffer_len: usize,
+ buffer_offset: usize,
+ has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+ pub fn new(max_level: i16) -> Self {
+ let bit_width = num_required_bits(max_level as u64);
+ Self {
+ decoder: None,
+ bit_width,
+ buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+ buffer_offset: 0,
+ buffer_len: 0,
+ has_partial: false,
+ }
+ }
+
+ fn fill_buf(&mut self) -> Result<()> {
+ let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+ self.buffer_offset = 0;
+ self.buffer_len = read;
+ Ok(())
+ }
+
+ /// Inspects the buffered repetition levels in the range
`self.buffer_offset..self.buffer_len`
+ /// and returns the number of "complete" records along with the
corresponding number of values
+ ///
+ /// A "complete" record is one where the buffer contains a subsequent
repetition level of 0
+ fn count_records(
&mut self,
- num_records: usize,
+ records_to_read: usize,
num_levels: usize,
- ) -> Result<(usize, usize)> {
- let mut level_skip = 0;
- let mut record_skip = 0;
+ ) -> (bool, usize, usize) {
+ let mut records_read = 0;
- while level_skip < num_levels {
- let remaining_levels = num_levels - level_skip;
+ let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+ let buf = self.buffer.iter().skip(self.buffer_offset);
+ for (idx, item) in buf.take(levels).enumerate() {
+ if *item == 0 && (idx != 0 || self.has_partial) {
+ records_read += 1;
- if self.buffer.is_empty() {
- // Only read number of needed values
- self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
- if self.buffer.is_empty() {
- // Reached end of page
- break;
+ if records_read == records_to_read {
+ return (false, records_read, idx);
}
}
+ }
+ // Either ran out of space in `num_levels` or data in `self.buffer`
+ (true, records_read, levels)
+ }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+ type Slice = [i16];
- let max_skip = self.buffer.len().min(remaining_levels);
+ fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+ self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+ self.buffer_len = 0;
+ self.buffer_offset = 0;
+ }
+}
- let mut to_skip = 0;
- while to_skip < max_skip && record_skip != num_records {
- if self.buffer[to_skip] == 0 {
- record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+ fn read_rep_levels(
+ &mut self,
+ out: &mut Self::Slice,
+ range: Range<usize>,
+ max_records: usize,
+ ) -> Result<(usize, usize)> {
+ let output = &mut out[range];
+ let max_levels = output.len();
+ let mut total_records_read = 0;
+ let mut total_levels_read = 0;
+
+ while total_records_read < max_records && total_levels_read <
max_levels {
+ if self.buffer_len == self.buffer_offset {
+ self.fill_buf()?;
+ if self.buffer_len == 0 {
+ break;
}
- to_skip += 1;
}
- // Find end of record
- while to_skip < max_skip && self.buffer[to_skip] != 0 {
- to_skip += 1;
- }
+ let (partial, records_read, levels_read) = self.count_records(
+ max_records - total_records_read,
+ max_levels - total_levels_read,
+ );
- level_skip += to_skip;
- if to_skip == self.buffer.len() {
- // Need to to read more values
- self.buffer.clear();
- continue;
- }
+ output[total_levels_read..total_levels_read +
levels_read].copy_from_slice(
+ &self.buffer[self.buffer_offset..self.buffer_offset +
levels_read],
+ );
- self.split_off_buffer(to_skip);
- break;
+ total_levels_read += levels_read;
+ total_records_read += records_read;
+ self.buffer_offset += levels_read;
+ self.has_partial = partial;
}
+ Ok((total_records_read, total_levels_read))
+ }
+
+ fn skip_rep_levels(
Review Comment:
I think the docs on the trait definition are sufficient, though I will concede
that they are less visible in a PR review, where there is no code completion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]