alamb commented on code in PR #6004:
URL: https://github.com/apache/arrow-rs/pull/6004#discussion_r1669050838
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2456,26 +2456,16 @@ mod tests {
let cases = [
(
invalid_utf8_first_char::<i32>(),
- "Parquet argument error: Parquet error: encountered non UTF-8
data",
+ "Parquet argument error: Parquet error: encountered non UTF-8
data: invalid utf-8 sequence of 1 bytes from index 0",
),
(
invalid_utf8_later_char::<i32>(),
- "Parquet argument error: Parquet error: encountered non UTF-8
data: invalid utf-8 sequence of 1 bytes from index 6",
+ "Parquet argument error: Parquet error: encountered non UTF-8
data: invalid utf-8 sequence of 1 bytes from index 3",
),
];
for (array, expected_error) in cases {
- // cast not yet implemented for BinaryView
Review Comment:
🥳
##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,6 +458,166 @@ impl ByteViewArrayDecoderDictionary {
}
}
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+ lengths: Vec<i32>,
+ data: Bytes,
+ length_offset: usize,
+ data_offset: usize,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+ len_decoder.set_data(data.clone(), 0)?;
+ let values = len_decoder.values_left();
+
+ let mut lengths = vec![0; values];
+ len_decoder.get(&mut lengths)?;
+
+ let mut total_bytes = 0;
+
+ for l in lengths.iter() {
+ if *l < 0 {
+ return Err(ParquetError::General(
+ "negative delta length byte array length".to_string(),
+ ));
+ }
+ total_bytes += *l as usize;
+ }
+
+ if total_bytes + len_decoder.get_offset() > data.len() {
+ return Err(ParquetError::General(
+ "Insufficient delta length byte array bytes".to_string(),
+ ));
+ }
+
+ Ok(Self {
+ lengths,
+ data,
+ validate_utf8,
+ length_offset: 0,
+ data_offset: len_decoder.get_offset(),
+ })
+ }
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ let to_read = len.min(self.lengths.len() - self.length_offset);
+ output.views.reserve(to_read);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_read];
+
+ let block_id = output.append_block(self.data.clone().into());
+
+ let mut current_offset = self.data_offset;
+ let initial_offset = current_offset;
+ for length in src_lengths {
+ // # Safety
+ // The length is from the delta length decoder, so it is valid
+ // The start_offset is calculated from the lengths, so it is valid
+ // `start_offset` + *length is guaranteed to be within the bounds
of `data`, as checked in `new`
+ unsafe { output.append_view_unchecked(block_id, current_offset as
u32, *length as u32) }
+
+ current_offset += *length as usize;
+ }
+
+ // Delta length encoding has continuous strings, we can validate utf8
in one go
+ if self.validate_utf8 {
+ check_valid_utf8(&self.data[initial_offset..current_offset])?;
+ }
+
+ self.data_offset = current_offset;
+ self.length_offset += to_read;
+
+ Ok(to_read)
+ }
+
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
+ let remain_values = self.lengths.len() - self.length_offset;
+ let to_skip = remain_values.min(to_skip);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_skip];
+ let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+ self.data_offset += total_bytes;
+ self.length_offset += to_skip;
+ Ok(to_skip)
+ }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+ decoder: DeltaByteArrayDecoder,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ Ok(Self {
+ decoder: DeltaByteArrayDecoder::new(data)?,
+ validate_utf8,
+ })
+ }
+
+ // Unlike other encodings, we need to copy the data.
+ //
+ // DeltaByteArray data is stored using shared prefixes/suffixes,
+ // which results in potentially non-contiguous
+ // strings, while Arrow encodings require contiguous strings
+ //
+ //
<https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ output.views.reserve(len.min(self.decoder.remaining()));
+
+ // array buffer only have long strings
+ let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
+
+ // utf8 validation buffer have all strings, we batch the strings in
one buffer to accelerate validation
+ let mut utf8_validation_buffer = if self.validate_utf8 {
+ Some(Vec::with_capacity(4096))
+ } else {
+ None
+ };
+
+ let buffer_id = output.buffers.len() as u32;
+
+ let read = self.decoder.read(len, |bytes| {
Review Comment:
I don't know if it would make a difference but we could potentially have two
copies of this loop, one where utf8 validation was needed and one without --
that would avoid having to check `Some(..) = utf8_validation_buffer` in the
inner loop
##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -449,6 +458,166 @@ impl ByteViewArrayDecoderDictionary {
}
}
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+ lengths: Vec<i32>,
+ data: Bytes,
+ length_offset: usize,
+ data_offset: usize,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+ len_decoder.set_data(data.clone(), 0)?;
+ let values = len_decoder.values_left();
+
+ let mut lengths = vec![0; values];
+ len_decoder.get(&mut lengths)?;
+
+ let mut total_bytes = 0;
+
+ for l in lengths.iter() {
+ if *l < 0 {
+ return Err(ParquetError::General(
+ "negative delta length byte array length".to_string(),
+ ));
+ }
+ total_bytes += *l as usize;
+ }
+
+ if total_bytes + len_decoder.get_offset() > data.len() {
+ return Err(ParquetError::General(
+ "Insufficient delta length byte array bytes".to_string(),
+ ));
+ }
+
+ Ok(Self {
+ lengths,
+ data,
+ validate_utf8,
+ length_offset: 0,
+ data_offset: len_decoder.get_offset(),
+ })
+ }
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ let to_read = len.min(self.lengths.len() - self.length_offset);
+ output.views.reserve(to_read);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_read];
+
+ let block_id = output.append_block(self.data.clone().into());
+
+ let mut current_offset = self.data_offset;
+ let initial_offset = current_offset;
+ for length in src_lengths {
+ // # Safety
+ // The length is from the delta length decoder, so it is valid
+ // The start_offset is calculated from the lengths, so it is valid
+ // `start_offset` + *length is guaranteed to be within the bounds
of `data`, as checked in `new`
+ unsafe { output.append_view_unchecked(block_id, current_offset as
u32, *length as u32) }
+
+ current_offset += *length as usize;
+ }
+
+ // Delta length encoding has continuous strings, we can validate utf8
in one go
+ if self.validate_utf8 {
+ check_valid_utf8(&self.data[initial_offset..current_offset])?;
+ }
+
+ self.data_offset = current_offset;
+ self.length_offset += to_read;
+
+ Ok(to_read)
+ }
+
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
+ let remain_values = self.lengths.len() - self.length_offset;
+ let to_skip = remain_values.min(to_skip);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_skip];
+ let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+ self.data_offset += total_bytes;
+ self.length_offset += to_skip;
+ Ok(to_skip)
+ }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+ decoder: DeltaByteArrayDecoder,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ Ok(Self {
+ decoder: DeltaByteArrayDecoder::new(data)?,
+ validate_utf8,
+ })
+ }
+
+ // Unlike other encodings, we need to copy the data.
+ //
+ // DeltaByteArray data is stored using shared prefixes/suffixes,
+ // which results in potentially non-contiguous
+ // strings, while Arrow encodings require contiguous strings
+ //
+ //
<https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ output.views.reserve(len.min(self.decoder.remaining()));
+
+ // array buffer only have long strings
+ let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
+
+ // utf8 validation buffer have all strings, we batch the strings in
one buffer to accelerate validation
+ let mut utf8_validation_buffer = if self.validate_utf8 {
+ Some(Vec::with_capacity(4096))
+ } else {
+ None
+ };
+
+ let buffer_id = output.buffers.len() as u32;
+
+ let read = self.decoder.read(len, |bytes| {
+ let offset = array_buffer.len();
+ let view = make_view(bytes, buffer_id, offset as u32);
+ if bytes.len() > 12 {
+ // only copy the data to buffer if the string can not be
inlined.
+ array_buffer.extend_from_slice(bytes);
+ }
+ if let Some(v) = utf8_validation_buffer.as_mut() {
+ v.extend_from_slice(bytes);
+ }
+
+ // # Safety
+ // The buffer_id is the last buffer in the output buffers
+ // The offset is calculated from the buffer, so it is valid
+ // Utf-8 validation is done later
+ unsafe {
+ output.append_raw_view_unchecked(&view);
+ }
+ Ok(())
+ })?;
+
+ utf8_validation_buffer
+ .map(|v| check_valid_utf8(&v))
+ .transpose()?;
+
Review Comment:
Don't we also need to check `array_buffer` for valid utf8? Maybe I am
missing it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]