This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8355823f74 Complete `StringViewArray` and `BinaryViewArray` parquet
decoder: implement delta byte array and delta length byte array encoding
(#6004)
8355823f74 is described below
commit 8355823f74c3b2e1cd86118f8da8bc0305cd4b68
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Mon Jul 8 17:11:48 2024 -0400
Complete `StringViewArray` and `BinaryViewArray` parquet decoder:
implement delta byte array and delta length byte array encoding (#6004)
* implement all encodings
* address comments
* fix bug
* Update parquet/src/arrow/array_reader/byte_view_array.rs
Co-authored-by: Andrew Lamb <[email protected]>
* fix test
* update comments
* update test
* Only copy strings once
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/array_reader/builder.rs | 2 +-
parquet/src/arrow/array_reader/byte_array.rs | 63 +++----
parquet/src/arrow/array_reader/byte_view_array.rs | 205 ++++++++++++++++++++--
parquet/src/arrow/arrow_reader/mod.rs | 18 +-
parquet/src/arrow/buffer/view_buffer.rs | 2 -
5 files changed, 218 insertions(+), 72 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs
b/parquet/src/arrow/array_reader/builder.rs
index 958594c932..945f62526a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use arrow_schema::{DataType, Fields, SchemaBuilder};
-use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
+use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use
crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index d0aa6f7b1e..80c448edc6 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -74,36 +74,6 @@ pub fn make_byte_array_reader(
}
}
-/// Returns an [`ArrayReader`] that decodes the provided byte array column to
view types.
-pub fn make_byte_view_array_reader(
- pages: Box<dyn PageIterator>,
- column_desc: ColumnDescPtr,
- arrow_type: Option<ArrowType>,
-) -> Result<Box<dyn ArrayReader>> {
- // Check if Arrow type is specified, else create it from Parquet type
- let data_type = match arrow_type {
- Some(t) => t,
- None => match
parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
- ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
- _ => ArrowType::BinaryView,
- },
- };
-
- match data_type {
- ArrowType::BinaryView | ArrowType::Utf8View => {
- let reader = GenericRecordReader::new(column_desc);
- Ok(Box::new(ByteArrayReader::<i32>::new(
- pages, data_type, reader,
- )))
- }
-
- _ => Err(general_err!(
- "invalid data type for byte array reader read to view type - {}",
- data_type
- )),
- }
-}
-
/// An [`ArrayReader`] for variable length byte arrays
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
@@ -472,6 +442,23 @@ impl ByteArrayDecoderDeltaLength {
let mut lengths = vec![0; values];
len_decoder.get(&mut lengths)?;
+ let mut total_bytes = 0;
+
+ for l in lengths.iter() {
+ if *l < 0 {
+ return Err(ParquetError::General(
+ "negative delta length byte array length".to_string(),
+ ));
+ }
+ total_bytes += *l as usize;
+ }
+
+ if total_bytes + len_decoder.get_offset() > data.len() {
+ return Err(ParquetError::General(
+ "Insufficient delta length byte array bytes".to_string(),
+ ));
+ }
+
Ok(Self {
lengths,
data,
@@ -496,23 +483,17 @@ impl ByteArrayDecoderDeltaLength {
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
output.values.reserve(total_bytes);
- if self.data_offset + total_bytes > self.data.len() {
- return Err(ParquetError::EOF(
- "Insufficient delta length byte array bytes".to_string(),
- ));
- }
-
- let mut start_offset = self.data_offset;
+ let mut current_offset = self.data_offset;
for length in src_lengths {
- let end_offset = start_offset + *length as usize;
+ let end_offset = current_offset + *length as usize;
output.try_push(
- &self.data.as_ref()[start_offset..end_offset],
+ &self.data.as_ref()[current_offset..end_offset],
self.validate_utf8,
)?;
- start_offset = end_offset;
+ current_offset = end_offset;
}
- self.data_offset = start_offset;
+ self.data_offset = current_offset;
self.length_offset += to_read;
if self.validate_utf8 {
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs
b/parquet/src/arrow/array_reader/byte_view_array.rs
index 9c5caaad59..dc4ce3f9c1 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -17,22 +17,23 @@
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
-use crate::arrow::decoder::DictIndexDecoder;
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::data_type::Int32Type;
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
-use arrow_array::ArrayRef;
+use arrow_array::{builder::make_view, ArrayRef};
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
/// Returns an [`ArrayReader`] that decodes the provided byte array column to
view types.
-#[allow(unused)]
pub fn make_byte_view_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
@@ -61,7 +62,6 @@ pub fn make_byte_view_array_reader(
}
/// An [`ArrayReader`] for variable length byte arrays
-#[allow(unused)]
struct ByteViewArrayReader {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
@@ -213,6 +213,8 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder
{
pub enum ByteViewArrayDecoder {
Plain(ByteViewArrayDecoderPlain),
Dictionary(ByteViewArrayDecoderDictionary),
+ DeltaLength(ByteViewArrayDecoderDeltaLength),
+ DeltaByteArray(ByteViewArrayDecoderDelta),
}
impl ByteViewArrayDecoder {
@@ -235,9 +237,12 @@ impl ByteViewArrayDecoder {
data, num_levels, num_values,
))
}
- Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
- unimplemented!("stay tuned!")
- }
+ Encoding::DELTA_LENGTH_BYTE_ARRAY =>
ByteViewArrayDecoder::DeltaLength(
+ ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
+ ),
+ Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
+ ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
+ ),
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
@@ -263,6 +268,8 @@ impl ByteViewArrayDecoder {
.ok_or_else(|| general_err!("dictionary required for
dictionary encoding"))?;
d.read(out, dict, len)
}
+ ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
+ ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}
@@ -275,6 +282,8 @@ impl ByteViewArrayDecoder {
.ok_or_else(|| general_err!("dictionary required for
dictionary encoding"))?;
d.skip(dict, len)
}
+ ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
+ ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
}
}
}
@@ -487,6 +496,181 @@ impl ByteViewArrayDecoderDictionary {
}
}
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+ lengths: Vec<i32>,
+ data: Bytes,
+ length_offset: usize,
+ data_offset: usize,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+ len_decoder.set_data(data.clone(), 0)?;
+ let values = len_decoder.values_left();
+
+ let mut lengths = vec![0; values];
+ len_decoder.get(&mut lengths)?;
+
+ let mut total_bytes = 0;
+
+ for l in lengths.iter() {
+ if *l < 0 {
+ return Err(ParquetError::General(
+ "negative delta length byte array length".to_string(),
+ ));
+ }
+ total_bytes += *l as usize;
+ }
+
+ if total_bytes + len_decoder.get_offset() > data.len() {
+ return Err(ParquetError::General(
+ "Insufficient delta length byte array bytes".to_string(),
+ ));
+ }
+
+ Ok(Self {
+ lengths,
+ data,
+ validate_utf8,
+ length_offset: 0,
+ data_offset: len_decoder.get_offset(),
+ })
+ }
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ let to_read = len.min(self.lengths.len() - self.length_offset);
+ output.views.reserve(to_read);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_read];
+
+ let block_id = output.append_block(self.data.clone().into());
+
+ let mut current_offset = self.data_offset;
+ let initial_offset = current_offset;
+ for length in src_lengths {
+ // # Safety
+ // The length is from the delta length decoder, so it is valid
+            // The current_offset is calculated from the lengths, so it is valid
+            // `current_offset + length` is guaranteed to be within the bounds
of `data`, as checked in `new`
+ unsafe { output.append_view_unchecked(block_id, current_offset as
u32, *length as u32) }
+
+ current_offset += *length as usize;
+ }
+
+ // Delta length encoding has continuous strings, we can validate utf8
in one go
+ if self.validate_utf8 {
+ check_valid_utf8(&self.data[initial_offset..current_offset])?;
+ }
+
+ self.data_offset = current_offset;
+ self.length_offset += to_read;
+
+ Ok(to_read)
+ }
+
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
+ let remain_values = self.lengths.len() - self.length_offset;
+ let to_skip = remain_values.min(to_skip);
+
+ let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_skip];
+ let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+ self.data_offset += total_bytes;
+ self.length_offset += to_skip;
+ Ok(to_skip)
+ }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+ decoder: DeltaByteArrayDecoder,
+ validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+ fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+ Ok(Self {
+ decoder: DeltaByteArrayDecoder::new(data)?,
+ validate_utf8,
+ })
+ }
+
+ // Unlike other encodings, we need to copy the data.
+ //
+ // DeltaByteArray data is stored using shared prefixes/suffixes,
+ // which results in potentially non-contiguous
+ // strings, while Arrow encodings require contiguous strings
+ //
+ //
<https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
+
+ fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+ output.views.reserve(len.min(self.decoder.remaining()));
+
+ // array buffer only have long strings
+ let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
+
+ let buffer_id = output.buffers.len() as u32;
+
+ let read = if !self.validate_utf8 {
+ self.decoder.read(len, |bytes| {
+ let offset = array_buffer.len();
+ let view = make_view(bytes, buffer_id, offset as u32);
+ if bytes.len() > 12 {
+ // only copy the data to buffer if the string can not be
inlined.
+ array_buffer.extend_from_slice(bytes);
+ }
+
+ // # Safety
+ // The buffer_id is the last buffer in the output buffers
+ // The offset is calculated from the buffer, so it is valid
+ unsafe {
+ output.append_raw_view_unchecked(&view);
+ }
+ Ok(())
+ })?
+ } else {
+ // utf8 validation buffer has only short strings. These short
+ // strings are inlined into the views but we copy them into a
+            // contiguous buffer to accelerate validation.
+ let mut utf8_validation_buffer = Vec::with_capacity(4096);
+
+ let v = self.decoder.read(len, |bytes| {
+ let offset = array_buffer.len();
+ let view = make_view(bytes, buffer_id, offset as u32);
+ if bytes.len() > 12 {
+ // only copy the data to buffer if the string can not be
inlined.
+ array_buffer.extend_from_slice(bytes);
+ } else {
+ utf8_validation_buffer.extend_from_slice(bytes);
+ }
+
+ // # Safety
+ // The buffer_id is the last buffer in the output buffers
+ // The offset is calculated from the buffer, so it is valid
+ // Utf-8 validation is done later
+ unsafe {
+ output.append_raw_view_unchecked(&view);
+ }
+ Ok(())
+ })?;
+ check_valid_utf8(&array_buffer)?;
+ check_valid_utf8(&utf8_validation_buffer)?;
+ v
+ };
+
+ let actual_block_id = output.append_block(array_buffer.into());
+ assert_eq!(actual_block_id, buffer_id);
+ Ok(read)
+ }
+
+ fn skip(&mut self, to_skip: usize) -> Result<usize> {
+ self.decoder.skip(to_skip)
+ }
+}
+
/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
@@ -525,13 +709,6 @@ mod tests {
.unwrap();
for (encoding, page) in pages {
- if encoding != Encoding::PLAIN
- && encoding != Encoding::RLE_DICTIONARY
- && encoding != Encoding::PLAIN_DICTIONARY
- {
- // skip unsupported encodings for now as they are not yet
implemented
- continue;
- }
let mut output = ViewBuffer::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index b38092dbc9..cc369cec0e 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2456,26 +2456,16 @@ mod tests {
let cases = [
(
invalid_utf8_first_char::<i32>(),
- "Parquet argument error: Parquet error: encountered non UTF-8
data",
+ "Parquet argument error: Parquet error: encountered non UTF-8
data: invalid utf-8 sequence of 1 bytes from index 11",
),
(
invalid_utf8_later_char::<i32>(),
- "Parquet argument error: Parquet error: encountered non UTF-8
data: invalid utf-8 sequence of 1 bytes from index 6",
+ "Parquet argument error: Parquet error: encountered non UTF-8
data: invalid utf-8 sequence of 1 bytes from index 14",
),
];
for (array, expected_error) in cases {
- // cast not yet implemented for BinaryView
- // https://github.com/apache/arrow-rs/issues/5508
- // so copy directly
- let mut builder = BinaryViewBuilder::with_capacity(100);
- for v in array.iter() {
- if let Some(v) = v {
- builder.append_value(v);
- } else {
- builder.append_null();
- }
- }
- let array = builder.finish();
+ let array = arrow_cast::cast(&array,
&ArrowDataType::BinaryView).unwrap();
+ let array = array.as_binary_view();
// data is not valid utf8 we can not construct a correct
StringArray
// safely, so purposely create an invalid StringArray
diff --git a/parquet/src/arrow/buffer/view_buffer.rs
b/parquet/src/arrow/buffer/view_buffer.rs
index 1651aa2d75..ae83ac3177 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -37,7 +37,6 @@ impl ViewBuffer {
self.views.is_empty()
}
- #[allow(unused)]
pub fn append_block(&mut self, block: Buffer) -> u32 {
let block_id = self.buffers.len() as u32;
self.buffers.push(block);
@@ -49,7 +48,6 @@ impl ViewBuffer {
/// - `block` is a valid index, i.e., the return value of `append_block`
/// - `offset` and `offset + len` are valid indices into the buffer
/// - The `(offset, offset + len)` is valid value for the native type.
- #[allow(unused)]
pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32,
len: u32) {
let b = self.buffers.get_unchecked(block as usize);
let end = offset.saturating_add(len);