yordan-pavlov commented on a change in pull request #8630: URL: https://github.com/apache/arrow/pull/8630#discussion_r520902160
########## File path: rust/arrow/src/array/transform/mod.rs ########## @@ -0,0 +1,496 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{io::Write, mem::size_of, sync::Arc}; + +use crate::{buffer::MutableBuffer, datatypes::DataType, util::bit_util}; + +use super::{ArrayData, ArrayDataRef}; + +mod boolean; +mod list; +mod primitive; +mod utils; +mod variable_size; + +type ExtendNullBits<'a> = Box<Fn(&mut _MutableArrayData, usize, usize) -> () + 'a>; +// function that extends `[start..start+len]` to the mutable array. +// this is dynamic because different data_types influence how buffers and childs are extended. 
+type Extend<'a> = Box<Fn(&mut _MutableArrayData, usize, usize) -> () + 'a>; + +#[derive(Debug)] +struct _MutableArrayData<'a> { + pub data_type: DataType, + pub null_count: usize, + + pub len: usize, + pub null_buffer: MutableBuffer, + + pub buffers: Vec<MutableBuffer>, + pub child_data: Vec<MutableArrayData<'a>>, +} + +impl<'a> _MutableArrayData<'a> { + fn freeze(self, dictionary: Option<ArrayDataRef>) -> ArrayData { + let mut buffers = Vec::with_capacity(self.buffers.len()); + for buffer in self.buffers { + buffers.push(buffer.freeze()); + } + + let child_data = match self.data_type { + DataType::Dictionary(_, _) => vec![dictionary.unwrap()], + _ => { + let mut child_data = Vec::with_capacity(self.child_data.len()); + for child in self.child_data { + child_data.push(Arc::new(child.freeze())); + } + child_data + } + }; + ArrayData::new( + self.data_type, + self.len, + Some(self.null_count), + if self.null_count > 0 { + Some(self.null_buffer.freeze()) + } else { + None + }, + 0, + buffers, + child_data, + ) + } + + /// Returns the buffer `buffer` as a slice of type `T`. When the expected buffer is bit-packed, + /// the slice is not offset. 
+ #[inline] + pub(super) fn buffer<T>(&self, buffer: usize) -> &[T] { + let values = unsafe { self.buffers[buffer].data().align_to::<T>() }; + if !values.0.is_empty() || !values.2.is_empty() { + // this is unreachable because + unreachable!("The buffer is not byte-aligned with its interpretation") + }; + &values.1 + } +} + +fn build_set_nulls<'a>(array: &'a ArrayData) -> ExtendNullBits<'a> { + if let Some(bitmap) = array.null_bitmap() { + let bytes = bitmap.bits.data(); + Box::new(move |mutable, start, len| { + utils::resize_for_bits(&mut mutable.null_buffer, mutable.len + len); + mutable.null_count += utils::set_bits( + mutable.null_buffer.data_mut(), + bytes, + mutable.len, + array.offset() + start, + len, + ); + }) + } else { + Box::new(|_, _, _| {}) + } +} + +/// Struct to efficiently and interactively create an [ArrayData] from an existing [ArrayData] by +/// copying chunks. +/// The main use case of this struct is to perform unary operations to arrays of arbitrary types, such as `filter` and `take`. +/// # Example: +/// +/// ``` +/// use std::sync::Arc; +/// use arrow::{array::{Int32Array, Array, MutableArrayData}}; +/// +/// let array = Int32Array::from(vec![1, 2, 3, 4, 5]).data(); +/// // Create a new `MutableArrayData` from an array and with a capacity. +/// // Capacity here is equivalent to `Vec::with_capacity` +/// let mut mutable = MutableArrayData::new(&array, 4); +/// mutable.extend(1, 3); // extend from the slice [1..3], [2,3] +/// mutable.extend(0, 3); // extend from the slice [0..3], [1,2,3] +/// // `.freeze()` to convert `MutableArrayData` into a `ArrayData`. 
+/// let new_array = Int32Array::from(Arc::new(mutable.freeze())); +/// assert_eq!(Int32Array::from(vec![2, 3, 1, 2, 3]), new_array); +/// ``` +pub struct MutableArrayData<'a> { + data: _MutableArrayData<'a>, + + dictionary: Option<ArrayDataRef>, + + push_slice: Extend<'a>, + set_nulls: ExtendNullBits<'a>, +} + +impl<'a> std::fmt::Debug for MutableArrayData<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // ignores the closures. + f.debug_struct("MutableArrayData") + .field("data", &self.data) + .finish() + } +} + +impl<'a> MutableArrayData<'a> { + /// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an + /// [ArrayData] from `array` + pub fn new(array: &'a ArrayData, capacity: usize) -> Self { + let data_type = array.data_type(); + use crate::datatypes::*; + let push_slice = match &data_type { + DataType::Boolean => boolean::build_extend(array), + DataType::UInt8 => primitive::build_extend::<u8>(array), + DataType::UInt16 => primitive::build_extend::<u16>(array), + DataType::UInt32 => primitive::build_extend::<u32>(array), + DataType::UInt64 => primitive::build_extend::<u64>(array), + DataType::Int8 => primitive::build_extend::<i8>(array), + DataType::Int16 => primitive::build_extend::<i16>(array), + DataType::Int32 => primitive::build_extend::<i32>(array), + DataType::Int64 => primitive::build_extend::<i64>(array), + DataType::Float32 => primitive::build_extend::<f32>(array), + DataType::Float64 => primitive::build_extend::<f64>(array), + DataType::Date32(_) + | DataType::Time32(_) + | DataType::Interval(IntervalUnit::YearMonth) => { + primitive::build_extend::<i32>(array) + } + DataType::Date64(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Duration(_) + | DataType::Interval(IntervalUnit::DayTime) => { + primitive::build_extend::<i64>(array) + } + DataType::Utf8 | DataType::Binary => { + variable_size::build_extend::<i32>(array) + } + DataType::LargeUtf8 | 
DataType::LargeBinary => { + variable_size::build_extend::<i64>(array) + } + DataType::List(_) => list::build_extend::<i32>(array), + DataType::LargeList(_) => list::build_extend::<i64>(array), + DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() { + DataType::UInt8 => primitive::build_extend::<u8>(array), + DataType::UInt16 => primitive::build_extend::<u16>(array), + DataType::UInt32 => primitive::build_extend::<u32>(array), + DataType::UInt64 => primitive::build_extend::<u64>(array), + DataType::Int8 => primitive::build_extend::<i8>(array), + DataType::Int16 => primitive::build_extend::<i16>(array), + DataType::Int32 => primitive::build_extend::<i32>(array), + DataType::Int64 => primitive::build_extend::<i64>(array), + _ => unreachable!(), + }, + DataType::Float16 => unreachable!(), + /* + DataType::Null => {} + DataType::FixedSizeBinary(_) => {} + DataType::FixedSizeList(_, _) => {} + DataType::Struct(_) => {} + DataType::Union(_) => {} + */ + _ => { + todo!("Take and filter operations still not supported for this datatype") + } + }; + + let buffers = match &data_type { + DataType::Boolean => vec![MutableBuffer::new(capacity)], + DataType::UInt8 => vec![MutableBuffer::new(capacity * size_of::<u8>())], + DataType::UInt16 => vec![MutableBuffer::new(capacity * size_of::<u16>())], + DataType::UInt32 => vec![MutableBuffer::new(capacity * size_of::<u32>())], + DataType::UInt64 => vec![MutableBuffer::new(capacity * size_of::<u64>())], + DataType::Int8 => vec![MutableBuffer::new(capacity * size_of::<i8>())], + DataType::Int16 => vec![MutableBuffer::new(capacity * size_of::<i16>())], + DataType::Int32 => vec![MutableBuffer::new(capacity * size_of::<i32>())], + DataType::Int64 => vec![MutableBuffer::new(capacity * size_of::<i64>())], + DataType::Float32 => vec![MutableBuffer::new(capacity * size_of::<f32>())], + DataType::Float64 => vec![MutableBuffer::new(capacity * size_of::<f64>())], + DataType::Date32(_) | DataType::Time32(_) => { + 
vec![MutableBuffer::new(capacity * size_of::<i32>())] + } + DataType::Date64(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Timestamp(_, _) => { + vec![MutableBuffer::new(capacity * size_of::<i64>())] + } + DataType::Interval(IntervalUnit::YearMonth) => { + vec![MutableBuffer::new(capacity * size_of::<i32>())] + } + DataType::Interval(IntervalUnit::DayTime) => { + vec![MutableBuffer::new(capacity * size_of::<i64>())] + } + DataType::Utf8 | DataType::Binary => { + let mut buffer = MutableBuffer::new((1 + capacity) * size_of::<i32>()); + buffer.write(0i32.to_byte_slice()).unwrap(); + vec![buffer, MutableBuffer::new(capacity * size_of::<u8>())] + } + DataType::LargeUtf8 | DataType::LargeBinary => { + let mut buffer = MutableBuffer::new((1 + capacity) * size_of::<i64>()); + buffer.write(0i64.to_byte_slice()).unwrap(); + vec![buffer, MutableBuffer::new(capacity * size_of::<u8>())] + } + DataType::List(_) => { + // offset buffer always starts with a zero + let mut buffer = MutableBuffer::new((1 + capacity) * size_of::<i32>()); + buffer.write(0i32.to_byte_slice()).unwrap(); + vec![buffer] + } + DataType::LargeList(_) => { + // offset buffer always starts with a zero + let mut buffer = MutableBuffer::new((1 + capacity) * size_of::<i64>()); + buffer.write(0i64.to_byte_slice()).unwrap(); + vec![buffer] + } + DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() { + DataType::UInt8 => vec![MutableBuffer::new(capacity * size_of::<u8>())], + DataType::UInt16 => vec![MutableBuffer::new(capacity * size_of::<u16>())], + DataType::UInt32 => vec![MutableBuffer::new(capacity * size_of::<u32>())], + DataType::UInt64 => vec![MutableBuffer::new(capacity * size_of::<u64>())], + DataType::Int8 => vec![MutableBuffer::new(capacity * size_of::<i8>())], + DataType::Int16 => vec![MutableBuffer::new(capacity * size_of::<i16>())], + DataType::Int32 => vec![MutableBuffer::new(capacity * size_of::<i32>())], + DataType::Int64 => 
vec![MutableBuffer::new(capacity * size_of::<i64>())], + _ => unreachable!(), + }, + DataType::Float16 => unreachable!(), + _ => { + todo!("Take and filter operations still not supported for this datatype") + } + }; + + let child_data = match &data_type { + DataType::Null + | DataType::Boolean + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Date32(_) + | DataType::Date64(_) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Timestamp(_, _) + | DataType::Utf8 + | DataType::Binary + | DataType::LargeUtf8 + | DataType::LargeBinary + | DataType::Interval(_) + | DataType::FixedSizeBinary(_) => vec![], + DataType::List(_) | DataType::LargeList(_) => { + vec![MutableArrayData::new(&array.child_data()[0], capacity)] + } + // the dictionary type just appends keys and clones the values. + DataType::Dictionary(_, _) => vec![], + DataType::Float16 => unreachable!(), + _ => { + todo!("Take and filter operations still not supported for this datatype") + } + }; + + let dictionary = match &data_type { + DataType::Dictionary(_, _) => Some(array.child_data()[0].clone()), + _ => None, + }; + + let set_nulls = build_set_nulls(array); + + let null_bytes = bit_util::ceil(capacity, 8); + let null_buffer = MutableBuffer::new(null_bytes); + + let data = _MutableArrayData { + data_type: data_type.clone(), + len: 0, + null_count: 0, + null_buffer, + buffers, + child_data, + }; + Self { + data, + dictionary, + push_slice: Box::new(push_slice), + set_nulls, + } + } + + /// Extends this [MutableArrayData] with elements from the bounded [ArrayData] at `start` + /// and for a size of `len`. + /// # Panic + /// This function panics if the range is out of bounds, i.e. if `start + len >= array.len()`. 
+ pub fn extend(&mut self, start: usize, end: usize) { Review comment: I am not sure `extend()` correctly conveys what the method does: the idea is that we take / copy a slice (defined by `start` and `end`) of the source array into the output array. Would `take()` or `copy()` be a better name? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org