tustvold commented on code in PR #2221:
URL: https://github.com/apache/arrow-rs/pull/2221#discussion_r933065507
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -0,0 +1,537 @@
+use crate::arrow::arrow_writer::levels::LevelInfo;
+use crate::arrow::arrow_writer::ArrayWriter;
+use crate::basic::Encoding;
+use crate::column::page::PageWriter;
+use crate::column::writer::encoder::{
+    ColumnValueEncoder, DataPageValues, DictionaryPage,
+};
+use crate::column::writer::GenericColumnWriter;
+use crate::data_type::{AsBytes, ByteArray, Int32Type};
+use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
+use crate::encodings::rle::RleEncoder;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::{WriterProperties, WriterPropertiesPtr, WriterVersion};
+use crate::file::writer::OnCloseColumnChunk;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::num_required_bits;
+use crate::util::interner::{Interner, Storage};
+use arrow::array::{
+    Array, ArrayAccessor, ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray,
+    StringArray,
+};
+use arrow::datatypes::DataType;
+
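+/// Invokes `$op` with `$array` downcast to its concrete byte array type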
+macro_rules! downcast_op {
+    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
+        match $data_type {
+            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
+            DataType::LargeUtf8 => {
+                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
+            }
+            DataType::Binary => {
+                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
+            }
+            DataType::LargeBinary => {
+                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
+            }
+            d => unreachable!("cannot downcast {} to byte array", d)
+        }
+    };
+}
+
+/// Returns an [`ArrayWriter`] for byte or string arrays
+pub(super) fn make_byte_array_writer<'a>(
+    descr: ColumnDescPtr,
+    data_type: DataType,
+    props: WriterPropertiesPtr,
+    page_writer: Box<dyn PageWriter + 'a>,
+    on_close: OnCloseColumnChunk<'a>,
+) -> Box<dyn ArrayWriter + 'a> {
+    Box::new(ByteArrayWriter {
+        writer: Some(GenericColumnWriter::new(descr, props, page_writer)),
+        on_close: Some(on_close),
+        data_type,
+    })
+}
+
+/// An [`ArrayWriter`] for [`ByteArray`]
+struct ByteArrayWriter<'a> {
+    writer: Option<GenericColumnWriter<'a, ByteArrayEncoder>>,
+    on_close: Option<OnCloseColumnChunk<'a>>,
+    data_type: DataType,
+}
+
+impl<'a> ArrayWriter for ByteArrayWriter<'a> {
+    fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
+        self.writer.as_mut().unwrap().write_batch_internal(
+            array,
+            Some(levels.non_null_indices()),
+            levels.def_levels(),
+            levels.rep_levels(),
+            None,
+            None,
+            None,
+        )?;
+        Ok(())
+    }
+
+    fn close(&mut self) -> Result<()> {
+        let (bytes_written, rows_written, metadata, column_index, offset_index) =
+            self.writer.take().unwrap().close()?;
+
+        if let Some(on_close) = self.on_close.take() {
+            on_close(
+                bytes_written,
+                rows_written,
+                metadata,
+                column_index,
+                offset_index,
+            )?;
+        }
+        Ok(())
+    }
+}
+
+/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
+struct FallbackEncoder {
+    encoder: FallbackEncoderImpl,
+    num_values: usize,
+}
+
+enum FallbackEncoderImpl {
+    Plain {
+        buffer: Vec<u8>,
+    },
+    DeltaLength {
+        buffer: Vec<u8>,
+        lengths: DeltaBitPackEncoder<Int32Type>,
+    },
+    Delta {
+        buffer: Vec<u8>,
+        last_value: Vec<u8>,
+        prefix_lengths: DeltaBitPackEncoder<Int32Type>,
+        suffix_lengths: DeltaBitPackEncoder<Int32Type>,
+    },
+}
+
+impl FallbackEncoder {
+    /// Create the fallback encoder for the given [`ColumnDescPtr`] and [`WriterProperties`]
+    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
+        // Use the explicitly configured encoding if any, otherwise pick a
+        // default based on the writer version.
+        let encoding = props.encoding(descr.path()).unwrap_or_else(|| {
+            match props.writer_version() {
+                WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
+                WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
+            }
+        });
+
+        let encoder = match encoding {
+            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
+            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
+                buffer: vec![],
+                lengths: DeltaBitPackEncoder::new(),
+            },
+            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
+                buffer: vec![],
+                last_value: vec![],
+                prefix_lengths: DeltaBitPackEncoder::new(),
+                suffix_lengths: DeltaBitPackEncoder::new(),
+            },
+            _ => {
+                return Err(general_err!(
+                    "unsupported encoding {} for byte array",
+                    encoding
+                ))
+            }
+        };
+
+        Ok(Self {
+            encoder,
+            num_values: 0,
+        })
+    }
+
+    /// Encode `values` to the in-progress page
+    fn encode<T>(&mut self, values: T, indices: &[usize])
+    where
+        T: ArrayAccessor + Copy,
+        T::Item: AsRef<[u8]>,
+    {
+        self.num_values += indices.len();
+        match &mut self.encoder {
+            FallbackEncoderImpl::Plain { buffer } => {
+                for idx in indices {
+                    let value = values.value(*idx);
+                    let value = value.as_ref();
+                    buffer.extend_from_slice((value.len() as u32).as_bytes());
+                    buffer.extend_from_slice(value)
+                }
+            }
+            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
+                for idx in indices {
+                    let value = values.value(*idx);
+                    let value = value.as_ref();
+                    lengths.put(&[value.len() as i32]).unwrap();
+                    buffer.extend_from_slice(value);
+                }
+            }
+            FallbackEncoderImpl::Delta {
+                buffer,
+                last_value,
+                prefix_lengths,
+                suffix_lengths,
+            } => {
+                for idx in indices {
+                    let value = values.value(*idx);
+                    let value = value.as_ref();
+                    let mut prefix_length = 0;
+
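+                    // Count the leading bytes shared with the previous value;
+                    // DELTA_BYTE_ARRAY stores this prefix length and the remaining suffix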
+                    while prefix_length < last_value.len()
+                        && prefix_length < value.len()
+                        && last_value[prefix_length] == value[prefix_length]
+                    {
+                        prefix_length += 1;
+                    }
+
+                    let suffix_length = value.len() - prefix_length;
+
+                    last_value.clear();
+                    last_value.extend_from_slice(value);
+
+                    buffer.extend_from_slice(&value[prefix_length..]);
+                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
+                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
+                }
+            }
+        }
+    }
+
+    fn estimated_data_page_size(&self) -> usize {
+        match &self.encoder {
+            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
+            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
+                buffer.len() + lengths.estimated_data_encoded_size()
+            }
+            FallbackEncoderImpl::Delta {
+                buffer,
+                prefix_lengths,
+                suffix_lengths,
+                ..
+            } => {
+                buffer.len()
+                    + prefix_lengths.estimated_data_encoded_size()
+                    + suffix_lengths.estimated_data_encoded_size()
+            }
+        }
+    }
+
+    fn flush_data_page(
+        &mut self,
+        min_value: Option<ByteArray>,
+        max_value: Option<ByteArray>,
+    ) -> Result<DataPageValues<ByteArray>> {
+        let (buf, encoding) = match &mut self.encoder {
+            FallbackEncoderImpl::Plain { buffer } => {
+                (std::mem::take(buffer), Encoding::PLAIN)
+            }
+            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
+                let lengths = lengths.flush_buffer()?;
+
+                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
+                out.extend_from_slice(lengths.data());
+                out.extend_from_slice(buffer);
+                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
+            }
+            FallbackEncoderImpl::Delta {
+                buffer,
+                prefix_lengths,
+                suffix_lengths,
+                ..
+            } => {
+                let prefix_lengths = prefix_lengths.flush_buffer()?;
+                let suffix_lengths = suffix_lengths.flush_buffer()?;
+
+                let mut out = Vec::with_capacity(
+                    prefix_lengths.len() + suffix_lengths.len() + buffer.len(),
+                );
+                out.extend_from_slice(prefix_lengths.data());
+                out.extend_from_slice(suffix_lengths.data());
+                out.extend_from_slice(buffer);
+                (out, Encoding::DELTA_BYTE_ARRAY)
+            }
+        };
+
+        Ok(DataPageValues {
+            buf: buf.into(),
+            num_values: std::mem::take(&mut self.num_values),
+            encoding,
+            min_value,
+            max_value,
+        })
+    }
+}
+
+/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
+#[derive(Debug, Default)]
+struct ByteArrayStorage {
+    /// Encoded dictionary data
+    page: Vec<u8>,
+
+    values: Vec<std::ops::Range<usize>>,
+}
+
+impl Storage for ByteArrayStorage {
+    type Key = u64;
+    type Value = [u8];
+
+    fn get(&self, idx: Self::Key) -> &Self::Value {
+        &self.page[self.values[idx as usize].clone()]
+    }
+
+    fn push(&mut self, value: &Self::Value) -> Self::Key {
+        let key = self.values.len();
+
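+        // Prefix each value with its u32 length so `page` accumulates data in
+        // the PLAIN-encoded layout used for a byte array dictionary page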
+        self.page.reserve(4 + value.len());
+        self.page.extend_from_slice((value.len() as u32).as_bytes());
+
+        let start = self.page.len();
+        self.page.extend_from_slice(value);
+        self.values.push(start..self.page.len());
+
+        key as u64
+    }
+}
+
+/// A dictionary encoder for byte array data
+#[derive(Debug, Default)]
+struct DictEncoder {
+    interner: Interner<ByteArrayStorage>,
+    indices: Vec<u64>,
+}
+
+impl DictEncoder {
+    /// Encode `values` to the in-progress page
+    fn encode<T>(&mut self, values: T, indices: &[usize])
+    where
+        T: ArrayAccessor + Copy,
+        T::Item: AsRef<[u8]>,
+    {
+        self.indices.reserve(indices.len());
+
+        for idx in indices {
+            let value = values.value(*idx);
+            let interned = self.interner.intern(value.as_ref());
+            self.indices.push(interned);
+        }
+    }
+
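+    /// Bits required to represent the largest dictionary index (i.e. length - 1)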
+    fn bit_width(&self) -> u8 {
+        let length = self.interner.storage().values.len();
+        num_required_bits(length.saturating_sub(1) as u64)
+    }
+
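+    /// One byte for the bit width prefix, plus a conservative upper bound on
+    /// the RLE-encoded size of the accumulated indices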
+    fn estimated_data_page_size(&self) -> usize {
+        let bit_width = self.bit_width();
+        1 + RleEncoder::min_buffer_size(bit_width)
+            + RleEncoder::max_buffer_size(bit_width, self.indices.len())
+    }
+
+    fn estimated_dict_page_size(&self) -> usize {
+        self.interner.storage().page.len()
+    }
+
+    fn flush_dict_page(self) -> DictionaryPage {
+        let storage = self.interner.into_inner();
+
+        DictionaryPage {
+            buf: storage.page.into(),
+            num_values: storage.values.len(),
+            is_sorted: false,
+        }
+    }
+
+    fn flush_data_page(
+        &mut self,
+        min_value: Option<ByteArray>,
+        max_value: Option<ByteArray>,
+    ) -> Result<DataPageValues<ByteArray>> {
+        let num_values = self.indices.len();
+        let buffer_len = self.estimated_data_page_size();
+        let mut buffer = Vec::with_capacity(buffer_len);
+
+        // Write bit width in the first byte
+        buffer.push(self.bit_width());
+        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1);
+        for index in &self.indices {
+            if !encoder.put(*index)? {
+                return Err(general_err!("Encoder doesn't have enough space"));
+            }
+        }
+
+        self.indices.clear();
+
+        Ok(DataPageValues {
+            buf: encoder.consume()?.into(),
+            num_values,
+            encoding: Encoding::RLE_DICTIONARY,
+            min_value,
+            max_value,
+        })
+    }
+}
+
+struct ByteArrayEncoder {
+    fallback: FallbackEncoder,
+    dict_encoder: Option<DictEncoder>,
+    num_values: usize,
+    min_value: Option<ByteArray>,
+    max_value: Option<ByteArray>,
+}
+
+impl ColumnValueEncoder for ByteArrayEncoder {
+    type T = ByteArray;
+    type Values = ArrayRef;
+
+    fn min_max(
+        &self,
+        values: &ArrayRef,
+        value_indices: Option<&[usize]>,
+    ) -> Option<(Self::T, Self::T)> {
+        match value_indices {
+            Some(indices) => {
+                let iter = indices.iter().cloned();
+                downcast_op!(values.data_type(), values, compute_min_max, iter)
+            }
+            None => {
+                let len = Array::len(values);
+                downcast_op!(values.data_type(), values, compute_min_max, 0..len)
+            }
+        }
+    }
+
+    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
+    where
+        Self: Sized,
+    {
+        let dictionary = props
+            .dictionary_enabled(descr.path())
+            .then(|| DictEncoder::default());
+
+        let fallback = FallbackEncoder::new(descr, props)?;
+
+        Ok(Self {
+            fallback,
+            dict_encoder: dictionary,
+            num_values: 0,
+            min_value: None,
+            max_value: None,
+        })
+    }
+
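+    // `ByteArrayWriter::write` always supplies the non-null value indices, so
+    // the column writer only ever dispatches through `write_gather`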
+    fn write(
+        &mut self,
+        _values: &Self::Values,
+        _offset: usize,
+        _len: usize,
+    ) -> Result<()> {
+        unreachable!("should call write_gather instead")
+    }
+
+    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
+        downcast_op!(values.data_type(), values, encode, indices, self);
+        Ok(())
+    }
+
+    fn num_values(&self) -> usize {
+        self.num_values
+    }
+
+    fn has_dictionary(&self) -> bool {
+        self.dict_encoder.is_some()
+    }
+
+    fn estimated_dict_page_size(&self) -> Option<usize> {
+        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
+    }
+
+    fn estimated_data_page_size(&self) -> usize {
+        match &self.dict_encoder {
+            Some(encoder) => encoder.estimated_data_page_size(),
+            None => self.fallback.estimated_data_page_size(),
+        }
+    }
+
+    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
+        match self.dict_encoder.take() {
+            Some(encoder) => {
+                if self.num_values != 0 {
+                    return Err(general_err!(
+                        "Must flush data pages before flushing dictionary"
+                    ));
+                }
+
+                Ok(Some(encoder.flush_dict_page()))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
+        let min_value = self.min_value.take();
+        let max_value = self.max_value.take();
+
+        match &mut self.dict_encoder {
+            Some(encoder) => encoder.flush_data_page(min_value, max_value),
+            _ => self.fallback.flush_data_page(min_value, max_value),
+        }
+    }
+}
+
+/// Encodes the provided `values` and `indices` to `encoder`
+///
+/// This is a free function so it can be used with `downcast_op!`
+fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
+where
+    T: ArrayAccessor + Copy,
+    T::Item: Copy + Ord + AsRef<[u8]>,
+{
+    if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
+        if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
+            encoder.min_value = Some(min);
+        }
+
+        if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
+            encoder.max_value = Some(max);
+        }
+    }
+
+    match &mut encoder.dict_encoder {
+        Some(dict_encoder) => dict_encoder.encode(values, indices),
+        None => encoder.fallback.encode(values, indices),
+    }
+}
+
+/// Computes the min and max for the provided array and indices
+///
+/// This is a free function so it can be used with `downcast_op!`
+fn compute_min_max<T>(
+    array: T,
+    mut valid: impl Iterator<Item = usize>,
+) -> Option<(ByteArray, ByteArray)>
+where
+    T: ArrayAccessor,
Review Comment:
Using the new ArrayAccessor :smile:
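   
   For anyone following along: `encode` and `compute_min_max` above are written once against the trait bounds, with `downcast_op!` doing the type dispatch at the boundary. A minimal sketch of the same pattern, purely for illustration (`total_len` is a hypothetical helper, not part of this PR):
   
   ```rust
   use arrow::array::{ArrayAccessor, BinaryArray, StringArray};
   
   /// Hypothetical helper: total byte length of the values at `indices`,
   /// generic over any byte-array-like accessor.
   fn total_len<T>(values: T, indices: &[usize]) -> usize
   where
       T: ArrayAccessor + Copy,
       T::Item: AsRef<[u8]>,
   {
       // As in the PR, callers are expected to pass only in-bounds, non-null indices
       indices.iter().map(|idx| values.value(*idx).as_ref().len()).sum()
   }
   
   fn main() {
       let strings = StringArray::from(vec!["a", "bc", "def"]);
       let binary = BinaryArray::from(vec![b"ab".as_ref(), b"c".as_ref()]);
   
       // One function covers both array types, no downcasting inside the hot loop
       assert_eq!(total_len(&strings, &[0, 2]), 4); // "a" + "def"
       assert_eq!(total_len(&binary, &[0, 1]), 3); // b"ab" + b"c"
   }
   ```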
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]