alamb commented on code in PR #5557: URL: https://github.com/apache/arrow-rs/pull/5557#discussion_r1558088034
########## parquet/src/arrow/buffer/view_buffer.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec<u128>, + pub buffers: Vec<u8>, + pub nulls: Option<NullBuffer>, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { Review Comment: Why don't we need to check the entire `data` buffer? As in why don't we have to check the second character, etc? Perhaps could use `str::from_utf8` https://doc.rust-lang.org/std/str/fn.from_utf8.html# ########## parquet/src/arrow/array_reader/byte_view_array.rs: ########## @@ -0,0 +1,645 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::buffer::view_buffer::ViewBuffer; +use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{ConvertedType, Encoding}; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; +use crate::data_type::Int32Type; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescPtr; +use arrow_array::ArrayRef; +use arrow_schema::DataType as ArrowType; +use bytes::Bytes; +use std::any::Any; + +/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +pub fn make_byte_view_array_reader( + pages: Box<dyn PageIterator>, + column_desc: ColumnDescPtr, + arrow_type: Option<ArrowType>, +) -> Result<Box<dyn ArrayReader>> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::Utf8View | ArrowType::BinaryView => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) + } + _ => Err(general_err!( + "invalid data type for byte array reader - {}", + data_type + )), + } +} + +/// An [`ArrayReader`] for variable length byte arrays +struct ByteViewArrayReader { + data_type: ArrowType, + pages: Box<dyn PageIterator>, + def_levels_buffer: Option<Vec<i16>>, + rep_levels_buffer: Option<Vec<i16>>, + record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>, +} + +impl ByteViewArrayReader { + fn new( + pages: Box<dyn PageIterator>, + data_type: ArrowType, + record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>, + ) -> Self { + Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + record_reader, + } + } +} + +impl ArrayReader for ByteViewArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result<usize> { + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self) -> Result<ArrayRef> { + let buffer = self.record_reader.consume_record_data(); + let null_buffer = self.record_reader.consume_bitmap_buffer(); + self.def_levels_buffer = self.record_reader.consume_def_levels(); + self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.record_reader.reset(); + + let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); + + Ok(array) + } + + fn skip_records(&mut self, num_records: usize) -> Result<usize> { + skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_deref() + } +} + +/// A [`ColumnValueDecoder`] for variable length byte arrays +struct ByteViewArrayColumnValueDecoder { + dict: Option<ViewBuffer>, + decoder: Option<ByteViewArrayDecoder>, + validate_utf8: bool, +} + +impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { + type Buffer = ViewBuffer; + + fn new(desc: &ColumnDescPtr) -> Self { + let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; + Self { + dict: None, + decoder: None, + validate_utf8, + } + } + + fn set_dict( + &mut self, + buf: Bytes, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = ViewBuffer::default(); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option<usize>, + ) -> Result<()> { + self.decoder = Some(ByteViewArrayDecoder::new( + encoding, + data, + num_levels, + num_values, + self.validate_utf8, + )?); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.read(out, num_values, self.dict.as_ref()) + } + + fn skip_values(&mut self, num_values: usize) -> Result<usize> { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.skip(num_values, self.dict.as_ref()) + } +} + +/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] +pub enum ByteViewArrayDecoder { + Plain(ByteViewArrayDecoderPlain), + Dictionary(ByteViewArrayDecoderDictionary), + DeltaLength(ByteViewArrayDecoderDeltaLength), + DeltaByteArray(ByteViewArrayDecoderDelta), +} + +impl ByteViewArrayDecoder { + pub fn new( + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option<usize>, + validate_utf8: bool, + ) -> Result<Self> { + let decoder = match encoding { + Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( + data, + num_levels, + num_values, + validate_utf8, + )), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( + ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, + ), + Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( + ByteViewArrayDecoderDelta::new(data, validate_utf8)?, + ), + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + + Ok(decoder) + } + + /// Read up to `len` values to `out` with the optional dictionary + pub fn read( + &mut self, + out: &mut ViewBuffer, + len: usize, + dict: Option<&ViewBuffer>, + ) -> Result<usize> { + match self { + ByteViewArrayDecoder::Plain(d) => d.read(out, len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.read(out, dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), + } + } + + /// Skip `len` values + pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> { + match self { + ByteViewArrayDecoder::Plain(d) => d.skip(len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.skip(dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), + } + } +} + +/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderPlain { + buf: Bytes, + offset: usize, + validate_utf8: bool, + + /// This is a maximum as the null count is not always known, e.g. value data from + /// a v1 data page + max_remaining_values: usize, +} + +impl ByteViewArrayDecoderPlain { + pub fn new( + buf: Bytes, + num_levels: usize, + num_values: Option<usize>, + validate_utf8: bool, + ) -> Self { + Self { + buf, + validate_utf8, + offset: 0, + max_remaining_values: num_values.unwrap_or(num_levels), + } + } + + pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> { + let to_read = len.min(self.max_remaining_values); + + let remaining_bytes = self.buf.len() - self.offset; + if remaining_bytes == 0 { + return Ok(0); + } + + let mut read = 0; + + let buf = self.buf.as_ref(); + while self.offset < self.buf.len() && read != to_read { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes); + + let start_offset = self.offset + 4; + let end_offset = start_offset + len as usize; + if end_offset > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + + self.offset = end_offset; + read += 1; + } + self.max_remaining_values -= to_read; + + Ok(to_read) + } + + pub fn skip(&mut self, to_skip: usize) -> Result<usize> { + let to_skip = to_skip.min(self.max_remaining_values); + let mut skip = 0; + let buf = self.buf.as_ref(); + + while self.offset < self.buf.len() && skip != to_skip { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + skip += 1; + self.offset = self.offset + 4 + len; + } + self.max_remaining_values -= skip; + Ok(skip) + } +} + +/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDeltaLength { + lengths: Vec<i32>, + data: Bytes, + length_offset: usize, + data_offset: usize, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDeltaLength { + fn new(data: Bytes, validate_utf8: bool) -> Result<Self> { + let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new(); + len_decoder.set_data(data.clone(), 0)?; + let values = len_decoder.values_left(); + + let mut lengths = vec![0; values]; + len_decoder.get(&mut lengths)?; + + Ok(Self { + lengths, + data, + validate_utf8, + length_offset: 0, + data_offset: len_decoder.get_offset(), + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> { + let to_read = len.min(self.lengths.len() - self.length_offset); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; + + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + if self.data_offset + total_bytes > self.data.len() { + return Err(ParquetError::EOF( + "Insufficient delta length byte array bytes".to_string(), + )); + } + + let mut start_offset = self.data_offset; + for length in src_lengths { + let end_offset = start_offset + *length as usize; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; + start_offset = end_offset; + } + + self.data_offset = start_offset; + self.length_offset += to_read; + + Ok(to_read) + } + + fn skip(&mut self, to_skip: usize) -> Result<usize> { + let remain_values = self.lengths.len() - self.length_offset; + let to_skip = remain_values.min(to_skip); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + self.data_offset += total_bytes; + self.length_offset += to_skip; + Ok(to_skip) + } +} + +/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDelta { + decoder: DeltaByteArrayDecoder, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDelta { + fn new(data: Bytes, validate_utf8: bool) -> Result<Self> { + Ok(Self { + decoder: DeltaByteArrayDecoder::new(data)?, + validate_utf8, + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> { + let read = self + .decoder + .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; + + Ok(read) + } + + fn skip(&mut self, to_skip: usize) -> Result<usize> { + self.decoder.skip(to_skip) + } +} + +/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDictionary { + decoder: DictIndexDecoder, +} + +impl ByteViewArrayDecoderDictionary { + fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self { + Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values), + } + } + + fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.read(len, |keys| { + output.extend_from_dictionary(keys, &dict.views, &dict.buffers) + }) + } + + fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.skip(to_skip) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; + use crate::arrow::record_reader::buffer::ValuesBuffer; + use arrow_array::{Array, StringViewArray}; + use arrow_buffer::Buffer; + + #[test] + fn test_byte_array_decoder() { + let (pages, encoded_dictionary) = byte_array_all_encodings(vec![ + "hello", + "world", + "here comes the snow", + "large payload over 12 bytes", + ]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + let first = output.views.first().unwrap(); + let len = *first as u32; + + assert_eq!( + &first.to_le_bytes()[4..4 + len as usize], + "hello".as_bytes() + ); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, true, true, false, false]; Review Comment: Should the null buffer be created as part of the ViewsBuffer (not passed into the encoder)? ########## parquet/src/arrow/buffer/view_buffer.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec<u128>, + pub buffers: Vec<u8>, + pub nulls: Option<NullBuffer>, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + let length: u32 = data.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let offset = self.buffers.len() as u32; + self.buffers.extend_from_slice(data); + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset, + }; + self.views.push(view.into()); + Ok(()) + } + + /// Extends this buffer with a list of keys + pub fn extend_from_dictionary<K: ArrowNativeType>( + &mut self, + keys: &[K], + dict_views: &[u128], + dict_buffers: &[u8], + ) -> Result<()> { + for key in keys { + let index = key.as_usize(); + if index >= dict_views.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_views.len() + )); + } + + let view = dict_views[index]; + let len = view as u32; + + // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. + if len <= 12 { + self.try_push(&view.to_le_bytes()[4..4 + len as usize], false)?; + } else { + let offset = (view >> 96) as usize; + self.try_push(&dict_buffers[offset..offset + len as usize], false)? + }; + } + Ok(()) + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef { Review Comment: Why does this also need to get a `null_buffer` passed in when there is also a field `self.null_buffer`? ########## parquet/src/arrow/buffer/view_buffer.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec<u128>, + pub buffers: Vec<u8>, + pub nulls: Option<NullBuffer>, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + let length: u32 = data.len().try_into().unwrap(); + if length <= 12 { Review Comment: It would be really nice if this logic of converting `bytes` + `len` got refactored into a function Maybe we could eventually (as a follow on PR), define something like ```rust struct View(u128); ``` ########## parquet/src/arrow/array_reader/byte_view_array.rs: ########## @@ -0,0 +1,645 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::buffer::view_buffer::ViewBuffer; +use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{ConvertedType, Encoding}; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; +use crate::data_type::Int32Type; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescPtr; +use arrow_array::ArrayRef; +use arrow_schema::DataType as ArrowType; +use bytes::Bytes; +use std::any::Any; + +/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +pub fn make_byte_view_array_reader( + pages: Box<dyn PageIterator>, + column_desc: ColumnDescPtr, + arrow_type: Option<ArrowType>, +) -> Result<Box<dyn ArrayReader>> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::Utf8View | ArrowType::BinaryView => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) + } + _ => Err(general_err!( + "invalid data type for byte array reader - {}", + data_type + )), + } +} + +/// An [`ArrayReader`] for variable length byte arrays +struct ByteViewArrayReader { + data_type: ArrowType, + pages: Box<dyn PageIterator>, + def_levels_buffer: Option<Vec<i16>>, + rep_levels_buffer: Option<Vec<i16>>, + record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>, +} + +impl ByteViewArrayReader { + fn new( + pages: Box<dyn PageIterator>, + data_type: ArrowType, + record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>, + ) -> Self { + Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + record_reader, + } + } +} + +impl ArrayReader for ByteViewArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result<usize> { + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self) -> Result<ArrayRef> { + let buffer = self.record_reader.consume_record_data(); + let null_buffer = self.record_reader.consume_bitmap_buffer(); + self.def_levels_buffer = self.record_reader.consume_def_levels(); + self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.record_reader.reset(); + + let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); + + Ok(array) + } + + fn skip_records(&mut self, num_records: usize) -> Result<usize> { + skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_deref() + } +} + +/// A [`ColumnValueDecoder`] for variable length byte arrays +struct ByteViewArrayColumnValueDecoder { + dict: Option<ViewBuffer>, + decoder: Option<ByteViewArrayDecoder>, + validate_utf8: bool, +} + +impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { + type Buffer = ViewBuffer; + + fn new(desc: &ColumnDescPtr) -> Self { + let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; + Self { + dict: None, + decoder: None, + validate_utf8, + } + } + + fn set_dict( + &mut self, + buf: Bytes, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = ViewBuffer::default(); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option<usize>, + ) -> Result<()> { + self.decoder = Some(ByteViewArrayDecoder::new( + encoding, + data, + num_levels, + num_values, + self.validate_utf8, + )?); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.read(out, num_values, self.dict.as_ref()) + } + + fn skip_values(&mut self, num_values: usize) -> Result<usize> { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.skip(num_values, self.dict.as_ref()) + } +} + +/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] +pub enum ByteViewArrayDecoder { + Plain(ByteViewArrayDecoderPlain), + Dictionary(ByteViewArrayDecoderDictionary), + DeltaLength(ByteViewArrayDecoderDeltaLength), + DeltaByteArray(ByteViewArrayDecoderDelta), +} + +impl ByteViewArrayDecoder { + pub fn new( + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option<usize>, + validate_utf8: bool, + ) -> Result<Self> { + let decoder = match encoding { + Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( + data, + num_levels, + num_values, + validate_utf8, + )), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( + ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, + ), + Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( + ByteViewArrayDecoderDelta::new(data, validate_utf8)?, + ), + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + + Ok(decoder) + } + + /// Read up to `len` values to `out` with the optional dictionary + pub fn read( + &mut self, + out: &mut ViewBuffer, + len: usize, + dict: Option<&ViewBuffer>, + ) -> Result<usize> { + match self { + ByteViewArrayDecoder::Plain(d) => d.read(out, len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.read(out, dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), + } + } + + /// Skip `len` values + pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> { + match self { + ByteViewArrayDecoder::Plain(d) => d.skip(len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.skip(dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), + } + } +} + +/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderPlain { + buf: Bytes, Review Comment: Can we please document what these fields are (specifically what is `offset`)? I think it means the offset in `buf` for the current array ########## parquet/src/arrow/buffer/view_buffer.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec<u128>, + pub buffers: Vec<u8>, + pub nulls: Option<NullBuffer>, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + let length: u32 = data.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let offset = self.buffers.len() as u32; + self.buffers.extend_from_slice(data); + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset, + }; + self.views.push(view.into()); + Ok(()) + } + + /// Extends this buffer with a list of keys + pub fn extend_from_dictionary<K: ArrowNativeType>( + &mut self, + keys: &[K], + dict_views: &[u128], + dict_buffers: &[u8], + ) -> Result<()> { + for key in keys { + let index = key.as_usize(); + if index >= dict_views.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_views.len() + )); + } + + let view = dict_views[index]; + let len = view as u32; + + // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. + if len <= 12 { Review Comment: If the dictionary is already a view, can we simply copy the bytes (if the len is < 12)? ########## parquet/src/arrow/buffer/view_buffer.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec<u128>, + pub buffers: Vec<u8>, + pub nulls: Option<NullBuffer>, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + let length: u32 = data.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let offset = self.buffers.len() as u32; + self.buffers.extend_from_slice(data); + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset, + }; + self.views.push(view.into()); + Ok(()) + } + + /// Extends this buffer with a list of keys + pub fn extend_from_dictionary<K: ArrowNativeType>( + &mut self, + keys: &[K], + dict_views: &[u128], + dict_buffers: &[u8], + ) -> Result<()> { + for key in keys { + let index = key.as_usize(); + if index >= dict_views.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_views.len() + )); + } + + let view = dict_views[index]; + let len = view as u32; + + // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. + if len <= 12 { + self.try_push(&view.to_le_bytes()[4..4 + len as usize], false)?; + } else { + let offset = (view >> 96) as usize; + self.try_push(&dict_buffers[offset..offset + len as usize], false)? + }; + } + Ok(()) + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef { Review Comment: I think this is an unsafe API and should be marked as such -- for example it would allow creating a StringViewArray from non utf8 data if `try_push` is called without `validate_utf8 =true` Alternately, perhaps `validate_utf8` could be a member of this struct and then this call can simply validate that if we are building a `StringViewArray` that `self.validate_utf8` was set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
