This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 72dada6d6 Support skip_values in ByteArrayColumnValueDecoder (#2076)
72dada6d6 is described below
commit 72dada6d656e88f0d07efa2cc32ea9670d061fc2
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Jul 16 18:59:35 2022 +0800
Support skip_values in ByteArrayColumnValueDecoder (#2076)
* Support skip_values in ByteArrayColumnValueDecoder
* add test with nulls and api align with read
* Update parquet/src/arrow/array_reader/byte_array.rs
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
parquet/src/arrow/array_reader/byte_array.rs | 187 ++++++++++++++++++++++++++-
1 file changed, 184 insertions(+), 3 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index b762236c4..853bc2b18 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -215,8 +215,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
decoder.read(out, range.end - range.start, self.dict.as_ref())
}
- fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ fn skip_values(&mut self, num_values: usize) -> Result<usize> {
+ let decoder = self
+ .decoder
+ .as_mut()
+ .ok_or_else(|| general_err!("no decoder set"))?;
+
+ decoder.skip(num_values, self.dict.as_ref())
}
}
@@ -284,6 +289,25 @@ impl ByteArrayDecoder {
ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}
+
+ /// Skip `len` values
+ pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+ &mut self,
+ len: usize,
+ dict: Option<&OffsetBuffer<I>>,
+ ) -> Result<usize> {
+ match self {
+ ByteArrayDecoder::Plain(d) => d.skip(len),
+ ByteArrayDecoder::Dictionary(d) => {
+ let dict = dict
+ .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+
+ d.skip(dict, len)
+ }
+ ByteArrayDecoder::DeltaLength(d) => d.skip(len),
+ ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
+ }
+ }
}
/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
@@ -363,6 +387,27 @@ impl ByteArrayDecoderPlain {
}
Ok(to_read)
}
+
+ pub fn skip(
+ &mut self,
+ to_skip: usize,
+ ) -> Result<usize> {
+ let to_skip = to_skip.min( self.max_remaining_values);
+ let mut skip = 0;
+ let buf = self.buf.as_ref();
+
+ while self.offset < self.buf.len() && skip != to_skip {
+ if self.offset + 4 > buf.len() {
+ return Err(ParquetError::EOF("eof decoding byte array".into()));
+ }
+ let len_bytes: [u8; 4] =
+ buf[self.offset..self.offset + 4].try_into().unwrap();
+ let len = u32::from_le_bytes(len_bytes) as usize;
+ skip += 1;
+ self.offset = self.offset + 4 + len;
+ }
+ Ok(skip)
+ }
}
/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
@@ -431,6 +476,21 @@ impl ByteArrayDecoderDeltaLength {
}
Ok(to_read)
}
+
+ fn skip(
+ &mut self,
+ to_skip: usize,
+ ) -> Result<usize> {
+ let remain_values = self.lengths.len() - self.length_offset;
+ let to_skip = remain_values.min(to_skip);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+ let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+ self.data_offset += total_bytes;
+ self.length_offset += to_skip;
+ Ok(to_skip)
+ }
}
/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
@@ -521,6 +581,37 @@ impl ByteArrayDecoderDelta {
}
Ok(to_read)
}
+
+ fn skip(
+ &mut self,
+ to_skip: usize,
+ ) -> Result<usize> {
+ let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
+
+ let length_range = self.length_offset..self.length_offset + to_skip;
+ let iter = self.prefix_lengths[length_range.clone()]
+ .iter()
+ .zip(&self.suffix_lengths[length_range]);
+
+ let data = self.data.as_ref();
+
+ for (prefix_length, suffix_length) in iter {
+ let prefix_length = *prefix_length as usize;
+ let suffix_length = *suffix_length as usize;
+
+ if self.data_offset + suffix_length > self.data.len() {
+ return Err(ParquetError::EOF("eof decoding byte array".into()));
+ }
+
+ self.last_value.truncate(prefix_length);
+ self.last_value.extend_from_slice(
+ &data[self.data_offset..self.data_offset + suffix_length],
+ );
+ self.data_offset += suffix_length;
+ }
+ self.length_offset += to_skip;
+ Ok(to_skip)
+ }
}
/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
@@ -589,6 +680,38 @@ impl ByteArrayDecoderDictionary {
}
Ok(values_read)
}
+
+ fn skip<I: OffsetSizeTrait + ScalarValue>(
+ &mut self,
+ dict: &OffsetBuffer<I>,
+ to_skip: usize,
+ ) -> Result<usize> {
+ let to_skip = to_skip.min(self.max_remaining_values);
+ // All data must be NULL
+ if dict.is_empty() {
+ return Ok(0);
+ }
+
+ let mut values_skip = 0;
+ while values_skip < to_skip {
+ if self.index_offset == self.index_buf_len {
+ let read = self.decoder.get_batch(self.index_buf.as_mut())?;
+ if read == 0 {
+ break;
+ }
+ self.index_buf_len = read;
+ self.index_offset = 0;
+ }
+
+ let skip = (to_skip - values_skip)
+ .min(self.index_buf_len - self.index_offset);
+
+ self.index_offset += skip;
+ self.max_remaining_values -= skip;
+ values_skip += skip;
+ }
+ Ok(values_skip)
+ }
}
#[cfg(test)]
@@ -653,6 +776,57 @@ mod tests {
}
}
+ #[test]
+ fn test_byte_array_decoder_skip() {
+ let (pages, encoded_dictionary) =
+ byte_array_all_encodings(vec!["hello", "world", "a", "b"]);
+
+ let column_desc = utf8_column();
+ let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+ decoder
+ .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+ .unwrap();
+
+ for (encoding, page) in pages {
+ let mut output = OffsetBuffer::<i32>::default();
+ decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+
+ assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+
+ assert_eq!(output.values.as_slice(), "hello".as_bytes());
+ assert_eq!(output.offsets.as_slice(), &[0, 5]);
+
+ assert_eq!(decoder.skip_values(1).unwrap(), 1);
+ assert_eq!(decoder.skip_values(1).unwrap(), 1);
+
+ assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+ assert_eq!(output.values.as_slice(), "hellob".as_bytes());
+ assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);
+
+ assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+
+ let valid = vec![false, false, true, true, false, false];
+ let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+
+ output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
+ let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
+ let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+ assert_eq!(
+ strings.iter().collect::<Vec<_>>(),
+ vec![
+ None,
+ None,
+ Some("hello"),
+ Some("b"),
+ None,
+ None,
+ ]
+ );
+ }
+ }
+
#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
@@ -664,10 +838,17 @@ mod tests {
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();
- for (encoding, page) in pages {
+ // test nulls read
+ for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
}
+
+ // test nulls skip
+ for (encoding, page) in pages {
+ decoder.set_data(encoding, page, 4, None).unwrap();
+ assert_eq!(decoder.skip_values(1024).unwrap(), 0);
+ }
}
}