alamb commented on code in PR #6004:
URL: https://github.com/apache/arrow-rs/pull/6004#discussion_r1667664608


##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

Review Comment:
   ### Optimization?
   
   As written in this PR, the length validation will be done on every call to `read`. It seems to me like it would be faster if we did this check once in `ByteViewArrayDecoderDeltaLength::new` rather than on each call to `read`
   
   ### Overflow Checking?
   
   1. Should we check for overflow here (like should we use `checked_add` 🤔 )
   
   2. `src_lengths` are `i32` but are treated as unsigned (both here with the cast to `usize` as well as below in the cast to `u32`). I wonder if we should also check if any are negative 🤔
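   
   Something like the following (a rough, untested sketch combining both ideas, reusing the names from this PR) could hoist the checks into `new`:
   
   ```rust
   fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
       let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
       len_decoder.set_data(data.clone(), 0)?;
       let values = len_decoder.values_left();
   
       let mut lengths = vec![0; values];
       len_decoder.get(&mut lengths)?;
   
       // Validate once here instead of on every `read`: reject negative
       // lengths, and sum with overflow checking
       let mut total_bytes: usize = 0;
       for length in &lengths {
           if *length < 0 {
               return Err(general_err!("negative delta length byte array length"));
           }
           total_bytes = total_bytes
               .checked_add(*length as usize)
               .ok_or_else(|| general_err!("delta length byte array lengths overflow"))?;
       }
       if len_decoder.get_offset() + total_bytes > data.len() {
           return Err(ParquetError::EOF(
               "Insufficient delta length byte array bytes".to_string(),
           ));
       }
   
       Ok(Self {
           lengths,
           data,
           validate_utf8,
           length_offset: 0,
           data_offset: len_decoder.get_offset(),
       })
   }
   ```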



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;
+        let initial_offset = start_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid
+            unsafe { output.append_view_unchecked(block_id, start_offset as u32, *length as u32) }
+
+            start_offset += *length as usize;
+        }
+
+        // Delta length encoding has continuous strings, we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&self.data[initial_offset..start_offset])?;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        Ok(to_read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+    decoder: DeltaByteArrayDecoder,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        Ok(Self {
+            decoder: DeltaByteArrayDecoder::new(data)?,
+            validate_utf8,
+        })
+    }
+
+    // Unlike other encodings, we need to copy the data because we can not reuse the DeltaByteArray data in Arrow.
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        output.views.reserve(len.min(self.decoder.remaining()));
+
+        let mut buffer: Vec<u8> = Vec::with_capacity(4096);
+
+        let buffer_id = output.buffers.len() as u32;
+
+        let read = self.decoder.read(len, |bytes| {
+            let offset = buffer.len();
+            buffer.extend_from_slice(bytes);
+            let view = make_view(bytes, buffer_id, offset as u32);
+            // # Safety
+            // The buffer_id is the last buffer in the output buffers
+            // The offset is calculated from the buffer, so it is valid
+            unsafe {
+                output.append_raw_view_unchecked(&view);
+            }
+            Ok(())
+        })?;
+
+        // The buffer is dense, so we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&buffer)?;
+        }
+        let actual_block_id = output.append_block(buffer.into());
+        assert_eq!(actual_block_id, buffer_id);

Review Comment:
   💯  for the verification



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());

Review Comment:
   This pattern is so cool that it can just reuse the bytes



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;
+        let initial_offset = start_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid
+            unsafe { output.append_view_unchecked(block_id, start_offset as u32, *length as u32) }
+
+            start_offset += *length as usize;
+        }
+
+        // Delta length encoding has continuous strings, we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&self.data[initial_offset..start_offset])?;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        Ok(to_read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+    decoder: DeltaByteArrayDecoder,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        Ok(Self {
+            decoder: DeltaByteArrayDecoder::new(data)?,
+            validate_utf8,
+        })
+    }
+
+    // Unlike other encodings, we need to copy the data because we can not reuse the DeltaByteArray data in Arrow.
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        output.views.reserve(len.min(self.decoder.remaining()));
+
+        let mut buffer: Vec<u8> = Vec::with_capacity(4096);
+
+        let buffer_id = output.buffers.len() as u32;
+
+        let read = self.decoder.read(len, |bytes| {
+            let offset = buffer.len();
+            buffer.extend_from_slice(bytes);

Review Comment:
   It is sad that this has to copy all strings (even those that are 12 bytes or shorter) into `buffer`, as they are inlined into the view. However, I think it is required to ensure utf8 validation is done
   
   I suppose it would be possible to accumulate the shorter strings in a different buffer and call `validate_utf8` twice, as sketched below
   
   The potential benefit of doing so would be less memory required to store the final array.
   
   I don't think this is required, I am just pointing it out
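   
   Roughly (an illustrative sketch only, not compile-checked; `long_buf` and `short_scratch` are hypothetical names, and the view construction is elided):
   
   ```rust
   let mut long_buf: Vec<u8> = Vec::with_capacity(4096);
   // retained only for validation: views for strings of 12 bytes or fewer
   // inline the data and never point at a buffer
   let mut short_scratch: Vec<u8> = Vec::new();
   
   let read = self.decoder.read(len, |bytes| {
       if bytes.len() <= 12 {
           short_scratch.extend_from_slice(bytes);
       } else {
           let offset = long_buf.len();
           long_buf.extend_from_slice(bytes);
           // build the view against `long_buf` at `offset`, as in this PR
       }
       Ok(())
   })?;
   
   if self.validate_utf8 {
       // two calls, one per accumulation; note this validates the
       // concatenations (as the single-buffer version does), not each string
       check_valid_utf8(&long_buf)?;
       check_valid_utf8(&short_scratch)?;
   }
   ```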
   
    



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;

Review Comment:
   This variable tracks the current ending offset in my mind, so I found it confusing that it was named `start_offset`
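   
   For example (sketch), a neutral name avoids reading it as either the start or the end:
   
   ```rust
   let mut cursor = self.data_offset;
   for length in src_lengths {
       // `cursor` is the start of the current value here...
       unsafe { output.append_view_unchecked(block_id, cursor as u32, *length as u32) }
       // ...and its end (the next value's start) here
       cursor += *length as usize;
   }
   ```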



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;
+        let initial_offset = start_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid

Review Comment:
   The other part of the safety argument is that `data_offset + total_bytes` was checked above as well
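   
   Perhaps something like (sketch of a fuller comment):
   
   ```rust
   // # Safety
   // - `block_id` refers to `self.data`, appended just above
   // - `self.data_offset + total_bytes <= self.data.len()` was checked before
   //   this loop, and `start_offset + length` never exceeds that bound, so
   //   every view lies fully inside the block
   unsafe { output.append_view_unchecked(block_id, start_offset as u32, *length as u32) }
   ```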



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;
+        let initial_offset = start_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid
+            unsafe { output.append_view_unchecked(block_id, start_offset as u32, *length as u32) }
+
+            start_offset += *length as usize;
+        }
+
+        // Delta length encoding has continuous strings, we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&self.data[initial_offset..start_offset])?;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        Ok(to_read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+    decoder: DeltaByteArrayDecoder,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        Ok(Self {
+            decoder: DeltaByteArrayDecoder::new(data)?,
+            validate_utf8,
+        })
+    }
+
+    // Unlike other encodings, we need to copy the data because we can not reuse the DeltaByteArray data in Arrow.

Review Comment:
   I think it would help to clarify what about the encoding requires copying
   
   ```suggestion
       // Unlike other encodings, we need to copy the data.
       //
       //  DeltaByteArray data is stored using shared prefixes/suffixes, 
       // which results in potentially non-contiguous
       // strings, while Arrow encodings require contiguous strings
       //
       // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
       
   ```



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;
+        let initial_offset = start_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid
+            unsafe { output.append_view_unchecked(block_id, start_offset as u32, *length as u32) }
+
+            start_offset += *length as usize;
+        }
+
+        // Delta length encoding has continuous strings, we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&self.data[initial_offset..start_offset])?;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        Ok(to_read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+    decoder: DeltaByteArrayDecoder,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        Ok(Self {
+            decoder: DeltaByteArrayDecoder::new(data)?,
+            validate_utf8,
+        })
+    }
+
+    // Unlike other encodings, we need to copy the data because we can not reuse the DeltaByteArray data in Arrow.
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        output.views.reserve(len.min(self.decoder.remaining()));
+
+        let mut buffer: Vec<u8> = Vec::with_capacity(4096);
+
+        let buffer_id = output.buffers.len() as u32;
+
+        let read = self.decoder.read(len, |bytes| {
+            let offset = buffer.len();
+            buffer.extend_from_slice(bytes);
+            let view = make_view(bytes, buffer_id, offset as u32);
+            // # Safety
+            // The buffer_id is the last buffer in the output buffers
+            // The offset is calculated from the buffer, so it is valid

Review Comment:
   Also important is that utf8 validation is done later
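   
   Maybe worth folding into the comment, e.g. (sketch):
   
   ```rust
   // # Safety
   // - `buffer_id` is the id the pending `buffer` receives when it is
   //   appended below, so the view resolves to the correct block
   // - `offset` and `bytes.len()` are derived from `buffer` itself
   // - UTF-8 validity of the whole `buffer` is checked after the read loop
   unsafe {
       output.append_raw_view_unchecked(&view);
   }
   ```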



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,8 +458,144 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
+
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        if self.data_offset + total_bytes > self.data.len() {
+            return Err(ParquetError::EOF(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut start_offset = self.data_offset;
+        let initial_offset = start_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid
+            unsafe { output.append_view_unchecked(block_id, start_offset as u32, *length as u32) }
+
+            start_offset += *length as usize;
+        }
+
+        // Delta length encoding has continuous strings, we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&self.data[initial_offset..start_offset])?;
+        }
+
+        self.data_offset = start_offset;
+        self.length_offset += to_read;
+
+        Ok(to_read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+    decoder: DeltaByteArrayDecoder,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        Ok(Self {
+            decoder: DeltaByteArrayDecoder::new(data)?,
+            validate_utf8,
+        })
+    }
+
+    // Unlike other encodings, we need to copy the data because we can not reuse the DeltaByteArray data in Arrow.
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        output.views.reserve(len.min(self.decoder.remaining()));
+
+        let mut buffer: Vec<u8> = Vec::with_capacity(4096);
+
+        let buffer_id = output.buffers.len() as u32;
+
+        let read = self.decoder.read(len, |bytes| {
+            let offset = buffer.len();
+            buffer.extend_from_slice(bytes);
+            let view = make_view(bytes, buffer_id, offset as u32);
+            // # Safety
+            // The buffer_id is the last buffer in the output buffers
+            // The offset is calculated from the buffer, so it is valid
+            unsafe {
+                output.append_raw_view_unchecked(&view);
+            }
+            Ok(())
+        })?;
+
+        // The buffer is dense, so we can validate utf8 in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&buffer)?;
+        }
+        let actual_block_id = output.append_block(buffer.into());
+        assert_eq!(actual_block_id, buffer_id);
+        Ok(read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        self.decoder.skip(to_skip)
+    }
+}
+
 /// Check that `val` is a valid UTF-8 sequence
 pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
+    if let Some(&b) = val.first() {

Review Comment:
   I don't understand why this extra check is needed -- wouldn't `str::from_utf8` do the same check?
   
   I played around with it and it seems like we can remove this check and indeed `str::from_utf8` still catches it
   
   ```diff
   diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
   index 3c1e0f6395e..d68e2b5688c 100644
   --- a/parquet/src/arrow/array_reader/byte_view_array.rs
   +++ b/parquet/src/arrow/array_reader/byte_view_array.rs
   @@ -587,15 +587,6 @@ impl ByteViewArrayDecoderDelta {
   
    /// Check that `val` is a valid UTF-8 sequence
    pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
   -    if let Some(&b) = val.first() {
   -        // A valid code-point iff it does not start with 0b10xxxxxx
   -        // Bit-magic taken from `std::str::is_char_boundary`
   -        if (b as i8) < -0x40 {
   -            return Err(ParquetError::General(
   -                "encountered non UTF-8 data".to_string(),
   -            ));
   -        }
   -    }
        match std::str::from_utf8(val) {
            Ok(_) => Ok(()),
            Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
   diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
   index a7911a67dc4..44d83174bd5 100644
   --- a/parquet/src/arrow/arrow_reader/mod.rs
   +++ b/parquet/src/arrow/arrow_reader/mod.rs
   @@ -2456,7 +2456,7 @@ mod tests {
            let cases = [
                (
                    invalid_utf8_first_char::<i32>(),
   -                "Parquet argument error: Parquet error: encountered non UTF-8 data",
   +                "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 0",
                ),
                (
                    invalid_utf8_later_char::<i32>(),
   ```
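   
   For reference, a tiny standalone check (playground style) showing `std::str::from_utf8` rejecting exactly the byte pattern the removed bit-magic guarded against:
   
   ```rust
   fn main() {
       // 0x80 is a 0b10xxxxxx continuation byte: invalid as a first byte
       assert!(std::str::from_utf8(&[0x80, b'a']).is_err());
       // the removed check flags the same byte: (0x80 as i8) == -128 < -0x40
       assert!((0x80u8 as i8) < -0x40);
   }
   ```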
   
   


