XiangpengHao commented on code in PR #6004:
URL: https://github.com/apache/arrow-rs/pull/6004#discussion_r1668743264
##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
}
}
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+ lengths: Vec<i32>,
+ data: Bytes,
+ length_offset: usize,
+ data_offset: usize,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+ len_decoder.set_data(data.clone(), 0)?;
+ let values = len_decoder.values_left();
+
+ let mut lengths = vec![0; values];
+ len_decoder.get(&mut lengths)?;
+
+ Ok(Self {
+ lengths,
+ data,
+ validate_utf8,
+ length_offset: 0,
+ data_offset: len_decoder.get_offset(),
+ })
+ }
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ let to_read = len.min(self.lengths.len() - self.length_offset);
+ output.views.reserve(to_read);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_read];
+
+ let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+ if self.data_offset + total_bytes > self.data.len() {
+ return Err(ParquetError::EOF(
+ "Insufficient delta length byte array bytes".to_string(),
+ ));
+ }
+
+ let block_id = output.append_block(self.data.clone().into());
+
+ let mut start_offset = self.data_offset;
+ let initial_offset = start_offset;
+ for length in src_lengths {
+ // # Safety
+ // The length is from the delta length decoder, so it is valid
+ // The start_offset is calculated from the lengths, so it is valid
+ unsafe { output.append_view_unchecked(block_id, start_offset as
u32, *length as u32) }
+
+ start_offset += *length as usize;
+ }
+
+ // Delta length encoding has continuous strings, we can validate utf8
in one go
+ if self.validate_utf8 {
+ check_valid_utf8(&self.data[initial_offset..start_offset])?;
+ }
+
+ self.data_offset = start_offset;
+ self.length_offset += to_read;
+
+ Ok(to_read)
+ }
+
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
+ let remain_values = self.lengths.len() - self.length_offset;
+ let to_skip = remain_values.min(to_skip);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_skip];
+ let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+ self.data_offset += total_bytes;
+ self.length_offset += to_skip;
+ Ok(to_skip)
+ }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+ decoder: DeltaByteArrayDecoder,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ Ok(Self {
+ decoder: DeltaByteArrayDecoder::new(data)?,
+ validate_utf8,
+ })
+ }
+
+ // Unlike other encodings, we need to copy the data because we can not
reuse the DeltaByteArray data in Arrow.
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ output.views.reserve(len.min(self.decoder.remaining()));
+
+ let mut buffer: Vec<u8> = Vec::with_capacity(4096);
+
+ let buffer_id = output.buffers.len() as u32;
+
+ let read = self.decoder.read(len, |bytes| {
+ let offset = buffer.len();
+ buffer.extend_from_slice(bytes);
Review Comment:
I've updated the implementation to follow this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]