This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 72dada6d6 Support skip_values in ByteArrayColumnValueDecoder (#2076)
72dada6d6 is described below

commit 72dada6d656e88f0d07efa2cc32ea9670d061fc2
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Jul 16 18:59:35 2022 +0800

    Support skip_values in ByteArrayColumnValueDecoder (#2076)
    
    * Support skip_values in ByteArrayColumnValueDecoder
    
    * add test with nulls and api align with read
    
    * Update parquet/src/arrow/array_reader/byte_array.rs
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 parquet/src/arrow/array_reader/byte_array.rs | 187 ++++++++++++++++++++++++++-
 1 file changed, 184 insertions(+), 3 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index b762236c4..853bc2b18 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -215,8 +215,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
         decoder.read(out, range.end - range.start, self.dict.as_ref())
     }
 
-    fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
+        let decoder = self
+            .decoder
+            .as_mut()
+            .ok_or_else(|| general_err!("no decoder set"))?;
+
+        decoder.skip(num_values, self.dict.as_ref())
     }
 }
 
@@ -284,6 +289,25 @@ impl ByteArrayDecoder {
             ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
         }
     }
+
+    /// Skip `len` values
+    pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        len: usize,
+        dict: Option<&OffsetBuffer<I>>,
+    ) -> Result<usize> {
+        match self {
+            ByteArrayDecoder::Plain(d) => d.skip(len),
+            ByteArrayDecoder::Dictionary(d) => {
+                let dict = dict
+                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+
+                d.skip(dict, len)
+            }
+            ByteArrayDecoder::DeltaLength(d) => d.skip(len),
+            ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
+        }
+    }
 }
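
The two hunks above route skipping through the same path that read already uses: ByteArrayColumnValueDecoder forwards to the per-page ByteArrayDecoder, which then dispatches on the page encoding. A minimal standalone sketch of that delegation shape, using toy types in place of the crate's (PageDecoder and ColumnDecoder below are illustrative, not real arrow-rs items):

    // Illustrative stand-ins; the real decoders also carry dictionaries,
    // offsets and encoding-specific state.
    struct PageDecoder {
        remaining: usize,
    }

    impl PageDecoder {
        fn skip(&mut self, n: usize) -> Result<usize, String> {
            // Skipping is capped at what the page still holds and returns
            // the number of values actually skipped.
            let skipped = n.min(self.remaining);
            self.remaining -= skipped;
            Ok(skipped)
        }
    }

    struct ColumnDecoder {
        decoder: Option<PageDecoder>,
    }

    impl ColumnDecoder {
        fn skip_values(&mut self, num_values: usize) -> Result<usize, String> {
            // Same failure mode as read(): error if no page data has been set.
            let decoder = self
                .decoder
                .as_mut()
                .ok_or_else(|| "no decoder set".to_string())?;
            decoder.skip(num_values)
        }
    }

    fn main() {
        let mut col = ColumnDecoder {
            decoder: Some(PageDecoder { remaining: 3 }),
        };
        assert_eq!(col.skip_values(5).unwrap(), 3); // capped at the page size
    }
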
 
 /// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
@@ -363,6 +387,27 @@ impl ByteArrayDecoderPlain {
         }
         Ok(to_read)
     }
+
+    pub fn skip(
+        &mut self,
+        to_skip: usize,
+    ) -> Result<usize> {
+        let to_skip = to_skip.min(self.max_remaining_values);
+        let mut skip = 0;
+        let buf = self.buf.as_ref();
+
+        while self.offset < self.buf.len() && skip != to_skip {
+            if self.offset + 4 > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+            let len_bytes: [u8; 4] =
+                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len = u32::from_le_bytes(len_bytes) as usize;
+            skip += 1;
+            self.offset = self.offset + 4 + len;
+        }
+        Ok(skip)
+    }
 }
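
For the PLAIN case added above, values are stored back to back as a 4-byte little-endian length followed by the value bytes, so skipping still has to walk every length prefix; it just never copies the value bytes into an output buffer. A hedged standalone sketch of that walk (the helper below is illustrative, not the decoder's real interface):

    /// Skip up to `to_skip` PLAIN byte arrays starting at `offset`,
    /// returning the new offset and the count actually skipped.
    fn skip_plain(buf: &[u8], mut offset: usize, to_skip: usize) -> Result<(usize, usize), String> {
        let mut skipped = 0;
        while offset < buf.len() && skipped != to_skip {
            if offset + 4 > buf.len() {
                return Err("eof decoding byte array".to_string());
            }
            let len_bytes: [u8; 4] = buf[offset..offset + 4].try_into().unwrap();
            let len = u32::from_le_bytes(len_bytes) as usize;
            // Jump over the length prefix and the value bytes without reading them.
            offset += 4 + len;
            skipped += 1;
        }
        Ok((offset, skipped))
    }

    fn main() {
        // Encode "hello" and "ok" as length-prefixed byte arrays.
        let mut buf = Vec::new();
        for v in ["hello", "ok"] {
            buf.extend_from_slice(&(v.len() as u32).to_le_bytes());
            buf.extend_from_slice(v.as_bytes());
        }
        // Skipping one value advances past 4 + 5 = 9 bytes.
        assert_eq!(skip_plain(&buf, 0, 1).unwrap(), (9, 1));
    }
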
 
 /// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
@@ -431,6 +476,21 @@ impl ByteArrayDecoderDeltaLength {
         }
         Ok(to_read)
     }
+
+    fn skip(
+        &mut self,
+        to_skip: usize,
+    ) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
 }
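
DELTA_LENGTH_BYTE_ARRAY decodes all value lengths up front, so the skip added above reduces to summing the lengths of the skipped slots and advancing two cursors. A simplified sketch with illustrative names, not the decoder's real fields:

    /// Advance the length and data cursors past `to_skip` values whose
    /// lengths were already decoded (simplified stand-in for the decoder state).
    fn skip_delta_length(
        lengths: &[i32],
        length_offset: &mut usize,
        data_offset: &mut usize,
        to_skip: usize,
    ) -> usize {
        let to_skip = to_skip.min(lengths.len() - *length_offset);
        let skipped_bytes: usize = lengths[*length_offset..*length_offset + to_skip]
            .iter()
            .map(|x| *x as usize)
            .sum();
        *data_offset += skipped_bytes;
        *length_offset += to_skip;
        to_skip
    }

    fn main() {
        let lengths = [5, 5, 1, 1]; // "hello", "world", "a", "b"
        let (mut length_offset, mut data_offset) = (0, 0);
        assert_eq!(skip_delta_length(&lengths, &mut length_offset, &mut data_offset, 2), 2);
        assert_eq!((length_offset, data_offset), (2, 10));
    }
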
 
 /// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
@@ -521,6 +581,37 @@ impl ByteArrayDecoderDelta {
         }
         Ok(to_read)
     }
+
+    fn skip(
+        &mut self,
+        to_skip: usize,
+    ) -> Result<usize> {
+        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
+
+        let length_range = self.length_offset..self.length_offset + to_skip;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            self.data_offset += suffix_length;
+        }
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
 }
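
The DELTA_BYTE_ARRAY case above carries the one real subtlety: each value is defined as a prefix of the previous value plus a suffix, so even while skipping the decoder must keep rebuilding last_value, otherwise the first value read after the skip would be reconstructed from the wrong base. A standalone sketch of that invariant (illustrative names, happy-path bounds only):

    /// Skip `to_skip` values while keeping `last_value` up to date,
    /// since the next value read reuses a prefix of it.
    fn skip_delta_byte_array(
        prefix_lengths: &[usize],
        suffix_lengths: &[usize],
        data: &[u8],
        data_offset: &mut usize,
        last_value: &mut Vec<u8>,
        to_skip: usize,
    ) -> usize {
        let iter = prefix_lengths[..to_skip].iter().zip(&suffix_lengths[..to_skip]);
        for (&prefix_len, &suffix_len) in iter {
            // Rebuild the value: shared prefix of the previous value + new suffix.
            last_value.truncate(prefix_len);
            last_value.extend_from_slice(&data[*data_offset..*data_offset + suffix_len]);
            *data_offset += suffix_len;
        }
        to_skip
    }

    fn main() {
        // Two values, "hello" then "help"; the second shares the prefix "hel".
        let (prefixes, suffixes) = ([0usize, 3], [5usize, 1]);
        let data = b"hellop"; // suffix bytes only: "hello" + "p"
        let (mut data_offset, mut last_value) = (0usize, Vec::new());
        skip_delta_byte_array(&prefixes, &suffixes, data, &mut data_offset, &mut last_value, 2);
        assert_eq!(last_value, b"help".to_vec());
    }
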
 
 /// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
@@ -589,6 +680,38 @@ impl ByteArrayDecoderDictionary {
         }
         Ok(values_read)
     }
+
+    fn skip<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        dict: &OffsetBuffer<I>,
+        to_skip: usize,
+    ) -> Result<usize> {
+        let to_skip = to_skip.min(self.max_remaining_values);
+        // All data must be NULL
+        if dict.is_empty() {
+            return Ok(0);
+        }
+
+        let mut values_skip = 0;
+        while values_skip < to_skip {
+            if self.index_offset == self.index_buf_len {
+                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
+                if read == 0 {
+                    break;
+                }
+                self.index_buf_len = read;
+                self.index_offset = 0;
+            }
+
+            let skip = (to_skip - values_skip)
+                .min(self.index_buf_len - self.index_offset);
+
+            self.index_offset += skip;
+            self.max_remaining_values -= skip;
+            values_skip += skip;
+        }
+        Ok(values_skip)
+    }
 }
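
Finally, the dictionary path above never touches the dictionary values while skipping: it only drains already-decoded RLE/bit-packed keys from the index buffer, refilling it when it runs dry, and an empty dictionary page (the all-null case) short-circuits to zero. A simplified sketch where the hypothetical refill callback stands in for decoding the next batch of keys:

    /// Skip `to_skip` dictionary-encoded values by consuming decoded keys;
    /// `refill` stands in for pulling the next batch of RLE/bit-packed indices.
    fn skip_dictionary(
        mut refill: impl FnMut(&mut Vec<u32>) -> usize,
        index_buf: &mut Vec<u32>,
        index_offset: &mut usize,
        to_skip: usize,
    ) -> usize {
        let mut skipped = 0;
        while skipped < to_skip {
            if *index_offset == index_buf.len() {
                index_buf.clear();
                if refill(index_buf) == 0 {
                    break; // page exhausted
                }
                *index_offset = 0;
            }
            // Advance over decoded keys without resolving them in the dictionary.
            let step = (to_skip - skipped).min(index_buf.len() - *index_offset);
            *index_offset += step;
            skipped += step;
        }
        skipped
    }

    fn main() {
        // One batch of four keys, then the page is exhausted.
        let mut batches = vec![vec![0u32, 1, 2, 3]];
        let (mut index_buf, mut index_offset) = (Vec::new(), 0usize);
        let skipped = skip_dictionary(
            |buf| match batches.pop() {
                Some(batch) => {
                    let n = batch.len();
                    buf.extend(batch);
                    n
                }
                None => 0,
            },
            &mut index_buf,
            &mut index_offset,
            6,
        );
        assert_eq!(skipped, 4);
    }
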
 
 #[cfg(test)]
@@ -653,6 +776,57 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_byte_array_decoder_skip() {
+        let (pages, encoded_dictionary) =
+            byte_array_all_encodings(vec!["hello", "world", "a", "b"]);
+
+        let column_desc = utf8_column();
+        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        for (encoding, page) in pages {
+            let mut output = OffsetBuffer::<i32>::default();
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+
+            assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+
+            assert_eq!(output.values.as_slice(), "hello".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5]);
+
+            assert_eq!(decoder.skip_values(1).unwrap(), 1);
+            assert_eq!(decoder.skip_values(1).unwrap(), 1);
+
+            assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+            assert_eq!(output.values.as_slice(), "hellob".as_bytes());
+            assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);
+
+            assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+
+            let valid = vec![false, false, true, true, false, false];
+            let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+
+            output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
+            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
+            let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+            assert_eq!(
+                strings.iter().collect::<Vec<_>>(),
+                vec![
+                    None,
+                    None,
+                    Some("hello"),
+                    Some("b"),
+                    None,
+                    None,
+                ]
+            );
+        }
+    }
+
     #[test]
     fn test_byte_array_decoder_nulls() {
         let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
@@ -664,10 +838,17 @@ mod tests {
             .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
             .unwrap();
 
-        for (encoding, page) in pages {
+        // test nulls read
+        for (encoding, page) in pages.clone() {
             let mut output = OffsetBuffer::<i32>::default();
             decoder.set_data(encoding, page, 4, None).unwrap();
             assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
         }
+
+        // test nulls skip
+        for (encoding, page) in pages {
+            decoder.set_data(encoding, page, 4, None).unwrap();
+            assert_eq!(decoder.skip_values(1024).unwrap(), 0);
+        }
     }
 }
