This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5bce1044f Add COW conversion for Buffer and PrimitiveArray and unary_mut (#3115)
5bce1044f is described below
commit 5bce1044f6ae3d64117b2f692a427af7e9d06029
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Nov 17 12:44:17 2022 -0800
Add COW conversion for Buffer and PrimitiveArray and unary_mut (#3115)
* Add some APIs for copy-on-write support
* Update
* Add unary_mut as an example
* For review
* For review
* For review
* Fix test and more for review
* Add test on sliced array
* Address an overlooking review.
* For review
---
arrow-array/src/array/mod.rs | 12 ++
arrow-array/src/array/primitive_array.rs | 161 +++++++++++++++++++++-
arrow-array/src/builder/boolean_buffer_builder.rs | 5 +
arrow-array/src/builder/buffer_builder.rs | 9 ++
arrow-array/src/builder/null_buffer_builder.rs | 25 +++-
arrow-array/src/builder/primitive_builder.rs | 24 ++++
arrow-buffer/src/buffer/immutable.rs | 19 +++
arrow-buffer/src/buffer/mutable.rs | 19 +++
arrow-buffer/src/bytes.rs | 5 +
9 files changed, 277 insertions(+), 2 deletions(-)
diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index 41aa438c9..307753a71 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -862,6 +862,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
+ use crate::cast::downcast_array;
use arrow_schema::Field;
#[test]
@@ -1113,4 +1114,15 @@ mod tests {
assert!(compute_my_thing(&arr));
assert!(compute_my_thing(arr.as_ref()));
}
+
+ #[test]
+ fn test_downcast_array() {
+ let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+ let boxed: ArrayRef = Arc::new(array);
+ let array: Int32Array = downcast_array(&boxed);
+
+ let expected: Int32Array = vec![1, 2,
3].into_iter().map(Some).collect();
+ assert_eq!(array, expected);
+ }
}
diff --git a/arrow-array/src/array/primitive_array.rs
b/arrow-array/src/array/primitive_array.rs
index 7cf7de721..195e2dc19 100644
--- a/arrow-array/src/array/primitive_array.rs
+++ b/arrow-array/src/array/primitive_array.rs
@@ -397,6 +397,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
}
+ /// Applies a unary and infallible function to a mutable primitive array.
+ /// A mutable primitive array is one whose buffer is not shared with other arrays.
+ /// As a result, this mutates the buffer directly without allocating a new buffer.
+ ///
+ /// # Implementation
+ ///
+ /// This will apply the function for all values, including those on null slots.
+ /// This implies that the operation must be infallible for any value of the corresponding type
+ /// or this function may panic.
+ /// # Example
+ /// ```rust
+ /// # use arrow_array::{Int32Array, types::Int32Type};
+ /// # fn main() {
+ /// let array = Int32Array::from(vec![Some(5), Some(7), None]);
+ /// let c = array.unary_mut(|x| x * 2 + 1).unwrap();
+ /// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
+ /// # }
+ /// ```
+ pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>,
PrimitiveArray<T>>
+ where
+ F: Fn(T::Native) -> T::Native,
+ {
+ let mut builder = self.into_builder()?;
+ builder
+ .values_slice_mut()
+ .iter_mut()
+ .for_each(|v| *v = op(*v));
+ Ok(builder.finish())
+ }
+
/// Applies a unary and fallible function to all valid values in a primitive array
///
/// This is unlike [`Self::unary`] which will apply an infallible function to all rows
@@ -489,6 +519,66 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
)
}
}
+
+ /// Returns a `PrimitiveBuilder` for this primitive array for mutating its values if the underlying
+ /// data buffer is not shared by others.
+ pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
+ let len = self.len();
+ let null_bit_buffer = self
+ .data
+ .null_buffer()
+ .map(|b| b.bit_slice(self.data.offset(), len));
+
+ let element_len = std::mem::size_of::<T::Native>();
+ let buffer = self.data.buffers()[0]
+ .slice_with_length(self.data.offset() * element_len, len *
element_len);
+
+ drop(self.data);
+
+ let try_mutable_null_buffer = match null_bit_buffer {
+ None => Ok(None),
+ Some(null_buffer) => {
+ // Null buffer exists, tries to make it mutable
+ null_buffer.into_mutable().map(Some)
+ }
+ };
+
+ let try_mutable_buffers = match try_mutable_null_buffer {
+ Ok(mutable_null_buffer) => {
+ // Got mutable null buffer, tries to get mutable value buffer
+ let try_mutable_buffer = buffer.into_mutable();
+
+ // try_mutable_buffer.map(...).map_err(...) doesn't work as the compiler complains
+ // mutable_null_buffer is moved into map closure.
+ match try_mutable_buffer {
+ Ok(mutable_buffer) =>
Ok(PrimitiveBuilder::<T>::new_from_buffer(
+ mutable_buffer,
+ mutable_null_buffer,
+ )),
+ Err(buffer) => Err((buffer, mutable_null_buffer.map(|b|
b.into()))),
+ }
+ }
+ Err(mutable_null_buffer) => {
+ // Unable to get mutable null buffer
+ Err((buffer, Some(mutable_null_buffer)))
+ }
+ };
+
+ match try_mutable_buffers {
+ Ok(builder) => Ok(builder),
+ Err((buffer, null_bit_buffer)) => {
+ let builder = ArrayData::builder(T::DATA_TYPE)
+ .len(len)
+ .add_buffer(buffer)
+ .null_bit_buffer(null_bit_buffer);
+
+ let array_data = unsafe { builder.build_unchecked() };
+ let array = PrimitiveArray::<T>::from(array_data);
+
+ Err(array)
+ }
+ }
+ }
}
#[inline]
@@ -1036,7 +1126,9 @@ impl<T: DecimalType + ArrowPrimitiveType>
PrimitiveArray<T> {
mod tests {
use super::*;
use crate::builder::{Decimal128Builder, Decimal256Builder};
- use crate::BooleanArray;
+ use crate::cast::downcast_array;
+ use crate::{ArrayRef, BooleanArray};
+ use std::sync::Arc;
#[test]
fn test_primitive_array_from_vec() {
@@ -1939,4 +2031,71 @@ mod tests {
array.value(4);
}
+
+ #[test]
+ fn test_into_builder() {
+ let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+ let boxed: ArrayRef = Arc::new(array);
+ let col: Int32Array = downcast_array(&boxed);
+ drop(boxed);
+
+ let mut builder = col.into_builder().unwrap();
+
+ let slice = builder.values_slice_mut();
+ assert_eq!(slice, &[1, 2, 3]);
+
+ slice[0] = 4;
+ slice[1] = 2;
+ slice[2] = 1;
+
+ let expected: Int32Array = vec![Some(4), Some(2),
Some(1)].into_iter().collect();
+
+ let new_array = builder.finish();
+ assert_eq!(expected, new_array);
+ }
+
+ #[test]
+ fn test_into_builder_cloned_array() {
+ let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+ let boxed: ArrayRef = Arc::new(array);
+
+ let col: Int32Array =
PrimitiveArray::<Int32Type>::from(boxed.data().clone());
+ let err = col.into_builder();
+
+ match err {
+ Ok(_) => panic!("Should not get builder from cloned array"),
+ Err(returned) => {
+ let expected: Int32Array = vec![1, 2,
3].into_iter().map(Some).collect();
+ assert_eq!(expected, returned)
+ }
+ }
+ }
+
+ #[test]
+ fn test_into_builder_on_sliced_array() {
+ let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+ let slice = array.slice(1, 2);
+ let col: Int32Array = downcast_array(&slice);
+
+ drop(slice);
+
+ col.into_builder()
+ .expect_err("Should not build builder from sliced array");
+ }
+
+ #[test]
+ fn test_unary_mut() {
+ let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+ let c = array.unary_mut(|x| x * 2 + 1).unwrap();
+ let expected: Int32Array = vec![3, 5,
7].into_iter().map(Some).collect();
+
+ assert_eq!(expected, c);
+
+ let array: Int32Array = Int32Array::from(vec![Some(5), Some(7), None]);
+ let c = array.unary_mut(|x| x * 2 + 1).unwrap();
+ assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
+ }
}
diff --git a/arrow-array/src/builder/boolean_buffer_builder.rs
b/arrow-array/src/builder/boolean_buffer_builder.rs
index 16c6750d1..2ab01ccfe 100644
--- a/arrow-array/src/builder/boolean_buffer_builder.rs
+++ b/arrow-array/src/builder/boolean_buffer_builder.rs
@@ -33,6 +33,11 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}
+ pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
+ assert!(len <= buffer.len() * 8);
+ Self { buffer, len }
+ }
+
#[inline]
pub fn len(&self) -> usize {
self.len
diff --git a/arrow-array/src/builder/buffer_builder.rs
b/arrow-array/src/builder/buffer_builder.rs
index 2da11cb23..d3146366d 100644
--- a/arrow-array/src/builder/buffer_builder.rs
+++ b/arrow-array/src/builder/buffer_builder.rs
@@ -124,6 +124,15 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
}
}
+ pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
+ let buffer_len = buffer.len();
+ Self {
+ buffer,
+ len: buffer_len / std::mem::size_of::<T>(),
+ _marker: PhantomData,
+ }
+ }
+
/// Returns the current number of array elements in the internal buffer.
///
/// # Example:
diff --git a/arrow-array/src/builder/null_buffer_builder.rs
b/arrow-array/src/builder/null_buffer_builder.rs
index b2aa622ca..fef7214d5 100644
--- a/arrow-array/src/builder/null_buffer_builder.rs
+++ b/arrow-array/src/builder/null_buffer_builder.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::builder::BooleanBufferBuilder;
-use arrow_buffer::Buffer;
+use arrow_buffer::{Buffer, MutableBuffer};
/// Builder for creating the null bit buffer.
/// This builder only materializes the buffer when we append `false`.
@@ -42,6 +42,29 @@ impl NullBufferBuilder {
}
}
+ /// Creates a new builder with given length.
+ pub fn new_with_len(len: usize) -> Self {
+ Self {
+ bitmap_builder: None,
+ len,
+ capacity: len,
+ }
+ }
+
+ /// Creates a new builder from a `MutableBuffer` holding `len` validity bits.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `len` exceeds the number of bits held by `buffer` (`buffer.len() * 8`).
+ pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
+ let capacity = buffer.len() * 8;
+
+ // `len == capacity` is valid: a bitmap of exactly N bytes can describe 8 * N slots.
+ // This is the same bound that `BooleanBufferBuilder::new_from_buffer` asserts.
+ assert!(len <= capacity);
+
+ let bitmap_builder = Some(BooleanBufferBuilder::new_from_buffer(buffer, len));
+ Self {
+ bitmap_builder,
+ len,
+ capacity,
+ }
+ }
+
/// Appends `n` `true`s into the builder
/// to indicate that these `n` items are not nulls.
#[inline]
diff --git a/arrow-array/src/builder/primitive_builder.rs
b/arrow-array/src/builder/primitive_builder.rs
index ed3594c60..55d8bac01 100644
--- a/arrow-array/src/builder/primitive_builder.rs
+++ b/arrow-array/src/builder/primitive_builder.rs
@@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
+use arrow_buffer::MutableBuffer;
use arrow_data::ArrayData;
use std::any::Any;
use std::sync::Arc;
@@ -114,6 +115,24 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
}
}
+ pub fn new_from_buffer(
+ values_buffer: MutableBuffer,
+ null_buffer: Option<MutableBuffer>,
+ ) -> Self {
+ let values_builder =
BufferBuilder::<T::Native>::new_from_buffer(values_buffer);
+
+ let null_buffer_builder = null_buffer
+ .map(|buffer| {
+ NullBufferBuilder::new_from_buffer(buffer,
values_builder.len())
+ })
+ .unwrap_or_else(||
NullBufferBuilder::new_with_len(values_builder.len()));
+
+ Self {
+ values_builder,
+ null_buffer_builder,
+ }
+ }
+
/// Returns the capacity of this builder measured in slots of type `T`
pub fn capacity(&self) -> usize {
self.values_builder.capacity()
@@ -204,6 +223,11 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
pub fn values_slice(&self) -> &[T::Native] {
self.values_builder.as_slice()
}
+
+ /// Returns the current values buffer as a mutable slice
+ pub fn values_slice_mut(&mut self) -> &mut [T::Native] {
+ self.values_builder.as_slice_mut()
+ }
}
#[cfg(test)]
diff --git a/arrow-buffer/src/buffer/immutable.rs
b/arrow-buffer/src/buffer/immutable.rs
index 94bc98678..d5d7cd8ef 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -227,6 +227,25 @@ impl Buffer {
pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
}
+
+ /// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
+ /// Returns `Err` if this is shared or its allocation is from an external source.
+ pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
+ let offset_ptr = self.as_ptr();
+ let offset = self.offset;
+ let length = self.length;
+ Arc::try_unwrap(self.data)
+ .and_then(|bytes| {
+ // The pointer of underlying buffer should not be offset.
+ assert_eq!(offset_ptr, bytes.ptr().as_ptr());
+ MutableBuffer::from_bytes(bytes).map_err(Arc::new)
+ })
+ .map_err(|bytes| Buffer {
+ data: bytes,
+ offset,
+ length,
+ })
+ }
}
/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>`
into a newly
diff --git a/arrow-buffer/src/buffer/mutable.rs
b/arrow-buffer/src/buffer/mutable.rs
index bd139466a..b70a74e84 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -23,6 +23,7 @@ use crate::{
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
+use std::mem;
use std::ptr::NonNull;
/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of
items or slices of items.
@@ -92,6 +93,24 @@ impl MutableBuffer {
}
}
+ /// Allocates a new [MutableBuffer] from given `Bytes`.
+ pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
+ if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
+ return Err(bytes);
+ }
+
+ let len = bytes.len();
+ let capacity = bytes.capacity();
+ let ptr = bytes.ptr();
+ mem::forget(bytes);
+
+ Ok(Self {
+ data: ptr,
+ len,
+ capacity,
+ })
+ }
+
/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
/// This is useful to create a buffer for packed bitmaps.
pub fn new_null(len: usize) -> Self {
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index 20bf5a474..fea04ad0d 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -99,6 +99,11 @@ impl Bytes {
Deallocation::Custom(_) => 0,
}
}
+
+ #[inline]
+ pub(crate) fn deallocation(&self) -> &Deallocation {
+ &self.deallocation
+ }
}
// Deallocation is Send + Sync, repeating the bound here makes that
refactoring safe