This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8355823f74 Complete `StringViewArray` and `BinaryViewArray` parquet 
decoder:  implement delta byte array and delta length byte array encoding 
(#6004)
8355823f74 is described below

commit 8355823f74c3b2e1cd86118f8da8bc0305cd4b68
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Mon Jul 8 17:11:48 2024 -0400

    Complete `StringViewArray` and `BinaryViewArray` parquet decoder:  
implement delta byte array and delta length byte array encoding (#6004)
    
    * implement all encodings
    
    * address comments
    
    * fix bug
    
    * Update parquet/src/arrow/array_reader/byte_view_array.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * fix test
    
    * update comments
    
    * update test
    
    * Only copy strings one
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/arrow/array_reader/builder.rs         |   2 +-
 parquet/src/arrow/array_reader/byte_array.rs      |  63 +++----
 parquet/src/arrow/array_reader/byte_view_array.rs | 205 ++++++++++++++++++++--
 parquet/src/arrow/arrow_reader/mod.rs             |  18 +-
 parquet/src/arrow/buffer/view_buffer.rs           |   2 -
 5 files changed, 218 insertions(+), 72 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs 
b/parquet/src/arrow/array_reader/builder.rs
index 958594c932..945f62526a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 
 use arrow_schema::{DataType, Fields, SchemaBuilder};
 
-use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
+use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use 
crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::{
diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
index d0aa6f7b1e..80c448edc6 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -74,36 +74,6 @@ pub fn make_byte_array_reader(
     }
 }
 
-/// Returns an [`ArrayReader`] that decodes the provided byte array column to 
view types.
-pub fn make_byte_view_array_reader(
-    pages: Box<dyn PageIterator>,
-    column_desc: ColumnDescPtr,
-    arrow_type: Option<ArrowType>,
-) -> Result<Box<dyn ArrayReader>> {
-    // Check if Arrow type is specified, else create it from Parquet type
-    let data_type = match arrow_type {
-        Some(t) => t,
-        None => match 
parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
-            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
-            _ => ArrowType::BinaryView,
-        },
-    };
-
-    match data_type {
-        ArrowType::BinaryView | ArrowType::Utf8View => {
-            let reader = GenericRecordReader::new(column_desc);
-            Ok(Box::new(ByteArrayReader::<i32>::new(
-                pages, data_type, reader,
-            )))
-        }
-
-        _ => Err(general_err!(
-            "invalid data type for byte array reader read to view type - {}",
-            data_type
-        )),
-    }
-}
-
 /// An [`ArrayReader`] for variable length byte arrays
 struct ByteArrayReader<I: OffsetSizeTrait> {
     data_type: ArrowType,
@@ -472,6 +442,23 @@ impl ByteArrayDecoderDeltaLength {
         let mut lengths = vec![0; values];
         len_decoder.get(&mut lengths)?;
 
+        let mut total_bytes = 0;
+
+        for l in lengths.iter() {
+            if *l < 0 {
+                return Err(ParquetError::General(
+                    "negative delta length byte array length".to_string(),
+                ));
+            }
+            total_bytes += *l as usize;
+        }
+
+        if total_bytes + len_decoder.get_offset() > data.len() {
+            return Err(ParquetError::General(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
         Ok(Self {
             lengths,
             data,
@@ -496,23 +483,17 @@ impl ByteArrayDecoderDeltaLength {
         let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
         output.values.reserve(total_bytes);
 
-        if self.data_offset + total_bytes > self.data.len() {
-            return Err(ParquetError::EOF(
-                "Insufficient delta length byte array bytes".to_string(),
-            ));
-        }
-
-        let mut start_offset = self.data_offset;
+        let mut current_offset = self.data_offset;
         for length in src_lengths {
-            let end_offset = start_offset + *length as usize;
+            let end_offset = current_offset + *length as usize;
             output.try_push(
-                &self.data.as_ref()[start_offset..end_offset],
+                &self.data.as_ref()[current_offset..end_offset],
                 self.validate_utf8,
             )?;
-            start_offset = end_offset;
+            current_offset = end_offset;
         }
 
-        self.data_offset = start_offset;
+        self.data_offset = current_offset;
         self.length_offset += to_read;
 
         if self.validate_utf8 {
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs 
b/parquet/src/arrow/array_reader/byte_view_array.rs
index 9c5caaad59..dc4ce3f9c1 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -17,22 +17,23 @@
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::view_buffer::ViewBuffer;
-use crate::arrow::decoder::DictIndexDecoder;
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::data_type::Int32Type;
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
-use arrow_array::ArrayRef;
+use arrow_array::{builder::make_view, ArrayRef};
 use arrow_data::ByteView;
 use arrow_schema::DataType as ArrowType;
 use bytes::Bytes;
 use std::any::Any;
 
 /// Returns an [`ArrayReader`] that decodes the provided byte array column to 
view types.
-#[allow(unused)]
 pub fn make_byte_view_array_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
@@ -61,7 +62,6 @@ pub fn make_byte_view_array_reader(
 }
 
 /// An [`ArrayReader`] for variable length byte arrays
-#[allow(unused)]
 struct ByteViewArrayReader {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
@@ -213,6 +213,8 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder 
{
 pub enum ByteViewArrayDecoder {
     Plain(ByteViewArrayDecoderPlain),
     Dictionary(ByteViewArrayDecoderDictionary),
+    DeltaLength(ByteViewArrayDecoderDeltaLength),
+    DeltaByteArray(ByteViewArrayDecoderDelta),
 }
 
 impl ByteViewArrayDecoder {
@@ -235,9 +237,12 @@ impl ByteViewArrayDecoder {
                     data, num_levels, num_values,
                 ))
             }
-            Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
-                unimplemented!("stay tuned!")
-            }
+            Encoding::DELTA_LENGTH_BYTE_ARRAY => 
ByteViewArrayDecoder::DeltaLength(
+                ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
+            ),
+            Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
+                ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
+            ),
             _ => {
                 return Err(general_err!(
                     "unsupported encoding for byte array: {}",
@@ -263,6 +268,8 @@ impl ByteViewArrayDecoder {
                     .ok_or_else(|| general_err!("dictionary required for 
dictionary encoding"))?;
                 d.read(out, dict, len)
             }
+            ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
+            ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
         }
     }
 
@@ -275,6 +282,8 @@ impl ByteViewArrayDecoder {
                     .ok_or_else(|| general_err!("dictionary required for 
dictionary encoding"))?;
                 d.skip(dict, len)
             }
+            ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
+            ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
         }
     }
 }
@@ -487,6 +496,181 @@ impl ByteViewArrayDecoderDictionary {
     }
 }
 
+/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDeltaLength {
+    lengths: Vec<i32>,
+    data: Bytes,
+    length_offset: usize,
+    data_offset: usize,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDeltaLength {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.clone(), 0)?;
+        let values = len_decoder.values_left();
+
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        let mut total_bytes = 0;
+
+        for l in lengths.iter() {
+            if *l < 0 {
+                return Err(ParquetError::General(
+                    "negative delta length byte array length".to_string(),
+                ));
+            }
+            total_bytes += *l as usize;
+        }
+
+        if total_bytes + len_decoder.get_offset() > data.len() {
+            return Err(ParquetError::General(
+                "Insufficient delta length byte array bytes".to_string(),
+            ));
+        }
+
+        Ok(Self {
+            lengths,
+            data,
+            validate_utf8,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+        output.views.reserve(to_read);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset 
+ to_read];
+
+        let block_id = output.append_block(self.data.clone().into());
+
+        let mut current_offset = self.data_offset;
+        let initial_offset = current_offset;
+        for length in src_lengths {
+            // # Safety
+            // The length is from the delta length decoder, so it is valid
+            // The start_offset is calculated from the lengths, so it is valid
+            // `start_offset + length` is guaranteed to be within the bounds 
of `data`, as checked in `new`
+            unsafe { output.append_view_unchecked(block_id, current_offset as 
u32, *length as u32) }
+
+            current_offset += *length as usize;
+        }
+
+        // Delta length encoding has continuous strings, we can validate utf8 
in one go
+        if self.validate_utf8 {
+            check_valid_utf8(&self.data[initial_offset..current_offset])?;
+        }
+
+        self.data_offset = current_offset;
+        self.length_offset += to_read;
+
+        Ok(to_read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let remain_values = self.lengths.len() - self.length_offset;
+        let to_skip = remain_values.min(to_skip);
+
+        let src_lengths = &self.lengths[self.length_offset..self.length_offset 
+ to_skip];
+        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
+
+        self.data_offset += total_bytes;
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
+
+/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
+pub struct ByteViewArrayDecoderDelta {
+    decoder: DeltaByteArrayDecoder,
+    validate_utf8: bool,
+}
+
+impl ByteViewArrayDecoderDelta {
+    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
+        Ok(Self {
+            decoder: DeltaByteArrayDecoder::new(data)?,
+            validate_utf8,
+        })
+    }
+
+    // Unlike other encodings, we need to copy the data.
+    //
+    //  DeltaByteArray data is stored using shared prefixes/suffixes,
+    // which results in potentially non-contiguous
+    // strings, while Arrow encodings require contiguous strings
+    //
+    // 
<https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
+
+    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
+        output.views.reserve(len.min(self.decoder.remaining()));
+
+        // array buffer only have long strings
+        let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
+
+        let buffer_id = output.buffers.len() as u32;
+
+        let read = if !self.validate_utf8 {
+            self.decoder.read(len, |bytes| {
+                let offset = array_buffer.len();
+                let view = make_view(bytes, buffer_id, offset as u32);
+                if bytes.len() > 12 {
+                    // only copy the data to buffer if the string can not be 
inlined.
+                    array_buffer.extend_from_slice(bytes);
+                }
+
+                // # Safety
+                // The buffer_id is the last buffer in the output buffers
+                // The offset is calculated from the buffer, so it is valid
+                unsafe {
+                    output.append_raw_view_unchecked(&view);
+                }
+                Ok(())
+            })?
+        } else {
+            // utf8 validation buffer has only short strings. These short
+            // strings are inlined into the views but we copy them into a
+            // contiguous buffer to accelerate validation.®
+            let mut utf8_validation_buffer = Vec::with_capacity(4096);
+
+            let v = self.decoder.read(len, |bytes| {
+                let offset = array_buffer.len();
+                let view = make_view(bytes, buffer_id, offset as u32);
+                if bytes.len() > 12 {
+                    // only copy the data to buffer if the string can not be 
inlined.
+                    array_buffer.extend_from_slice(bytes);
+                } else {
+                    utf8_validation_buffer.extend_from_slice(bytes);
+                }
+
+                // # Safety
+                // The buffer_id is the last buffer in the output buffers
+                // The offset is calculated from the buffer, so it is valid
+                // Utf-8 validation is done later
+                unsafe {
+                    output.append_raw_view_unchecked(&view);
+                }
+                Ok(())
+            })?;
+            check_valid_utf8(&array_buffer)?;
+            check_valid_utf8(&utf8_validation_buffer)?;
+            v
+        };
+
+        let actual_block_id = output.append_block(array_buffer.into());
+        assert_eq!(actual_block_id, buffer_id);
+        Ok(read)
+    }
+
+    fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        self.decoder.skip(to_skip)
+    }
+}
+
 /// Check that `val` is a valid UTF-8 sequence
 pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
     match std::str::from_utf8(val) {
@@ -525,13 +709,6 @@ mod tests {
             .unwrap();
 
         for (encoding, page) in pages {
-            if encoding != Encoding::PLAIN
-                && encoding != Encoding::RLE_DICTIONARY
-                && encoding != Encoding::PLAIN_DICTIONARY
-            {
-                // skip unsupported encodings for now as they are not yet 
implemented
-                continue;
-            }
             let mut output = ViewBuffer::default();
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
 
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index b38092dbc9..cc369cec0e 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2456,26 +2456,16 @@ mod tests {
         let cases = [
             (
                 invalid_utf8_first_char::<i32>(),
-                "Parquet argument error: Parquet error: encountered non UTF-8 
data",
+                "Parquet argument error: Parquet error: encountered non UTF-8 
data: invalid utf-8 sequence of 1 bytes from index 11",
             ),
             (
                 invalid_utf8_later_char::<i32>(),
-                "Parquet argument error: Parquet error: encountered non UTF-8 
data: invalid utf-8 sequence of 1 bytes from index 6",
+                "Parquet argument error: Parquet error: encountered non UTF-8 
data: invalid utf-8 sequence of 1 bytes from index 14",
             ),
         ];
         for (array, expected_error) in cases {
-            // cast not yet implemented for BinaryView
-            // https://github.com/apache/arrow-rs/issues/5508
-            // so copy directly
-            let mut builder = BinaryViewBuilder::with_capacity(100);
-            for v in array.iter() {
-                if let Some(v) = v {
-                    builder.append_value(v);
-                } else {
-                    builder.append_null();
-                }
-            }
-            let array = builder.finish();
+            let array = arrow_cast::cast(&array, 
&ArrowDataType::BinaryView).unwrap();
+            let array = array.as_binary_view();
 
             // data is not valid utf8 we can not construct a correct 
StringArray
             // safely, so purposely create an invalid StringArray
diff --git a/parquet/src/arrow/buffer/view_buffer.rs 
b/parquet/src/arrow/buffer/view_buffer.rs
index 1651aa2d75..ae83ac3177 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -37,7 +37,6 @@ impl ViewBuffer {
         self.views.is_empty()
     }
 
-    #[allow(unused)]
     pub fn append_block(&mut self, block: Buffer) -> u32 {
         let block_id = self.buffers.len() as u32;
         self.buffers.push(block);
@@ -49,7 +48,6 @@ impl ViewBuffer {
     /// - `block` is a valid index, i.e., the return value of `append_block`
     /// - `offset` and `offset + len` are valid indices into the buffer
     /// - The `(offset, offset + len)` is valid value for the native type.
-    #[allow(unused)]
     pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, 
len: u32) {
         let b = self.buffers.get_unchecked(block as usize);
         let end = offset.saturating_add(len);

Reply via email to