This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new c58554453 Support skip_values in ColumnValueDecoderImpl  (#2089)
c58554453 is described below

commit c58554453bec8f4a78d503e79576b85108b210b5
Author: Dan Harris <[email protected]>
AuthorDate: Sat Jul 16 17:14:45 2022 -0400

    Support skip_values in ColumnValueDecoderImpl  (#2089)
    
    * Add skip method to ParquetValueType
    
    * Tests
    
    * PR comments
    
    * Update parquet/src/encodings/decoding.rs
    
    Co-authored-by: Yang Jiang <[email protected]>
    
    * PR comments
    
    Co-authored-by: Yang Jiang <[email protected]>
---
 parquet/src/column/reader/decoder.rs |  13 +-
 parquet/src/data_type.rs             |  85 +++++++
 parquet/src/encodings/decoding.rs    | 422 ++++++++++++++++++++++++++++++++++-
 parquet/src/encodings/rle.rs         | 131 +++++++++++
 parquet/src/util/bit_util.rs         | 111 +++++++++
 5 files changed, 752 insertions(+), 10 deletions(-)

diff --git a/parquet/src/column/reader/decoder.rs 
b/parquet/src/column/reader/decoder.rs
index 6fefdca23..53f7e2943 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -250,8 +250,17 @@ impl<T: DataType> ColumnValueDecoder for 
ColumnValueDecoderImpl<T> {
         current_decoder.get(&mut out[range])
     }
 
-    fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792";))
+    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self
+            .decoders
+            .get_mut(&encoding)
+            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", 
encoding));
+
+        current_decoder.skip(num_values)
     }
 }
 
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 86ccefbd8..7b6fb04a7 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -613,6 +613,8 @@ pub(crate) mod private {
             decoder: &mut PlainDecoderDetails,
         ) -> Result<usize>;
 
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> 
Result<usize>;
+
         /// Return the encoded size for a type
         fn dict_encoding_size(&self) -> (usize, usize) {
             (std::mem::size_of::<Self>(), 1)
@@ -690,6 +692,14 @@ pub(crate) mod private {
             Ok(values_read)
         }
 
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> 
Result<usize> {
+            let bit_reader = decoder.bit_reader.as_mut().unwrap();
+            let num_values = std::cmp::min(num_values, decoder.num_values);
+            let values_read = bit_reader.skip(num_values, 1);
+            decoder.num_values -= values_read;
+            Ok(values_read)
+        }
+
         #[inline]
         fn as_i64(&self) -> Result<i64> {
             Ok(*self as i64)
@@ -764,6 +774,23 @@ pub(crate) mod private {
                     Ok(num_values)
                 }
 
+                #[inline]
+                fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) 
-> Result<usize> {
+                    let data = decoder.data.as_ref().expect("set_data should 
have been called");
+                    let num_values = num_values.min(decoder.num_values);
+                    let bytes_left = data.len() - decoder.start;
+                    let bytes_to_skip = std::mem::size_of::<Self>() * 
num_values;
+
+                    if bytes_left < bytes_to_skip {
+                        return Err(eof_err!("Not enough bytes to skip"));
+                    }
+
+                    decoder.start += bytes_to_skip;
+                    decoder.num_values -= num_values;
+
+                    Ok(num_values)
+                }
+
                 #[inline]
                 fn as_i64(&$self) -> Result<i64> {
                     $as_i64
@@ -853,6 +880,24 @@ pub(crate) mod private {
             Ok(num_values)
         }
 
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> 
Result<usize> {
+            let data = decoder
+                .data
+                .as_ref()
+                .expect("set_data should have been called");
+            let num_values = std::cmp::min(num_values, decoder.num_values);
+            let bytes_left = data.len() - decoder.start;
+            let bytes_to_skip = 12 * num_values;
+
+            if bytes_left < bytes_to_skip {
+                return Err(eof_err!("Not enough bytes to skip"));
+            }
+            decoder.start += bytes_to_skip;
+            decoder.num_values -= num_values;
+
+            Ok(num_values)
+        }
+
         #[inline]
         fn as_any(&self) -> &dyn std::any::Any {
             self
@@ -936,6 +981,24 @@ pub(crate) mod private {
             Ok(num_values)
         }
 
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> 
Result<usize> {
+            let data = decoder
+                .data
+                .as_mut()
+                .expect("set_data should have been called");
+            let num_values = num_values.min(decoder.num_values);
+
+            for _ in 0..num_values {
+                let len: usize =
+                    read_num_bytes!(u32, 4, 
data.start_from(decoder.start).as_ref())
+                        as usize;
+                decoder.start += std::mem::size_of::<u32>() + len;
+            }
+            decoder.num_values -= num_values;
+
+            Ok(num_values)
+        }
+
         #[inline]
         fn dict_encoding_size(&self) -> (usize, usize) {
             (std::mem::size_of::<u32>(), self.len())
@@ -1005,6 +1068,28 @@ pub(crate) mod private {
             Ok(num_values)
         }
 
+        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> 
Result<usize> {
+            assert!(decoder.type_length > 0);
+
+            let data = decoder
+                .data
+                .as_mut()
+                .expect("set_data should have been called");
+            let num_values = std::cmp::min(num_values, decoder.num_values);
+            for _ in 0..num_values {
+                let len = decoder.type_length as usize;
+
+                if data.len() < decoder.start + len {
+                    return Err(eof_err!("Not enough bytes to skip"));
+                }
+
+                decoder.start += len;
+            }
+            decoder.num_values -= num_values;
+
+            Ok(num_values)
+        }
+
         #[inline]
         fn dict_encoding_size(&self) -> (usize, usize) {
             (std::mem::size_of::<u32>(), self.len())
diff --git a/parquet/src/encodings/decoding.rs 
b/parquet/src/encodings/decoding.rs
index b33514aaf..58aa592d1 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -206,6 +206,9 @@ pub trait Decoder<T: DataType>: Send {
 
     /// Returns the encoding for this decoder.
     fn encoding(&self) -> Encoding;
+
+    /// Skip the specified number of values in this decoder stream.
+    fn skip(&mut self, num_values: usize) -> Result<usize>;
 }
 
 /// Gets a decoder for the column descriptor `descr` and encoding type 
`encoding`.
@@ -291,6 +294,11 @@ impl<T: DataType> Decoder<T> for PlainDecoder<T> {
     fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
         T::T::decode(buffer, &mut self.inner)
     }
+
+    #[inline]
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        T::T::skip(&mut self.inner, num_values)
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -363,6 +371,15 @@ impl<T: DataType> Decoder<T> for DictDecoder<T> {
     fn encoding(&self) -> Encoding {
         Encoding::RLE_DICTIONARY
     }
+
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        assert!(self.rle_decoder.is_some());
+        assert!(self.has_dictionary, "Must call set_dict() first!");
+
+        let rle = self.rle_decoder.as_mut().unwrap();
+        let num_values = cmp::min(num_values, self.num_values);
+        rle.skip(num_values)
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -419,6 +436,14 @@ impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
         self.values_left -= values_read;
         Ok(values_read)
     }
+
+    #[inline]
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let num_values = cmp::min(num_values, self.values_left);
+        let values_skipped = self.decoder.skip(num_values)?;
+        self.values_left -= values_skipped;
+        Ok(values_skipped)
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -681,6 +706,8 @@ where
         Ok(to_read)
     }
 
+
+
     fn values_left(&self) -> usize {
         self.values_left
     }
@@ -688,6 +715,11 @@ where
     fn encoding(&self) -> Encoding {
         Encoding::DELTA_BINARY_PACKED
     }
+
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let mut buffer = vec![T::T::default(); num_values];
+        self.get(&mut buffer)
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -791,6 +823,25 @@ impl<T: DataType> Decoder<T> for 
DeltaLengthByteArrayDecoder<T> {
     fn encoding(&self) -> Encoding {
         Encoding::DELTA_LENGTH_BYTE_ARRAY
     }
+
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        match T::get_physical_type() {
+            Type::BYTE_ARRAY => {
+                let num_values = cmp::min(num_values, self.num_values);
+
+                let next_offset: i32 =  
self.lengths[self.current_idx..self.current_idx + num_values].iter().sum();
+
+                self.current_idx += num_values;
+                self.offset += next_offset as usize;
+
+                self.num_values -= num_values;
+                Ok(num_values)
+            }
+           other_type => Err(general_err!(
+                "DeltaLengthByteArrayDecoder not support {}, only support byte 
array", other_type
+            )),
+        }
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -922,6 +973,11 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
     fn encoding(&self) -> Encoding {
         Encoding::DELTA_BYTE_ARRAY
     }
+
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let mut buffer = vec![T::T::default(); num_values];
+        self.get(&mut buffer)
+    }
 }
 
 #[cfg(test)]
@@ -995,6 +1051,32 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_int32() {
+        let data = vec![42, 18, 52];
+        let data_bytes = Int32Type::to_byte_array(&data[..]);
+        test_plain_skip::<Int32Type>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            1,
+            -1,
+            &data[1..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_int32() {
+        let data = vec![42, 18, 52];
+        let data_bytes = Int32Type::to_byte_array(&data[..]);
+        test_plain_skip::<Int32Type>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            5,
+            -1,
+            &[],
+        );
+    }
+
     #[test]
     fn test_plain_decode_int32_spaced() {
         let data = [42, 18, 52];
@@ -1014,6 +1096,7 @@ mod tests {
         );
     }
 
+
     #[test]
     fn test_plain_decode_int64() {
         let data = vec![42, 18, 52];
@@ -1028,6 +1111,33 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_int64() {
+        let data = vec![42, 18, 52];
+        let data_bytes = Int64Type::to_byte_array(&data[..]);
+        test_plain_skip::<Int64Type>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            2,
+            -1,
+            &data[2..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_int64() {
+        let data = vec![42, 18, 52];
+        let data_bytes = Int64Type::to_byte_array(&data[..]);
+        test_plain_skip::<Int64Type>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            3,
+            -1,
+            &[],
+        );
+    }
+
+
     #[test]
     fn test_plain_decode_float() {
         let data = vec![3.14, 2.414, 12.51];
@@ -1042,6 +1152,58 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_float() {
+        let data = vec![3.14, 2.414, 12.51];
+        let data_bytes = FloatType::to_byte_array(&data[..]);
+        test_plain_skip::<FloatType>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            1,
+            -1,
+            &data[1..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_float() {
+        let data = vec![3.14, 2.414, 12.51];
+        let data_bytes = FloatType::to_byte_array(&data[..]);
+        test_plain_skip::<FloatType>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            4,
+            -1,
+            &[],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_double() {
+        let data = vec![3.14f64, 2.414f64, 12.51f64];
+        let data_bytes = DoubleType::to_byte_array(&data[..]);
+        test_plain_skip::<DoubleType>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            1,
+            -1,
+            &data[1..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_double() {
+        let data = vec![3.14f64, 2.414f64, 12.51f64];
+        let data_bytes = DoubleType::to_byte_array(&data[..]);
+        test_plain_skip::<DoubleType>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            5,
+            -1,
+            &[],
+        );
+    }
+
     #[test]
     fn test_plain_decode_double() {
         let data = vec![3.14f64, 2.414f64, 12.51f64];
@@ -1074,6 +1236,40 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_int96() {
+        let mut data = vec![Int96::new(); 4];
+        data[0].set_data(11, 22, 33);
+        data[1].set_data(44, 55, 66);
+        data[2].set_data(10, 20, 30);
+        data[3].set_data(40, 50, 60);
+        let data_bytes = Int96Type::to_byte_array(&data[..]);
+        test_plain_skip::<Int96Type>(
+            ByteBufferPtr::new(data_bytes),
+            4,
+            2,
+            -1,
+            &data[2..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_int96() {
+        let mut data = vec![Int96::new(); 4];
+        data[0].set_data(11, 22, 33);
+        data[1].set_data(44, 55, 66);
+        data[2].set_data(10, 20, 30);
+        data[3].set_data(40, 50, 60);
+        let data_bytes = Int96Type::to_byte_array(&data[..]);
+        test_plain_skip::<Int96Type>(
+            ByteBufferPtr::new(data_bytes),
+            4,
+            8,
+            -1,
+            &[],
+        );
+    }
+
     #[test]
     fn test_plain_decode_bool() {
         let data = vec![
@@ -1090,6 +1286,37 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_bool() {
+        let data = vec![
+            false, true, false, false, true, false, true, true, false, true,
+        ];
+        let data_bytes = BoolType::to_byte_array(&data[..]);
+        test_plain_skip::<BoolType>(
+            ByteBufferPtr::new(data_bytes),
+            10,
+            5,
+            -1,
+            &data[5..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_bool() {
+        let data = vec![
+            false, true, false, false, true, false, true, true, false, true,
+        ];
+        let data_bytes = BoolType::to_byte_array(&data[..]);
+        test_plain_skip::<BoolType>(
+            ByteBufferPtr::new(data_bytes),
+            10,
+            20,
+            -1,
+            &[],
+        );
+    }
+
+
     #[test]
     fn test_plain_decode_byte_array() {
         let mut data = vec![ByteArray::new(); 2];
@@ -1106,6 +1333,36 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_byte_array() {
+        let mut data = vec![ByteArray::new(); 2];
+        
data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes()));
+        
data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes()));
+        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
+        test_plain_skip::<ByteArrayType>(
+            ByteBufferPtr::new(data_bytes),
+            2,
+            1,
+            -1,
+            &data[1..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_byte_array() {
+        let mut data = vec![ByteArray::new(); 2];
+        
data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes()));
+        
data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes()));
+        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
+        test_plain_skip::<ByteArrayType>(
+            ByteBufferPtr::new(data_bytes),
+            2,
+            2,
+            -1,
+            &[],
+        );
+    }
+
     #[test]
     fn test_plain_decode_fixed_len_byte_array() {
         let mut data = vec![FixedLenByteArray::default(); 3];
@@ -1123,6 +1380,38 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_plain_skip_fixed_len_byte_array() {
+        let mut data = vec![FixedLenByteArray::default(); 3];
+        
data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes()));
+        
data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes()));
+        
data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes()));
+        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
+        test_plain_skip::<FixedLenByteArrayType>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            1,
+            4,
+            &data[1..],
+        );
+    }
+
+    #[test]
+    fn test_plain_skip_all_fixed_len_byte_array() {
+        let mut data = vec![FixedLenByteArray::default(); 3];
+        
data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes()));
+        
data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes()));
+        
data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes()));
+        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
+        test_plain_skip::<FixedLenByteArrayType>(
+            ByteBufferPtr::new(data_bytes),
+            3,
+            6,
+            4,
+            &[],
+        );
+    }
+
     fn test_plain_decode<T: DataType>(
         data: ByteBufferPtr,
         num_values: usize,
@@ -1139,6 +1428,34 @@ mod tests {
         assert_eq!(buffer, expected);
     }
 
+    fn test_plain_skip<T: DataType>(
+        data: ByteBufferPtr,
+        num_values: usize,
+        skip: usize,
+        type_length: i32,
+        expected: &[T::T],
+    ) {
+        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
+        let result = decoder.set_data(data, num_values);
+        assert!(result.is_ok());
+        let skipped = decoder.skip(skip).expect("skipping values");
+
+        if skip >= num_values {
+            assert_eq!(skipped, num_values);
+
+            let mut buffer = vec![T::T::default(); 1];
+            let remaining = decoder.get(&mut buffer).expect("getting remaining 
values");
+            assert_eq!(remaining, 0);
+        } else {
+            assert_eq!(skipped, skip);
+            let mut buffer = vec![T::T::default(); num_values - skip];
+            let remaining = decoder.get(&mut buffer).expect("getting remaining 
values");
+            assert_eq!(remaining, num_values - skip);
+            assert_eq!(decoder.values_left(), 0);
+            assert_eq!(buffer, expected);
+        }
+    }
+
     fn test_plain_decode_spaced<T: DataType>(
         data: ByteBufferPtr,
         num_values: usize,
@@ -1217,12 +1534,29 @@ mod tests {
         test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
     }
 
+    #[test]
+    fn test_skip_delta_bit_packed_int32_repeat() {
+        let block_data = vec![
+            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 
7, 8, 1, 2,
+            3, 4, 5, 6, 7, 8,
+        ];
+        test_skip::<Int32Type>(block_data.clone(), 
Encoding::DELTA_BINARY_PACKED, 10);
+        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
+    }
+
     #[test]
     fn test_delta_bit_packed_int32_uneven() {
         let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
         test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
     }
 
+    #[test]
+    fn test_skip_delta_bit_packed_int32_uneven() {
+        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
+        test_skip::<Int32Type>(block_data.clone(), 
Encoding::DELTA_BINARY_PACKED, 5);
+        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
+    }
+
     #[test]
     fn test_delta_bit_packed_int32_same_values() {
         let block_data = vec![
@@ -1238,21 +1572,55 @@ mod tests {
         test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
     }
 
+    #[test]
+    fn test_skip_delta_bit_packed_int32_same_values() {
+        let block_data = vec![
+            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 
127, 127,
+            127,
+        ];
+        test_skip::<Int32Type>(block_data.clone(), 
Encoding::DELTA_BINARY_PACKED, 5);
+        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
+
+        let block_data = vec![
+            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, 
-127, -127,
+            -127, -127, -127,
+        ];
+        test_skip::<Int32Type>(block_data.clone(), 
Encoding::DELTA_BINARY_PACKED, 5);
+        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
+
+    }
+
     #[test]
     fn test_delta_bit_packed_int32_min_max() {
         let block_data = vec![
-            i32::min_value(),
-            i32::max_value(),
-            i32::min_value(),
-            i32::max_value(),
-            i32::min_value(),
-            i32::max_value(),
-            i32::min_value(),
-            i32::max_value(),
+            i32::MIN,
+            i32::MIN,
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
         ];
         test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
     }
 
+    #[test]
+    fn test_skip_delta_bit_packed_int32_min_max() {
+        let block_data = vec![
+            i32::MIN,
+            i32::MIN,
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
+            i32::MIN,
+            i32::MAX,
+        ];
+        test_skip::<Int32Type>(block_data.clone(), 
Encoding::DELTA_BINARY_PACKED, 5);
+        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
+    }
+
     #[test]
     fn test_delta_bit_packed_int32_multiple_blocks() {
         // Test multiple 'put' calls on the same encoder
@@ -1493,6 +1861,44 @@ mod tests {
         assert_eq!(result, expected);
     }
 
+    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: 
usize) {
+        // Type length should not really matter for encode/decode test,
+        // otherwise change it based on type
+        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
+
+        // Encode data
+        let mut encoder =
+            get_encoder::<T>(col_descr.clone(), encoding).expect("get 
encoder");
+
+        encoder.put(&data).expect("ok to encode");
+
+        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
+
+        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get 
decoder");
+        decoder
+            .set_data(bytes, data.len())
+            .expect("ok to set data");
+
+        if skip >= data.len() {
+            let skipped = decoder.skip(skip).expect("ok to skip");
+            assert_eq!(skipped, data.len());
+
+            let skipped_again = decoder.skip(skip).expect("ok to skip again");
+            assert_eq!(skipped_again, 0);
+        } else {
+            let skipped = decoder.skip(skip).expect("ok to skip");
+            assert_eq!(skipped, skip);
+
+            let remaining = data.len() - skip;
+
+            let expected = &data[skip..];
+            let mut buffer = vec![T::T::default(); remaining];
+            let fetched = decoder.get(&mut buffer).expect("ok to decode");
+            assert_eq!(remaining,fetched);
+            assert_eq!(&buffer, expected);
+        }
+    }
+
     fn create_and_check_decoder<T: DataType>(
         encoding: Encoding,
         err: Option<ParquetError>,
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 5f6f91a8b..808c8f0d4 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -434,6 +434,39 @@ impl RleDecoder {
         Ok(values_read)
     }
 
+    #[inline(never)]
+    pub fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let mut values_skipped = 0;
+        while values_skipped < num_values {
+            if self.rle_left > 0 {
+                let num_values = cmp::min(num_values - values_skipped, 
self.rle_left as usize);
+                self.rle_left -= num_values as u32;
+                values_skipped += num_values;
+            } else if self.bit_packed_left > 0 {
+                let mut num_values =
+                    cmp::min(num_values - values_skipped, self.bit_packed_left 
as usize);
+                let bit_reader =
+                    self.bit_reader.as_mut().expect("bit_reader should be 
set");
+
+                num_values = bit_reader.skip(
+                    num_values,
+                    self.bit_width as usize,
+                );
+                if num_values == 0 {
+                    // Handle writers which truncate the final block
+                    self.bit_packed_left = 0;
+                    continue;
+                }
+                self.bit_packed_left -= num_values as u32;
+                values_skipped += num_values;
+            } else if !self.reload() {
+                break;
+            }
+        }
+
+        Ok(values_skipped)
+    }
+
     #[inline(never)]
     pub fn get_batch_with_dict<T>(
         &mut self,
@@ -538,6 +571,23 @@ mod tests {
         assert_eq!(buffer, expected);
     }
 
+    #[test]
+    fn test_rle_skip_int32() {
+        // Test data: 0-7 with bit width 3
+        // 00000011 10001000 11000110 11111010
+        let data = ByteBufferPtr::new(vec![0x03, 0x88, 0xC6, 0xFA]);
+        let mut decoder: RleDecoder = RleDecoder::new(3);
+        decoder.set_data(data);
+        let expected = vec![2, 3, 4, 5, 6, 7];
+        let skipped = decoder.skip(2).expect("skipping values");
+        assert_eq!(skipped, 2);
+
+        let mut buffer = vec![0; 6];
+        let remaining = decoder.get_batch::<i32>(&mut buffer).expect("getting 
remaining");
+        assert_eq!(remaining, 6);
+        assert_eq!(buffer, expected);
+    }
+
     #[test]
     fn test_rle_consume_flush_buffer() {
         let data = vec![1, 1, 1, 2, 2, 3, 3, 3];
@@ -596,6 +646,48 @@ mod tests {
         assert_eq!(buffer, expected);
     }
 
+    #[test]
+    fn test_rle_skip_bool() {
+        // RLE test data: 50 1s followed by 50 0s
+        // 01100100 00000001 01100100 00000000
+        let data1 = ByteBufferPtr::new(vec![0x64, 0x01, 0x64, 0x00]);
+
+        // Bit-packing test data: alternating 1s and 0s, 100 total
+        // 100 / 8 = 13 groups
+        // 00011011 10101010 ... 00001010
+        let data2 = ByteBufferPtr::new(vec![
+            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 
0xAA, 0xAA,
+            0x0A,
+        ]);
+
+        let mut decoder: RleDecoder = RleDecoder::new(1);
+        decoder.set_data(data1);
+        let mut buffer = vec![true; 50];
+        let expected = vec![false; 50];
+
+        let skipped = decoder.skip(50).expect("skipping first 50");
+        assert_eq!(skipped, 50);
+        let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting 
remaining 50");
+        assert_eq!(remainder, 50);
+        assert_eq!(buffer, expected);
+
+        decoder.set_data(data2);
+        let mut buffer = vec![false; 50];
+        let mut expected = vec![];
+        for i in 0..50 {
+            if i % 2 == 0 {
+                expected.push(false);
+            } else {
+                expected.push(true);
+            }
+        }
+        let skipped = decoder.skip(50).expect("skipping first 50");
+        assert_eq!(skipped, 50);
+        let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting 
remaining 50");
+        assert_eq!(remainder, 50);
+        assert_eq!(buffer, expected);
+    }
+
     #[test]
     fn test_rle_decode_with_dict_int32() {
         // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
@@ -631,6 +723,45 @@ mod tests {
         assert_eq!(buffer, expected);
     }
 
+    #[test]
+    fn test_rle_skip_dict() {
+        // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
+        // 00000110 00000000 00001000 00000001 00001010 00000010
+        let dict = vec![10, 20, 30];
+        let data = ByteBufferPtr::new(vec![0x06, 0x00, 0x08, 0x01, 0x0A, 
0x02]);
+        let mut decoder: RleDecoder = RleDecoder::new(3);
+        decoder.set_data(data);
+        let mut buffer = vec![0; 10];
+        let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
+        let skipped = decoder.skip(2).expect("skipping two values");
+        assert_eq!(skipped, 2);
+        let remainder = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 
10).expect("getting remainder");
+        assert_eq!(remainder, 10);
+        assert_eq!(buffer, expected);
+
+        // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4)
+        // 011 100 101 011 100 101 011 100 101 100 101 101
+        // 00000011 01100011 11000111 10001110 00000011 01100101 00001011
+        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
+        let data = ByteBufferPtr::new(vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 
0x0B]);
+        let mut decoder: RleDecoder = RleDecoder::new(3);
+        decoder.set_data(data);
+        let mut buffer = vec![""; 8];
+        let expected = vec![
+            "eee", "fff", "ddd", "eee", "fff", "eee", "fff",
+            "fff",
+        ];
+        let skipped = decoder.skip(4).expect("skipping four values");
+        assert_eq!(skipped, 4);
+        let remainder = decoder.get_batch_with_dict::<&str>(
+            dict.as_slice(),
+            buffer.as_mut_slice(),
+            8,
+        ).expect("getting remainder");
+        assert_eq!(remainder, 8);
+        assert_eq!(buffer, expected);
+    }
+
     fn validate_rle(
         values: &[i64],
         bit_width: u8,
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index c4c1f96f9..29269c4ad 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -519,6 +519,28 @@ impl BitReader {
         Some(from_ne_slice(v.as_bytes()))
     }
 
+    /// Skip one value of size `num_bits`.
+    ///
+    /// Returns `false` if there are no more values to skip, `true` otherwise.
+    pub fn skip_value(&mut self, num_bits: usize) -> bool {
+        assert!(num_bits <= 64);
+
+        if self.byte_offset * 8 + self.bit_offset + num_bits > 
self.total_bytes * 8 {
+            return false;
+        }
+
+        self.bit_offset += num_bits;
+
+        if self.bit_offset >= 64 {
+            self.byte_offset += 8;
+            self.bit_offset -= 64;
+
+            self.reload_buffer_values();
+        }
+
+        true
+    }
+
     /// Read multiple values from their packed representation
     ///
     /// # Panics
@@ -605,6 +627,47 @@ impl BitReader {
         values_to_read
     }
 
+    /// Skip num_value values with num_bits bit width
+    ///
+    /// Return the number of values skipped (up to num_values)
+    pub fn skip(&mut self, num_values: usize, num_bits: usize) -> usize {
+        assert!(num_bits <= 64);
+
+        let mut num_values = num_values;
+        let needed_bits = num_bits * num_values;
+        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - 
self.bit_offset;
+        if remaining_bits < needed_bits {
+            num_values = remaining_bits / num_bits;
+        }
+
+        let mut values_skipped = 0;
+
+        // First align bit offset to byte offset
+        if self.bit_offset != 0 {
+            while values_skipped < num_values && self.bit_offset != 0 {
+                self
+                    .skip_value(num_bits);
+                values_skipped += 1;
+            }
+        }
+
+        while num_values - values_skipped >= 32 {
+            self.byte_offset += 4 * num_bits;
+            values_skipped += 32;
+        }
+
+
+        assert!(num_values - values_skipped < 32);
+
+        self.reload_buffer_values();
+        while values_skipped < num_values {
+            self.skip_value(num_bits);
+            values_skipped += 1;
+        }
+
+        num_values
+    }
+
     /// Reads up to `num_bytes` to `buf` returning the number of bytes read
     pub(crate) fn get_aligned_bytes(
         &mut self,
@@ -759,6 +822,43 @@ mod tests {
         assert_eq!(bit_reader.get_value::<i32>(4), Some(3));
     }
 
+    #[test]
+    fn test_bit_reader_skip_value() {
+        let buffer = vec![255, 0];
+        let mut bit_reader = BitReader::from(buffer);
+        let skipped = bit_reader.skip_value(1);
+        assert!(skipped);
+        assert_eq!(bit_reader.get_value::<i32>(1), Some(1));
+        let skipped = bit_reader.skip_value(2);
+        assert!(skipped);
+        assert_eq!(bit_reader.get_value::<i32>(2), Some(3));
+        let skipped = bit_reader.skip_value(1);
+        assert!(skipped);
+        assert_eq!(bit_reader.get_value::<i32>(4), Some(1));
+        let skipped = bit_reader.skip_value(1);
+        assert!(skipped);
+        assert_eq!(bit_reader.get_value::<i32>(4), Some(0));
+        let skipped = bit_reader.skip_value(1);
+        assert!(!skipped);
+    }
+
+    #[test]
+    fn test_bit_reader_skip() {
+        let buffer = vec![255, 0];
+        let mut bit_reader = BitReader::from(buffer);
+        let skipped = bit_reader.skip(1,1);
+        assert_eq!(skipped, 1);
+        assert_eq!(bit_reader.get_value::<i32>(1), Some(1));
+        let skipped = bit_reader.skip(2,2);
+        assert_eq!(skipped, 2);
+        assert_eq!(bit_reader.get_value::<i32>(2), Some(3));
+        let skipped = bit_reader.skip(4,1);
+        assert_eq!(skipped, 4);
+        assert_eq!(bit_reader.get_value::<i32>(4), Some(0));
+        let skipped = bit_reader.skip(1,1);
+        assert_eq!(skipped, 0);
+    }
+
     #[test]
     fn test_bit_reader_get_value_boundary() {
         let buffer = vec![10, 0, 0, 0, 20, 0, 30, 0, 0, 0, 40, 0];
@@ -769,6 +869,17 @@ mod tests {
         assert_eq!(bit_reader.get_value::<i64>(16), Some(40));
     }
 
+    #[test]
+    fn test_bit_reader_skip_boundary() {
+        let buffer = vec![10, 0, 0, 0, 20, 0, 30, 0, 0, 0, 40, 0];
+        let mut bit_reader = BitReader::from(buffer);
+        assert_eq!(bit_reader.get_value::<i64>(32), Some(10));
+        let skipped = bit_reader.skip_value(16);
+        assert!(skipped);
+        assert_eq!(bit_reader.get_value::<i64>(32), Some(30));
+        assert_eq!(bit_reader.get_value::<i64>(16), Some(40));
+    }
+
     #[test]
     fn test_bit_reader_get_aligned() {
         // 01110101 11001011

Reply via email to