tustvold commented on code in PR #2221:
URL: https://github.com/apache/arrow-rs/pull/2221#discussion_r933069070
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -0,0 +1,537 @@
+use crate::arrow::arrow_writer::levels::LevelInfo;
+use crate::arrow::arrow_writer::ArrayWriter;
+use crate::basic::Encoding;
+use crate::column::page::PageWriter;
+use crate::column::writer::encoder::{
+ ColumnValueEncoder, DataPageValues, DictionaryPage,
+};
+use crate::column::writer::GenericColumnWriter;
+use crate::data_type::{AsBytes, ByteArray, Int32Type};
+use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
+use crate::encodings::rle::RleEncoder;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::{WriterProperties, WriterPropertiesPtr,
WriterVersion};
+use crate::file::writer::OnCloseColumnChunk;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::num_required_bits;
+use crate::util::interner::{Interner, Storage};
+use arrow::array::{
+ Array, ArrayAccessor, ArrayRef, BinaryArray, LargeBinaryArray,
LargeStringArray,
+ StringArray,
+};
+use arrow::datatypes::DataType;
+
/// Dispatches `$op` to the concrete byte-array flavour of `$array`.
///
/// Downcasts `$array` according to `$data_type` and invokes `$op` with the
/// typed array followed by any trailing arguments. Panics (via
/// `unreachable!`) for any other [`DataType`], as callers only construct
/// this writer for the four types below.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => {
                $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            d => unreachable!("cannot downcast {} to byte array", d)
        }
    };
}
+
+/// Returns an [`ArrayWriter`] for byte or string arrays
+pub(super) fn make_byte_array_writer<'a>(
+ descr: ColumnDescPtr,
+ data_type: DataType,
+ props: WriterPropertiesPtr,
+ page_writer: Box<dyn PageWriter + 'a>,
+ on_close: OnCloseColumnChunk<'a>,
+) -> Box<dyn ArrayWriter + 'a> {
+ Box::new(ByteArrayWriter {
+ writer: Some(GenericColumnWriter::new(descr, props, page_writer)),
+ on_close: Some(on_close),
+ data_type,
+ })
+}
+
+/// An [`ArrayWriter`] for [`ByteArray`]
+struct ByteArrayWriter<'a> {
+ writer: Option<GenericColumnWriter<'a, ByteArrayEncoder>>,
+ on_close: Option<OnCloseColumnChunk<'a>>,
+ data_type: DataType,
+}
+
+impl<'a> ArrayWriter for ByteArrayWriter<'a> {
+ fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
+ self.writer.as_mut().unwrap().write_batch_internal(
+ array,
+ Some(levels.non_null_indices()),
+ levels.def_levels(),
+ levels.rep_levels(),
+ None,
+ None,
+ None,
+ )?;
+ Ok(())
+ }
+
+ fn close(&mut self) -> Result<()> {
+ let (bytes_written, rows_written, metadata, column_index,
offset_index) =
+ self.writer.take().unwrap().close()?;
+
+ if let Some(on_close) = self.on_close.take() {
+ on_close(
+ bytes_written,
+ rows_written,
+ metadata,
+ column_index,
+ offset_index,
+ )?;
+ }
+ Ok(())
+ }
+}
+
+/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
+struct FallbackEncoder {
+ encoder: FallbackEncoderImpl,
+ num_values: usize,
+}
+
+enum FallbackEncoderImpl {
+ Plain {
+ buffer: Vec<u8>,
+ },
+ DeltaLength {
+ buffer: Vec<u8>,
+ lengths: DeltaBitPackEncoder<Int32Type>,
+ },
+ Delta {
+ buffer: Vec<u8>,
+ last_value: Vec<u8>,
+ prefix_lengths: DeltaBitPackEncoder<Int32Type>,
+ suffix_lengths: DeltaBitPackEncoder<Int32Type>,
+ },
+}
+
+impl FallbackEncoder {
+ /// Create the fallback encoder for the given [`ColumnDescPtr`] and
[`WriterProperties`]
+ fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
+ // Set either main encoder or fallback encoder.
+ let encoding = props.encoding(descr.path()).unwrap_or_else(|| {
+ match props.writer_version() {
+ WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
+ WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
+ }
+ });
+
+ let encoder = match encoding {
+ Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
+ Encoding::DELTA_LENGTH_BYTE_ARRAY =>
FallbackEncoderImpl::DeltaLength {
+ buffer: vec![],
+ lengths: DeltaBitPackEncoder::new(),
+ },
+ Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
+ buffer: vec![],
+ last_value: vec![],
+ prefix_lengths: DeltaBitPackEncoder::new(),
+ suffix_lengths: DeltaBitPackEncoder::new(),
+ },
+ _ => {
+ return Err(general_err!(
+ "unsupported encoding {} for byte array",
+ encoding
+ ))
+ }
+ };
+
+ Ok(Self {
+ encoder,
+ num_values: 0,
+ })
+ }
+
+ /// Encode `values` to the in-progress page
+ fn encode<T>(&mut self, values: T, indices: &[usize])
+ where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+ {
+ self.num_values += indices.len();
+ match &mut self.encoder {
+ FallbackEncoderImpl::Plain { buffer } => {
+ for idx in indices {
+ let value = values.value(*idx);
+ let value = value.as_ref();
+ buffer.extend_from_slice((value.len() as u32).as_bytes());
+ buffer.extend_from_slice(value)
+ }
+ }
+ FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
+ for idx in indices {
+ let value = values.value(*idx);
+ let value = value.as_ref();
+ lengths.put(&[value.len() as i32]).unwrap();
+ buffer.extend_from_slice(value);
+ }
+ }
+ FallbackEncoderImpl::Delta {
+ buffer,
+ last_value,
+ prefix_lengths,
+ suffix_lengths,
+ } => {
+ for idx in indices {
+ let value = values.value(*idx);
+ let value = value.as_ref();
+ let mut prefix_length = 0;
+
+ while prefix_length < last_value.len()
+ && prefix_length < value.len()
+ && last_value[prefix_length] == value[prefix_length]
+ {
+ prefix_length += 1;
+ }
+
+ let suffix_length = value.len() - prefix_length;
+
+ last_value.clear();
+ last_value.extend_from_slice(value);
+
+ buffer.extend_from_slice(&value[prefix_length..]);
+ prefix_lengths.put(&[prefix_length as i32]).unwrap();
+ suffix_lengths.put(&[suffix_length as i32]).unwrap();
+ }
+ }
+ }
+ }
+
+ fn estimated_data_page_size(&self) -> usize {
+ match &self.encoder {
+ FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
+ FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
+ buffer.len() + lengths.estimated_data_encoded_size()
+ }
+ FallbackEncoderImpl::Delta {
+ buffer,
+ prefix_lengths,
+ suffix_lengths,
+ ..
+ } => {
+ buffer.len()
+ + prefix_lengths.estimated_data_encoded_size()
+ + suffix_lengths.estimated_data_encoded_size()
+ }
+ }
+ }
+
+ fn flush_data_page(
+ &mut self,
+ min_value: Option<ByteArray>,
+ max_value: Option<ByteArray>,
+ ) -> Result<DataPageValues<ByteArray>> {
+ let (buf, encoding) = match &mut self.encoder {
+ FallbackEncoderImpl::Plain { buffer } => {
+ (std::mem::take(buffer), Encoding::PLAIN)
+ }
+ FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
+ let lengths = lengths.flush_buffer()?;
+
+ let mut out = Vec::with_capacity(lengths.len() + buffer.len());
+ out.extend_from_slice(lengths.data());
+ out.extend_from_slice(buffer);
+ (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
+ }
+ FallbackEncoderImpl::Delta {
+ buffer,
+ prefix_lengths,
+ suffix_lengths,
+ ..
+ } => {
+ let prefix_lengths = prefix_lengths.flush_buffer()?;
+ let suffix_lengths = suffix_lengths.flush_buffer()?;
+
+ let mut out = Vec::with_capacity(
+ prefix_lengths.len() + suffix_lengths.len() + buffer.len(),
+ );
+ out.extend_from_slice(prefix_lengths.data());
+ out.extend_from_slice(suffix_lengths.data());
+ out.extend_from_slice(buffer);
+ (out, Encoding::DELTA_BYTE_ARRAY)
+ }
+ };
+
+ Ok(DataPageValues {
+ buf: buf.into(),
+ num_values: std::mem::take(&mut self.num_values),
+ encoding,
+ min_value,
+ max_value,
+ })
+ }
+}
+
/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary data: each value stored as a u32 length prefix
    /// followed by its bytes
    page: Vec<u8>,
    /// Byte range within `page` of each interned value, excluding its
    /// length prefix, indexed by key
    values: Vec<std::ops::Range<usize>>,
}
+
+impl Storage for ByteArrayStorage {
+ type Key = u64;
+ type Value = [u8];
+
+ fn get(&self, idx: Self::Key) -> &Self::Value {
+ &self.page[self.values[idx as usize].clone()]
+ }
+
+ fn push(&mut self, value: &Self::Value) -> Self::Key {
+ let key = self.values.len();
+
+ self.page.reserve(4 + value.len());
+ self.page.extend_from_slice((value.len() as u32).as_bytes());
+
+ let start = self.page.len();
+ self.page.extend_from_slice(value);
+ self.values.push(start..self.page.len());
+
+ key as u64
+ }
+}
+
+/// A dictionary encoder for byte array data
+#[derive(Debug, Default)]
+struct DictEncoder {
+ interner: Interner<ByteArrayStorage>,
+ indices: Vec<u64>,
+}
+
+impl DictEncoder {
+ /// Encode `values` to the in-progress page
+ fn encode<T>(&mut self, values: T, indices: &[usize])
+ where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+ {
+ self.indices.reserve(indices.len());
+
+ for idx in indices {
+ let value = values.value(*idx);
+ let interned = self.interner.intern(value.as_ref());
+ self.indices.push(interned);
+ }
+ }
+
+ fn bit_width(&self) -> u8 {
+ let length = self.interner.storage().values.len();
+ num_required_bits(length.saturating_sub(1) as u64)
+ }
+
+ fn estimated_data_page_size(&self) -> usize {
+ let bit_width = self.bit_width();
+ 1 + RleEncoder::min_buffer_size(bit_width)
+ + RleEncoder::max_buffer_size(bit_width, self.indices.len())
+ }
+
+ fn estimated_dict_page_size(&self) -> usize {
+ self.interner.storage().page.len()
+ }
+
+ fn flush_dict_page(self) -> DictionaryPage {
+ let storage = self.interner.into_inner();
+
+ DictionaryPage {
+ buf: storage.page.into(),
+ num_values: storage.values.len(),
+ is_sorted: false,
+ }
+ }
+
+ fn flush_data_page(
+ &mut self,
+ min_value: Option<ByteArray>,
+ max_value: Option<ByteArray>,
+ ) -> Result<DataPageValues<ByteArray>> {
+ let num_values = self.indices.len();
+ let buffer_len = self.estimated_data_page_size();
+ let mut buffer = vec![0; buffer_len];
+ buffer[0] = self.bit_width() as u8;
+
+ // Write bit width in the first byte
+ buffer.extend_from_slice((self.bit_width() as u8).as_bytes());
+ let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer,
1);
+ for index in &self.indices {
+ if !encoder.put(*index as u64)? {
+ return Err(general_err!("Encoder doesn't have enough space"));
+ }
+ }
+
+ self.indices.clear();
+
+ Ok(DataPageValues {
+ buf: encoder.consume()?.into(),
+ num_values,
+ encoding: Encoding::RLE_DICTIONARY,
+ min_value,
+ max_value,
+ })
+ }
+}
+
+struct ByteArrayEncoder {
+ fallback: FallbackEncoder,
+ dict_encoder: Option<DictEncoder>,
+ num_values: usize,
+ min_value: Option<ByteArray>,
+ max_value: Option<ByteArray>,
+}
+
+impl ColumnValueEncoder for ByteArrayEncoder {
+ type T = ByteArray;
+ type Values = ArrayRef;
Review Comment:
Initially I had the concrete type here, i.e. `StringArray`. This works;
however, it would present difficulties in adapting this to preserve
dictionaries, as `TypedDictionary` (#2136) will contain a lifetime, which
would then require GATs here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]