This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new c58554453 Support skip_values in ColumnValueDecoderImpl (#2089)
c58554453 is described below
commit c58554453bec8f4a78d503e79576b85108b210b5
Author: Dan Harris <[email protected]>
AuthorDate: Sat Jul 16 17:14:45 2022 -0400
Support skip_values in ColumnValueDecoderImpl (#2089)
* Add skip method to ParquetValueType
* Tests
* PR comments
* Update parquet/src/encodings/decoding.rs
Co-authored-by: Yang Jiang <[email protected]>
* PR comments
Co-authored-by: Yang Jiang <[email protected]>
---
parquet/src/column/reader/decoder.rs | 13 +-
parquet/src/data_type.rs | 85 +++++++
parquet/src/encodings/decoding.rs | 422 ++++++++++++++++++++++++++++++++++-
parquet/src/encodings/rle.rs | 131 +++++++++++
parquet/src/util/bit_util.rs | 111 +++++++++
5 files changed, 752 insertions(+), 10 deletions(-)
diff --git a/parquet/src/column/reader/decoder.rs
b/parquet/src/column/reader/decoder.rs
index 6fefdca23..53f7e2943 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -250,8 +250,17 @@ impl<T: DataType> ColumnValueDecoder for
ColumnValueDecoderImpl<T> {
current_decoder.get(&mut out[range])
}
- fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    /// Skip up to `num_values` values using the decoder for the current encoding.
+    ///
+    /// Panics if no current encoding has been set, or if no decoder has been
+    /// registered for that encoding.
+    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        // Delegate to the active decoder, which reports how many values it skipped.
+        self.decoders
+            .get_mut(&encoding)
+            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding))
+            .skip(num_values)
}
}
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 86ccefbd8..7b6fb04a7 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -613,6 +613,8 @@ pub(crate) mod private {
decoder: &mut PlainDecoderDetails,
) -> Result<usize>;
+ /// Skip up to `num_values` plain-encoded values held in `decoder`, returning
+ /// how many were skipped (bounded by the values remaining in `decoder`).
+ fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) ->
Result<usize>;
+
/// Return the encoded size for a type
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<Self>(), 1)
@@ -690,6 +692,14 @@ pub(crate) mod private {
Ok(values_read)
}
+        /// Skip up to `num_values` booleans, which are bit-packed one bit per value.
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
+            let to_skip = num_values.min(decoder.num_values);
+            let skipped = decoder.bit_reader.as_mut().unwrap().skip(to_skip, 1);
+            decoder.num_values -= skipped;
+            Ok(skipped)
+        }
+
+
#[inline]
fn as_i64(&self) -> Result<i64> {
Ok(*self as i64)
@@ -764,6 +774,23 @@ pub(crate) mod private {
Ok(num_values)
}
+                #[inline]
+                fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
+                    // Fixed-width values: skipping is a single bounds-checked cursor bump.
+                    let available = decoder
+                        .data
+                        .as_ref()
+                        .expect("set_data should have been called")
+                        .len()
+                        - decoder.start;
+                    let count = std::cmp::min(num_values, decoder.num_values);
+                    let width = std::mem::size_of::<Self>() * count;
+
+                    if width > available {
+                        return Err(eof_err!("Not enough bytes to skip"));
+                    }
+
+                    decoder.start += width;
+                    decoder.num_values -= count;
+                    Ok(count)
+                }
+
+
#[inline]
fn as_i64(&$self) -> Result<i64> {
$as_i64
@@ -853,6 +880,24 @@ pub(crate) mod private {
Ok(num_values)
}
+        /// Skip up to `num_values` INT96 values, each occupying 12 bytes.
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
+            let available = decoder
+                .data
+                .as_ref()
+                .expect("set_data should have been called")
+                .len()
+                - decoder.start;
+            let count = num_values.min(decoder.num_values);
+            let needed = 12 * count;
+
+            if needed > available {
+                return Err(eof_err!("Not enough bytes to skip"));
+            }
+
+            decoder.start += needed;
+            decoder.num_values -= count;
+            Ok(count)
+        }
+
+
#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -936,6 +981,24 @@ pub(crate) mod private {
Ok(num_values)
}
+        /// Skip up to `num_values` length-prefixed byte arrays.
+        ///
+        /// Each value is a 4-byte length followed by that many bytes. If the data
+        /// ends before the requested values (capped at the values remaining) have
+        /// been skipped, an EOF error is returned and `decoder` is left untouched.
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
+            let data = decoder
+                .data
+                .as_ref()
+                .expect("set_data should have been called");
+            let num_values = num_values.min(decoder.num_values);
+
+            // Walk the values with a local cursor so the decoder is only updated
+            // once every length prefix and payload is known to be in bounds.
+            let mut offset = decoder.start;
+            for _ in 0..num_values {
+                let len_start = offset;
+                offset += std::mem::size_of::<u32>();
+                if data.len() < offset {
+                    return Err(eof_err!("Not enough bytes to skip"));
+                }
+
+                let len: usize =
+                    read_num_bytes!(u32, 4, data.start_from(len_start).as_ref()) as usize;
+                offset += len;
+                if data.len() < offset {
+                    return Err(eof_err!("Not enough bytes to skip"));
+                }
+            }
+
+            decoder.start = offset;
+            decoder.num_values -= num_values;
+            Ok(num_values)
+        }
+
+
#[inline]
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<u32>(), self.len())
@@ -1005,6 +1068,28 @@ pub(crate) mod private {
Ok(num_values)
}
+        /// Skip up to `num_values` fixed-length byte arrays of `type_length` bytes.
+        ///
+        /// If fewer bytes remain than the skip requires, an EOF error is returned
+        /// and `decoder` is left untouched.
+        ///
+        /// Panics if `type_length` is not positive.
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
+            assert!(decoder.type_length > 0);
+
+            let data = decoder
+                .data
+                .as_ref()
+                .expect("set_data should have been called");
+            let num_values = std::cmp::min(num_values, decoder.num_values);
+            // Every value has the same width, so the whole skip needs one bounds check.
+            let bytes_to_skip = decoder.type_length as usize * num_values;
+
+            if data.len() < decoder.start + bytes_to_skip {
+                return Err(eof_err!("Not enough bytes to skip"));
+            }
+
+            decoder.start += bytes_to_skip;
+            decoder.num_values -= num_values;
+            Ok(num_values)
+        }
+
+
#[inline]
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<u32>(), self.len())
diff --git a/parquet/src/encodings/decoding.rs
b/parquet/src/encodings/decoding.rs
index b33514aaf..58aa592d1 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -206,6 +206,9 @@ pub trait Decoder<T: DataType>: Send {
/// Returns the encoding for this decoder.
fn encoding(&self) -> Encoding;
+
+ /// Skip the specified number of values in this decoder stream.
+ ///
+ /// Returns the number of values actually skipped. Implementations cap this at
+ /// the values remaining in the stream, so it may be less than `num_values`.
+ /// Some implementations return an error if the underlying data is shorter than
+ /// the declared number of values.
+ fn skip(&mut self, num_values: usize) -> Result<usize>;
}
/// Gets a decoder for the column descriptor `descr` and encoding type
`encoding`.
@@ -291,6 +294,11 @@ impl<T: DataType> Decoder<T> for PlainDecoder<T> {
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
T::T::decode(buffer, &mut self.inner)
}
+
+    /// Skip up to `num_values` plain-encoded values.
+    #[inline]
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        // The plain layout differs per physical type, so the value type does the work.
+        let details = &mut self.inner;
+        T::T::skip(details, num_values)
+    }
}
// ----------------------------------------------------------------------
@@ -363,6 +371,15 @@ impl<T: DataType> Decoder<T> for DictDecoder<T> {
fn encoding(&self) -> Encoding {
Encoding::RLE_DICTIONARY
}
+
+    /// Skip up to `num_values` dictionary indices.
+    ///
+    /// Panics if `set_data` or `set_dict` has not been called.
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        assert!(self.rle_decoder.is_some());
+        assert!(self.has_dictionary, "Must call set_dict() first!");
+
+        // Only the encoded indices are skipped; the dictionary itself is not read.
+        // NOTE(review): `self.num_values` is not decremented here — confirm this
+        // matches how `get` accounts for consumed values.
+        let to_skip = cmp::min(num_values, self.num_values);
+        self.rle_decoder.as_mut().unwrap().skip(to_skip)
+    }
}
// ----------------------------------------------------------------------
@@ -419,6 +436,14 @@ impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
self.values_left -= values_read;
Ok(values_read)
}
+
+    /// Skip up to `num_values` values, never more than `values_left`.
+    #[inline]
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let skipped = self.decoder.skip(cmp::min(num_values, self.values_left))?;
+        self.values_left -= skipped;
+        Ok(skipped)
+    }
}
// ----------------------------------------------------------------------
@@ -681,6 +706,8 @@ where
Ok(to_read)
}
+
+
fn values_left(&self) -> usize {
self.values_left
}
@@ -688,6 +715,11 @@ where
fn encoding(&self) -> Encoding {
Encoding::DELTA_BINARY_PACKED
}
+
+    /// Skip up to `num_values` values by decoding them into a scratch buffer
+    /// and discarding the result.
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        // Clamp first so an oversized request cannot allocate a scratch buffer
+        // larger than the number of values left in this decoder.
+        let num_values = num_values.min(self.values_left);
+        let mut buffer = vec![T::T::default(); num_values];
+        self.get(&mut buffer)
+    }
}
// ----------------------------------------------------------------------
@@ -791,6 +823,25 @@ impl<T: DataType> Decoder<T> for
DeltaLengthByteArrayDecoder<T> {
fn encoding(&self) -> Encoding {
Encoding::DELTA_LENGTH_BYTE_ARRAY
}
+
+    /// Skip up to `num_values` byte arrays; only `BYTE_ARRAY` is supported.
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        match T::get_physical_type() {
+            Type::BYTE_ARRAY => {
+                // The per-value lengths are already held in `lengths`, so skipping
+                // advances the length index and moves the byte offset past the payloads.
+                let count = cmp::min(num_values, self.num_values);
+                let end_idx = self.current_idx + count;
+                let skipped_bytes: i32 = self.lengths[self.current_idx..end_idx].iter().sum();
+
+                self.offset += skipped_bytes as usize;
+                self.current_idx = end_idx;
+                self.num_values -= count;
+                Ok(count)
+            }
+            other_type => Err(general_err!(
+                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
+                other_type
+            )),
+        }
+    }
}
// ----------------------------------------------------------------------
@@ -922,6 +973,11 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
fn encoding(&self) -> Encoding {
Encoding::DELTA_BYTE_ARRAY
}
+
+    /// Skip up to `num_values` values by decoding them into a scratch buffer
+    /// and discarding the result.
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        // Clamp first so an oversized request cannot allocate a scratch buffer
+        // larger than the number of values left in this decoder.
+        let num_values = num_values.min(self.values_left());
+        let mut buffer = vec![T::T::default(); num_values];
+        self.get(&mut buffer)
+    }
}
#[cfg(test)]
@@ -995,6 +1051,32 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_int32() {
+        let values = vec![42, 18, 52];
+        let encoded = ByteBufferPtr::new(Int32Type::to_byte_array(&values[..]));
+        test_plain_skip::<Int32Type>(encoded, values.len(), 1, -1, &values[1..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_int32() {
+        // Requesting more values than exist skips everything that is there.
+        let values = vec![42, 18, 52];
+        let encoded = ByteBufferPtr::new(Int32Type::to_byte_array(&values[..]));
+        test_plain_skip::<Int32Type>(encoded, values.len(), 5, -1, &[]);
+    }
+
+
#[test]
fn test_plain_decode_int32_spaced() {
let data = [42, 18, 52];
@@ -1014,6 +1096,7 @@ mod tests {
);
}
+
#[test]
fn test_plain_decode_int64() {
let data = vec![42, 18, 52];
@@ -1028,6 +1111,33 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_int64() {
+        let values = vec![42, 18, 52];
+        let encoded = ByteBufferPtr::new(Int64Type::to_byte_array(&values[..]));
+        test_plain_skip::<Int64Type>(encoded, values.len(), 2, -1, &values[2..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_int64() {
+        // Skipping exactly the number of values present leaves nothing to read.
+        let values = vec![42, 18, 52];
+        let encoded = ByteBufferPtr::new(Int64Type::to_byte_array(&values[..]));
+        test_plain_skip::<Int64Type>(encoded, values.len(), 3, -1, &[]);
+    }
+
#[test]
fn test_plain_decode_float() {
let data = vec![3.14, 2.414, 12.51];
@@ -1042,6 +1152,58 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_float() {
+        let values = vec![3.14, 2.414, 12.51];
+        let encoded = ByteBufferPtr::new(FloatType::to_byte_array(&values[..]));
+        test_plain_skip::<FloatType>(encoded, values.len(), 1, -1, &values[1..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_float() {
+        let values = vec![3.14, 2.414, 12.51];
+        let encoded = ByteBufferPtr::new(FloatType::to_byte_array(&values[..]));
+        test_plain_skip::<FloatType>(encoded, values.len(), 4, -1, &[]);
+    }
+
+    #[test]
+    fn test_plain_skip_double() {
+        let values = vec![3.14f64, 2.414f64, 12.51f64];
+        let encoded = ByteBufferPtr::new(DoubleType::to_byte_array(&values[..]));
+        test_plain_skip::<DoubleType>(encoded, values.len(), 1, -1, &values[1..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_double() {
+        let values = vec![3.14f64, 2.414f64, 12.51f64];
+        let encoded = ByteBufferPtr::new(DoubleType::to_byte_array(&values[..]));
+        test_plain_skip::<DoubleType>(encoded, values.len(), 5, -1, &[]);
+    }
+
+
#[test]
fn test_plain_decode_double() {
let data = vec![3.14f64, 2.414f64, 12.51f64];
@@ -1074,6 +1236,40 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_int96() {
+        let values: Vec<Int96> = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)]
+            .iter()
+            .map(|&(a, b, c)| {
+                let mut value = Int96::new();
+                value.set_data(a, b, c);
+                value
+            })
+            .collect();
+        let encoded = ByteBufferPtr::new(Int96Type::to_byte_array(&values[..]));
+        test_plain_skip::<Int96Type>(encoded, values.len(), 2, -1, &values[2..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_int96() {
+        let values: Vec<Int96> = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)]
+            .iter()
+            .map(|&(a, b, c)| {
+                let mut value = Int96::new();
+                value.set_data(a, b, c);
+                value
+            })
+            .collect();
+        let encoded = ByteBufferPtr::new(Int96Type::to_byte_array(&values[..]));
+        test_plain_skip::<Int96Type>(encoded, values.len(), 8, -1, &[]);
+    }
+
+
#[test]
fn test_plain_decode_bool() {
let data = vec![
@@ -1090,6 +1286,37 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_bool() {
+        let values = vec![
+            false, true, false, false, true, false, true, true, false, true,
+        ];
+        let encoded = ByteBufferPtr::new(BoolType::to_byte_array(&values[..]));
+        test_plain_skip::<BoolType>(encoded, values.len(), 5, -1, &values[5..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_bool() {
+        let values = vec![
+            false, true, false, false, true, false, true, true, false, true,
+        ];
+        let encoded = ByteBufferPtr::new(BoolType::to_byte_array(&values[..]));
+        test_plain_skip::<BoolType>(encoded, values.len(), 20, -1, &[]);
+    }
+
#[test]
fn test_plain_decode_byte_array() {
let mut data = vec![ByteArray::new(); 2];
@@ -1106,6 +1333,36 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_byte_array() {
+        let values: Vec<ByteArray> = ["hello", "parquet"]
+            .iter()
+            .map(|s| {
+                let mut value = ByteArray::new();
+                value.set_data(ByteBufferPtr::new(s.as_bytes().to_vec()));
+                value
+            })
+            .collect();
+        let encoded = ByteBufferPtr::new(ByteArrayType::to_byte_array(&values[..]));
+        test_plain_skip::<ByteArrayType>(encoded, values.len(), 1, -1, &values[1..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_byte_array() {
+        let values: Vec<ByteArray> = ["hello", "parquet"]
+            .iter()
+            .map(|s| {
+                let mut value = ByteArray::new();
+                value.set_data(ByteBufferPtr::new(s.as_bytes().to_vec()));
+                value
+            })
+            .collect();
+        let encoded = ByteBufferPtr::new(ByteArrayType::to_byte_array(&values[..]));
+        test_plain_skip::<ByteArrayType>(encoded, values.len(), 2, -1, &[]);
+    }
+
+
#[test]
fn test_plain_decode_fixed_len_byte_array() {
let mut data = vec![FixedLenByteArray::default(); 3];
@@ -1123,6 +1380,38 @@ mod tests {
);
}
+    #[test]
+    fn test_plain_skip_fixed_len_byte_array() {
+        let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
+            .iter()
+            .map(|s| {
+                let mut value = FixedLenByteArray::default();
+                value.set_data(ByteBufferPtr::new(s.as_bytes().to_vec()));
+                value
+            })
+            .collect();
+        let encoded = ByteBufferPtr::new(FixedLenByteArrayType::to_byte_array(&values[..]));
+        // Every value is 4 bytes long, which is the type length passed below.
+        test_plain_skip::<FixedLenByteArrayType>(encoded, values.len(), 1, 4, &values[1..]);
+    }
+
+    #[test]
+    fn test_plain_skip_all_fixed_len_byte_array() {
+        let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
+            .iter()
+            .map(|s| {
+                let mut value = FixedLenByteArray::default();
+                value.set_data(ByteBufferPtr::new(s.as_bytes().to_vec()));
+                value
+            })
+            .collect();
+        let encoded = ByteBufferPtr::new(FixedLenByteArrayType::to_byte_array(&values[..]));
+        test_plain_skip::<FixedLenByteArrayType>(encoded, values.len(), 6, 4, &[]);
+    }
+
+
fn test_plain_decode<T: DataType>(
data: ByteBufferPtr,
num_values: usize,
@@ -1139,6 +1428,34 @@ mod tests {
assert_eq!(buffer, expected);
}
+    /// Plain-decode `num_values` values from `data`, skip `skip` of them, and
+    /// check that the remaining values equal `expected`.
+    fn test_plain_skip<T: DataType>(
+        data: ByteBufferPtr,
+        num_values: usize,
+        skip: usize,
+        type_length: i32,
+        expected: &[T::T],
+    ) {
+        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
+        assert!(decoder.set_data(data, num_values).is_ok());
+        let skipped = decoder.skip(skip).expect("skipping values");
+
+        if skip < num_values {
+            // Partial skip: the rest of the page must still be readable.
+            let left = num_values - skip;
+            assert_eq!(skipped, skip);
+            let mut buffer = vec![T::T::default(); left];
+            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
+            assert_eq!(remaining, left);
+            assert_eq!(decoder.values_left(), 0);
+            assert_eq!(buffer, expected);
+        } else {
+            // Over-skip: everything is consumed and nothing can be read afterwards.
+            assert_eq!(skipped, num_values);
+            let mut buffer = vec![T::T::default(); 1];
+            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
+            assert_eq!(remaining, 0);
+        }
+    }
+
+
fn test_plain_decode_spaced<T: DataType>(
data: ByteBufferPtr,
num_values: usize,
@@ -1217,12 +1534,29 @@ mod tests {
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
+    #[test]
+    fn test_skip_delta_bit_packed_int32_repeat() {
+        // Four repetitions of 1..=8 (32 values).
+        let block_data: Vec<i32> = (0..4).flat_map(|_| 1..=8).collect();
+        for &skip in &[10, 100] {
+            test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
+        }
+    }
+
+
#[test]
fn test_delta_bit_packed_int32_uneven() {
let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
+    #[test]
+    fn test_skip_delta_bit_packed_int32_uneven() {
+        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
+        for &skip in &[5, 100] {
+            test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
+        }
+    }
+
+
#[test]
fn test_delta_bit_packed_int32_same_values() {
let block_data = vec![
@@ -1238,21 +1572,55 @@ mod tests {
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
+    #[test]
+    fn test_skip_delta_bit_packed_int32_same_values() {
+        // A constant run of 16 values, first positive and then negative.
+        for &value in &[127, -127] {
+            let block_data = vec![value; 16];
+            for &skip in &[5, 100] {
+                test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
+            }
+        }
+    }
+
#[test]
fn test_delta_bit_packed_int32_min_max() {
let block_data = vec![
- i32::min_value(),
- i32::max_value(),
- i32::min_value(),
- i32::max_value(),
- i32::min_value(),
- i32::max_value(),
- i32::min_value(),
- i32::max_value(),
+        // Alternate the extremes so consecutive values differ by the largest
+        // possible amount.
+        i32::MIN,
+        i32::MAX,
+        i32::MIN,
+        i32::MAX,
+        i32::MIN,
+        i32::MAX,
+        i32::MIN,
+        i32::MAX,
];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
+    #[test]
+    fn test_skip_delta_bit_packed_int32_min_max() {
+        // Alternate the extremes so consecutive values differ by the largest
+        // possible amount.
+        let block_data = vec![
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
+        ];
+        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
+        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
+    }
+
+
#[test]
fn test_delta_bit_packed_int32_multiple_blocks() {
// Test multiple 'put' calls on the same encoder
@@ -1493,6 +1861,44 @@ mod tests {
assert_eq!(result, expected);
}
+    /// Encode `data` with `encoding`, skip `skip` values, and check the rest decodes
+    /// to the tail of `data` (or, on an over-skip, that nothing is left to skip).
+    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
+        // Type length should not really matter for encode/decode test,
+        // otherwise change it based on type
+        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
+
+        let mut encoder = get_encoder::<T>(col_descr.clone(), encoding).expect("get encoder");
+        encoder.put(&data).expect("ok to encode");
+        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
+
+        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
+        decoder.set_data(bytes, data.len()).expect("ok to set data");
+
+        let skipped = decoder.skip(skip).expect("ok to skip");
+        if skip < data.len() {
+            assert_eq!(skipped, skip);
+            let remaining = data.len() - skip;
+            let mut buffer = vec![T::T::default(); remaining];
+            let fetched = decoder.get(&mut buffer).expect("ok to decode");
+            assert_eq!(remaining, fetched);
+            assert_eq!(&buffer, &data[skip..]);
+        } else {
+            assert_eq!(skipped, data.len());
+            let skipped_again = decoder.skip(skip).expect("ok to skip again");
+            assert_eq!(skipped_again, 0);
+        }
+    }
+
+
fn create_and_check_decoder<T: DataType>(
encoding: Encoding,
err: Option<ParquetError>,
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 5f6f91a8b..808c8f0d4 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -434,6 +434,39 @@ impl RleDecoder {
Ok(values_read)
}
+    /// Skip up to `num_values` values, moving through RLE runs and bit-packed
+    /// groups and reloading headers as needed.
+    ///
+    /// Returns the number of values skipped, which is less than `num_values`
+    /// only if the data runs out.
+    #[inline(never)]
+    pub fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let mut skipped = 0;
+        while skipped < num_values {
+            let wanted = num_values - skipped;
+
+            if self.rle_left > 0 {
+                // Inside a repeated run: just consume from the run length.
+                let n = cmp::min(wanted, self.rle_left as usize);
+                self.rle_left -= n as u32;
+                skipped += n;
+                continue;
+            }
+
+            if self.bit_packed_left > 0 {
+                let bit_width = self.bit_width as usize;
+                let n = cmp::min(wanted, self.bit_packed_left as usize);
+                let n = self
+                    .bit_reader
+                    .as_mut()
+                    .expect("bit_reader should be set")
+                    .skip(n, bit_width);
+                if n == 0 {
+                    // Handle writers which truncate the final block
+                    self.bit_packed_left = 0;
+                } else {
+                    self.bit_packed_left -= n as u32;
+                    skipped += n;
+                }
+                continue;
+            }
+
+            if !self.reload() {
+                break;
+            }
+        }
+
+        Ok(skipped)
+    }
+
+
#[inline(never)]
pub fn get_batch_with_dict<T>(
&mut self,
@@ -538,6 +571,23 @@ mod tests {
assert_eq!(buffer, expected);
}
+    #[test]
+    fn test_rle_skip_int32() {
+        // Test data: 0-7 with bit width 3
+        // 00000011 10001000 11000110 11111010
+        let mut decoder: RleDecoder = RleDecoder::new(3);
+        decoder.set_data(ByteBufferPtr::new(vec![0x03, 0x88, 0xC6, 0xFA]));
+
+        assert_eq!(decoder.skip(2).expect("skipping values"), 2);
+
+        let mut buffer = vec![0; 6];
+        let remaining = decoder.get_batch::<i32>(&mut buffer).expect("getting remaining");
+        assert_eq!(remaining, 6);
+        assert_eq!(buffer, (2..8).collect::<Vec<i32>>());
+    }
+
+
#[test]
fn test_rle_consume_flush_buffer() {
let data = vec![1, 1, 1, 2, 2, 3, 3, 3];
@@ -596,6 +646,48 @@ mod tests {
assert_eq!(buffer, expected);
}
+    #[test]
+    fn test_rle_skip_bool() {
+        // RLE test data: 50 1s followed by 50 0s
+        // 01100100 00000001 01100100 00000000
+        let rle_data = ByteBufferPtr::new(vec![0x64, 0x01, 0x64, 0x00]);
+
+        // Bit-packing test data: alternating 1s and 0s, 100 total
+        // 100 / 8 = 13 groups
+        // 00011011 10101010 ... 00001010
+        let bit_packed_data = ByteBufferPtr::new(vec![
+            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
+            0x0A,
+        ]);
+
+        let mut decoder: RleDecoder = RleDecoder::new(1);
+
+        // Skip the run of 1s, then read the run of 0s.
+        decoder.set_data(rle_data);
+        assert_eq!(decoder.skip(50).expect("skipping first 50"), 50);
+        let mut buffer = vec![true; 50];
+        let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting remaining 50");
+        assert_eq!(remainder, 50);
+        assert_eq!(buffer, vec![false; 50]);
+
+        // Skipping an even count keeps the alternation starting with `false`.
+        decoder.set_data(bit_packed_data);
+        assert_eq!(decoder.skip(50).expect("skipping first 50"), 50);
+        let mut buffer = vec![false; 50];
+        let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting remaining 50");
+        assert_eq!(remainder, 50);
+        let expected: Vec<bool> = (0..50).map(|i| i % 2 == 1).collect();
+        assert_eq!(buffer, expected);
+    }
+
+
#[test]
fn test_rle_decode_with_dict_int32() {
// Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
@@ -631,6 +723,45 @@ mod tests {
assert_eq!(buffer, expected);
}
+    #[test]
+    fn test_rle_skip_dict() {
+        // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
+        // 00000110 00000000 00001000 00000001 00001010 00000010
+        let int_dict = vec![10, 20, 30];
+        let mut decoder: RleDecoder = RleDecoder::new(3);
+        decoder.set_data(ByteBufferPtr::new(vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]));
+        assert_eq!(decoder.skip(2).expect("skipping two values"), 2);
+        let mut int_buffer = vec![0; 10];
+        let remainder = decoder
+            .get_batch_with_dict::<i32>(&int_dict, &mut int_buffer, 10)
+            .expect("getting remainder");
+        assert_eq!(remainder, 10);
+        assert_eq!(int_buffer, vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30]);
+
+        // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4)
+        // 011 100 101 011 100 101 011 100 101 100 101 101
+        // 00000011 01100011 11000111 10001110 00000011 01100101 00001011
+        let str_dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
+        let mut decoder: RleDecoder = RleDecoder::new(3);
+        decoder.set_data(ByteBufferPtr::new(vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]));
+        assert_eq!(decoder.skip(4).expect("skipping four values"), 4);
+        let mut str_buffer = vec![""; 8];
+        let remainder = decoder
+            .get_batch_with_dict::<&str>(str_dict.as_slice(), str_buffer.as_mut_slice(), 8)
+            .expect("getting remainder");
+        assert_eq!(remainder, 8);
+        assert_eq!(
+            str_buffer,
+            vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"]
+        );
+    }
+
+
fn validate_rle(
values: &[i64],
bit_width: u8,
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index c4c1f96f9..29269c4ad 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -519,6 +519,28 @@ impl BitReader {
Some(from_ne_slice(v.as_bytes()))
}
+    /// Skip one value of size `num_bits`.
+    ///
+    /// Returns `false` (without moving) if fewer than `num_bits` bits remain,
+    /// `true` otherwise.
+    ///
+    /// Panics if `num_bits` is greater than 64.
+    pub fn skip_value(&mut self, num_bits: usize) -> bool {
+        assert!(num_bits <= 64);
+
+        let consumed_bits = self.byte_offset * 8 + self.bit_offset;
+        if consumed_bits + num_bits > self.total_bytes * 8 {
+            return false;
+        }
+
+        self.bit_offset += num_bits;
+        if self.bit_offset < 64 {
+            return true;
+        }
+
+        // Crossed the end of the buffered 64-bit word: advance and refill it.
+        self.byte_offset += 8;
+        self.bit_offset -= 64;
+        self.reload_buffer_values();
+        true
+    }
+
+
/// Read multiple values from their packed representation
///
/// # Panics
@@ -605,6 +627,47 @@ impl BitReader {
values_to_read
}
+    /// Skip `num_values` values of `num_bits` bits each.
+    ///
+    /// Returns the number of values actually skipped, which is less than
+    /// `num_values` only if the buffer runs out of bits.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `num_bits` is greater than 64.
+    pub fn skip(&mut self, num_values: usize, num_bits: usize) -> usize {
+        assert!(num_bits <= 64);
+
+        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset;
+        // Clamp by division instead of computing `num_bits * num_values`, which
+        // can overflow. Zero-width values never consume any bits.
+        let values_to_skip = if num_bits == 0 {
+            num_values
+        } else {
+            num_values.min(remaining_bits / num_bits)
+        };
+
+        // Jump straight to the end position instead of skipping value by value.
+        // `bit_offset` is relative to the word buffered at `byte_offset` (see
+        // `skip_value`), so any byte position with a sub-byte offset is a valid
+        // state once the buffer has been refilled. This assumes
+        // `reload_buffer_values` reads from `byte_offset`.
+        let end_bit = self.byte_offset * 8 + self.bit_offset + values_to_skip * num_bits;
+        self.byte_offset = end_bit / 8;
+        self.bit_offset = end_bit % 8;
+        self.reload_buffer_values();
+
+        values_to_skip
+    }
+
+
/// Reads up to `num_bytes` to `buf` returning the number of bytes read
pub(crate) fn get_aligned_bytes(
&mut self,
@@ -759,6 +822,43 @@ mod tests {
assert_eq!(bit_reader.get_value::<i32>(4), Some(3));
}
+    #[test]
+    fn test_bit_reader_skip_value() {
+        let mut bit_reader = BitReader::from(vec![255, 0]);
+        assert!(bit_reader.skip_value(1));
+        assert_eq!(bit_reader.get_value::<i32>(1), Some(1));
+        assert!(bit_reader.skip_value(2));
+        assert_eq!(bit_reader.get_value::<i32>(2), Some(3));
+        assert!(bit_reader.skip_value(1));
+        assert_eq!(bit_reader.get_value::<i32>(4), Some(1));
+        assert!(bit_reader.skip_value(1));
+        assert_eq!(bit_reader.get_value::<i32>(4), Some(0));
+        // All 16 bits are consumed, so nothing further can be skipped.
+        assert!(!bit_reader.skip_value(1));
+    }
+
+    #[test]
+    fn test_bit_reader_skip() {
+        let mut bit_reader = BitReader::from(vec![255, 0]);
+        // (values to skip, bit width, expected skipped, bits to read next, expected value)
+        let steps = [(1, 1, 1, 1, 1), (2, 2, 2, 2, 3), (4, 1, 4, 4, 0)];
+        for &(count, width, expected_skipped, read_bits, expected_value) in steps.iter() {
+            assert_eq!(bit_reader.skip(count, width), expected_skipped);
+            assert_eq!(bit_reader.get_value::<i32>(read_bits), Some(expected_value));
+        }
+        assert_eq!(bit_reader.skip(1, 1), 0);
+    }
+
+
#[test]
fn test_bit_reader_get_value_boundary() {
let buffer = vec![10, 0, 0, 0, 20, 0, 30, 0, 0, 0, 40, 0];
@@ -769,6 +869,17 @@ mod tests {
assert_eq!(bit_reader.get_value::<i64>(16), Some(40));
}
+    #[test]
+    fn test_bit_reader_skip_boundary() {
+        let mut bit_reader = BitReader::from(vec![10, 0, 0, 0, 20, 0, 30, 0, 0, 0, 40, 0]);
+        assert_eq!(bit_reader.get_value::<i64>(32), Some(10));
+        // Skip the 16-bit `20`; the next 32-bit read then crosses the 64-bit boundary.
+        assert!(bit_reader.skip_value(16));
+        assert_eq!(bit_reader.get_value::<i64>(32), Some(30));
+        assert_eq!(bit_reader.get_value::<i64>(16), Some(40));
+    }
+
+
#[test]
fn test_bit_reader_get_aligned() {
// 01110101 11001011