This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bce1044f Add COW conversion for Buffer and PrimitiveArray and unary_mut (#3115)
5bce1044f is described below

commit 5bce1044f6ae3d64117b2f692a427af7e9d06029
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Nov 17 12:44:17 2022 -0800

    Add COW conversion for Buffer and PrimitiveArray and unary_mut (#3115)
    
    * Add some APIs for copy-on-write support
    
    * Update
    
    * Add unary_mut as an example
    
    * For review
    
    * For review
    
    * For review
    
    * Fix test and more for review
    
    * Add test on sliced array
    
    * Address an overlooked review comment.
    
    * For review
---
 arrow-array/src/array/mod.rs                      |  12 ++
 arrow-array/src/array/primitive_array.rs          | 161 +++++++++++++++++++++-
 arrow-array/src/builder/boolean_buffer_builder.rs |   5 +
 arrow-array/src/builder/buffer_builder.rs         |   9 ++
 arrow-array/src/builder/null_buffer_builder.rs    |  25 +++-
 arrow-array/src/builder/primitive_builder.rs      |  24 ++++
 arrow-buffer/src/buffer/immutable.rs              |  19 +++
 arrow-buffer/src/buffer/mutable.rs                |  19 +++
 arrow-buffer/src/bytes.rs                         |   5 +
 9 files changed, 277 insertions(+), 2 deletions(-)

diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index 41aa438c9..307753a71 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -862,6 +862,7 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::cast::downcast_array;
     use arrow_schema::Field;
 
     #[test]
@@ -1113,4 +1114,15 @@ mod tests {
         assert!(compute_my_thing(&arr));
         assert!(compute_my_thing(arr.as_ref()));
     }
+
+    #[test]
+    fn test_downcast_array() {
+        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+        let boxed: ArrayRef = Arc::new(array);
+        let array: Int32Array = downcast_array(&boxed);
+
+        let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+        assert_eq!(array, expected);
+    }
 }
diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs
index 7cf7de721..195e2dc19 100644
--- a/arrow-array/src/array/primitive_array.rs
+++ b/arrow-array/src/array/primitive_array.rs
@@ -397,6 +397,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
         unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
     }
 
+    /// Applies a unary and infallible function to a mutable primitive array.
+    /// Mutable primitive array means that the buffer is not shared with other arrays.
+    /// As a result, this mutates the buffer directly without allocating a new buffer.
+    /// If the underlying buffers are shared, the array is returned unmodified as `Err`.
+    ///
+    /// # Implementation
+    ///
+    /// This will apply the function for all values, including those on null slots.
+    /// This implies that the operation must be infallible for any value of the corresponding type
+    /// or this function may panic.
+    ///
+    /// # Example
+    /// ```rust
+    /// # use arrow_array::{Int32Array, types::Int32Type};
+    /// let array = Int32Array::from(vec![Some(5), Some(7), None]);
+    /// let c = array.unary_mut(|x| x * 2 + 1).unwrap();
+    /// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
+    /// ```
+    pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, PrimitiveArray<T>>
+    where
+        F: Fn(T::Native) -> T::Native,
+    {
+        let mut builder = self.into_builder()?;
+        builder
+            .values_slice_mut()
+            .iter_mut()
+            .for_each(|v| *v = op(*v));
+        Ok(builder.finish())
+    }
+
     /// Applies a unary and fallible function to all valid values in a primitive array
     ///
     /// This is unlike [`Self::unary`] which will apply an infallible function to all rows
@@ -489,6 +519,66 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
             )
         }
     }
+
+    /// Converts this array into a `PrimitiveBuilder` for mutating its values in place, if the
+    /// underlying value and null buffers are not shared by others.
+    ///
+    /// Otherwise the array is returned, with the same values, validity and data type, as `Err`.
+    pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
+        let len = self.len();
+        // Keep the exact data type (e.g. decimal precision) for rebuilding on failure.
+        let data_type = self.data.data_type().clone();
+        let null_bit_buffer = self
+            .data
+            .null_buffer()
+            .map(|b| b.bit_slice(self.data.offset(), len));
+
+        let element_len = std::mem::size_of::<T::Native>();
+        let buffer = self.data.buffers()[0]
+            .slice_with_length(self.data.offset() * element_len, len * element_len);
+
+        drop(self.data); // Release our references so the buffers may be uniquely owned.
+
+        let try_mutable_null_buffer = match null_bit_buffer {
+            None => Ok(None),
+            Some(null_buffer) => {
+                // Null buffer exists, tries to make it mutable
+                null_buffer.into_mutable().map(Some)
+            }
+        };
+
+        let try_mutable_buffers = match try_mutable_null_buffer {
+            Ok(mutable_null_buffer) => {
+                // Got mutable null buffer, tries to get mutable value buffer. A plain
+                // `map(...).map_err(...)` would move `mutable_null_buffer` into both closures.
+                match buffer.into_mutable() {
+                    Ok(mutable_buffer) => Ok(PrimitiveBuilder::<T>::new_from_buffer(
+                        mutable_buffer,
+                        mutable_null_buffer,
+                    )),
+                    Err(buffer) => Err((buffer, mutable_null_buffer.map(|b| b.into()))),
+                }
+            }
+            Err(null_buffer) => {
+                // Unable to get mutable null buffer
+                Err((buffer, Some(null_buffer)))
+            }
+        };
+
+        match try_mutable_buffers {
+            Ok(builder) => Ok(builder),
+            Err((buffer, null_bit_buffer)) => {
+                let builder = ArrayData::builder(data_type)
+                    .len(len)
+                    .add_buffer(buffer)
+                    .null_bit_buffer(null_bit_buffer);
+
+                // SAFETY: buffers, length and data type all come from the original valid array.
+                let array_data = unsafe { builder.build_unchecked() };
+                Err(PrimitiveArray::<T>::from(array_data))
+            }
+        }
+    }
 }
 
 #[inline]
@@ -1036,7 +1126,9 @@ impl<T: DecimalType + ArrowPrimitiveType> PrimitiveArray<T> {
 mod tests {
     use super::*;
     use crate::builder::{Decimal128Builder, Decimal256Builder};
-    use crate::BooleanArray;
+    use crate::cast::downcast_array;
+    use crate::{ArrayRef, BooleanArray};
+    use std::sync::Arc;
 
     #[test]
     fn test_primitive_array_from_vec() {
@@ -1939,4 +2031,71 @@ mod tests {
 
         array.value(4);
     }
+
+    #[test]
+    fn test_into_builder() {
+        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+        let boxed: ArrayRef = Arc::new(array);
+        let col: Int32Array = downcast_array(&boxed);
+        drop(boxed);
+
+        let mut builder = col.into_builder().unwrap();
+
+        let slice = builder.values_slice_mut();
+        assert_eq!(slice, &[1, 2, 3]);
+
+        slice[0] = 4;
+        slice[1] = 2;
+        slice[2] = 1;
+
+        let expected: Int32Array = vec![Some(4), Some(2), Some(1)].into_iter().collect();
+
+        let new_array = builder.finish();
+        assert_eq!(expected, new_array);
+    }
+
+    #[test]
+    fn test_into_builder_cloned_array() {
+        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+        let boxed: ArrayRef = Arc::new(array);
+
+        let col: Int32Array = PrimitiveArray::<Int32Type>::from(boxed.data().clone());
+        let err = col.into_builder();
+
+        match err {
+            Ok(_) => panic!("Should not get builder from cloned array"),
+            Err(returned) => {
+                let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+                assert_eq!(expected, returned)
+            }
+        }
+    }
+
+    #[test]
+    fn test_into_builder_on_sliced_array() {
+        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+        let slice = array.slice(1, 2);
+        let col: Int32Array = downcast_array(&slice);
+
+        drop(slice);
+
+        col.into_builder()
+            .expect_err("Should not build builder from sliced array");
+    }
+
+    #[test]
+    fn test_unary_mut() {
+        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
+
+        let c = array.unary_mut(|x| x * 2 + 1).unwrap();
+        let expected: Int32Array = vec![3, 5, 7].into_iter().map(Some).collect();
+
+        assert_eq!(expected, c);
+
+        let array: Int32Array = Int32Array::from(vec![Some(5), Some(7), None]);
+        let c = array.unary_mut(|x| x * 2 + 1).unwrap();
+        assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
+    }
 }
diff --git a/arrow-array/src/builder/boolean_buffer_builder.rs b/arrow-array/src/builder/boolean_buffer_builder.rs
index 16c6750d1..2ab01ccfe 100644
--- a/arrow-array/src/builder/boolean_buffer_builder.rs
+++ b/arrow-array/src/builder/boolean_buffer_builder.rs
@@ -33,6 +33,11 @@ impl BooleanBufferBuilder {
         Self { buffer, len: 0 }
     }
 
+    pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
+        assert!(len <= buffer.len() * 8);
+        Self { buffer, len }
+    }
+
     #[inline]
     pub fn len(&self) -> usize {
         self.len
diff --git a/arrow-array/src/builder/buffer_builder.rs b/arrow-array/src/builder/buffer_builder.rs
index 2da11cb23..d3146366d 100644
--- a/arrow-array/src/builder/buffer_builder.rs
+++ b/arrow-array/src/builder/buffer_builder.rs
@@ -124,6 +124,15 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
         }
     }
 
+    /// Creates a builder whose elements are the existing contents of `buffer`.
+    pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
+        Self {
+            len: buffer.len() / std::mem::size_of::<T>(),
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
     /// Returns the current number of array elements in the internal buffer.
     ///
     /// # Example:
diff --git a/arrow-array/src/builder/null_buffer_builder.rs b/arrow-array/src/builder/null_buffer_builder.rs
index b2aa622ca..fef7214d5 100644
--- a/arrow-array/src/builder/null_buffer_builder.rs
+++ b/arrow-array/src/builder/null_buffer_builder.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::builder::BooleanBufferBuilder;
-use arrow_buffer::Buffer;
+use arrow_buffer::{Buffer, MutableBuffer};
 
 /// Builder for creating the null bit buffer.
 /// This builder only materializes the buffer when we append `false`.
@@ -42,6 +42,29 @@ impl NullBufferBuilder {
         }
     }
 
+    /// Creates a new builder with `len` slots that are all non-null.
+    pub fn new_with_len(len: usize) -> Self {
+        Self {
+            bitmap_builder: None,
+            len,
+            capacity: len,
+        }
+    }
+
+    /// Creates a new builder from a `MutableBuffer` holding the validity bits of `len` slots.
+    pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
+        let capacity = buffer.len() * 8;
+
+        assert!(len <= capacity);
+
+        let bitmap_builder = Some(BooleanBufferBuilder::new_from_buffer(buffer, len));
+        Self {
+            bitmap_builder,
+            len,
+            capacity,
+        }
+    }
+
     /// Appends `n` `true`s into the builder
     /// to indicate that these `n` items are not nulls.
     #[inline]
diff --git a/arrow-array/src/builder/primitive_builder.rs b/arrow-array/src/builder/primitive_builder.rs
index ed3594c60..55d8bac01 100644
--- a/arrow-array/src/builder/primitive_builder.rs
+++ b/arrow-array/src/builder/primitive_builder.rs
@@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
 use crate::builder::{ArrayBuilder, BufferBuilder};
 use crate::types::*;
 use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
+use arrow_buffer::MutableBuffer;
 use arrow_data::ArrayData;
 use std::any::Any;
 use std::sync::Arc;
@@ -114,6 +115,24 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
         }
     }
 
+    /// Creates a builder from existing value and optional null (validity) buffers.
+    pub fn new_from_buffer(
+        values_buffer: MutableBuffer,
+        null_buffer: Option<MutableBuffer>,
+    ) -> Self {
+        let values_builder = BufferBuilder::<T::Native>::new_from_buffer(values_buffer);
+        let len = values_builder.len();
+
+        let null_buffer_builder = null_buffer
+            .map(|buffer| NullBufferBuilder::new_from_buffer(buffer, len))
+            .unwrap_or_else(|| NullBufferBuilder::new_with_len(len));
+
+        Self {
+            values_builder,
+            null_buffer_builder,
+        }
+    }
+
     /// Returns the capacity of this builder measured in slots of type `T`
     pub fn capacity(&self) -> usize {
         self.values_builder.capacity()
@@ -204,6 +223,11 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
     pub fn values_slice(&self) -> &[T::Native] {
         self.values_builder.as_slice()
     }
+
+    /// Returns the current values buffer as a mutable slice
+    pub fn values_slice_mut(&mut self) -> &mut [T::Native] {
+        self.values_builder.as_slice_mut()
+    }
 }
 
 #[cfg(test)]
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 94bc98678..d5d7cd8ef 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -227,6 +227,25 @@ impl Buffer {
     pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
         UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
     }
+
+    /// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
+    /// Returns `Err(self)` if this is shared, is a slice of a larger allocation, or its
+    /// allocation is from an external source.
+    pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
+        let offset = self.offset;
+        let length = self.length;
+        if offset != 0 || length != self.data.len() {
+            // A slice cannot be handed out as a `MutableBuffer` over the whole allocation.
+            return Err(self);
+        }
+        Arc::try_unwrap(self.data)
+            .and_then(|bytes| MutableBuffer::from_bytes(bytes).map_err(Arc::new))
+            .map_err(|bytes| Buffer {
+                data: bytes,
+                offset,
+                length,
+            })
+    }
 }
 
 /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs
index bd139466a..b70a74e84 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -23,6 +23,7 @@ use crate::{
     native::{ArrowNativeType, ToByteSlice},
     util::bit_util,
 };
+use std::mem;
 use std::ptr::NonNull;
 
 /// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
@@ -92,6 +93,24 @@ impl MutableBuffer {
         }
     }
 
+    /// Converts `Bytes` into a [MutableBuffer] without copying, taking over its allocation.
+    /// Returns `Err(bytes)` unchanged if the memory was not allocated by Arrow.
+    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
+        if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
+            return Err(bytes);
+        }
+
+        let (len, capacity, ptr) = (bytes.len(), bytes.capacity(), bytes.ptr());
+        // Ownership of the allocation moves to `Self`, so `bytes` must not free it on drop.
+        mem::forget(bytes);
+
+        Ok(Self {
+            data: ptr,
+            len,
+            capacity,
+        })
+    }
+
     /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
     /// This is useful to create a buffer for packed bitmaps.
     pub fn new_null(len: usize) -> Self {
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index 20bf5a474..fea04ad0d 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -99,6 +99,11 @@ impl Bytes {
             Deallocation::Custom(_) => 0,
         }
     }
+
+    #[inline]
+    pub(crate) fn deallocation(&self) -> &Deallocation {
+        &self.deallocation
+    }
 }
 
 // Deallocation is Send + Sync, repeating the bound here makes that refactoring safe

Reply via email to