This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d440c244b Zero-copy Vec conversion (#3516) (#1176) (#3756)
d440c244b is described below
commit d440c244bde4d1e99afcc72ac5b2c049a62f9225
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Mar 1 10:06:59 2023 +0000
Zero-copy Vec conversion (#3516) (#1176) (#3756)
* Zero-copy Vec conversion (#3516) (#1176)
* Fix doc
* More tests
* Review feedback
* More tests
---
arrow-array/src/array/list_array.rs | 2 +
arrow-buffer/src/alloc/mod.rs | 18 ++--
arrow-buffer/src/buffer/immutable.rs | 192 ++++++++++++++++++++++++++++++++++-
arrow-buffer/src/buffer/mutable.rs | 30 ++++--
arrow-buffer/src/buffer/scalar.rs | 9 ++
arrow-buffer/src/bytes.rs | 13 +--
6 files changed, 237 insertions(+), 27 deletions(-)
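For readers skimming the patch: a minimal usage sketch of the new zero-copy API this commit adds. `Buffer::from_vec` wraps an existing `Vec` without copying, and `into_vec` recovers it when the stored layout matches and the buffer is neither shared nor offset (the API names are taken from this diff; the snippet itself is illustrative only):

    use arrow_buffer::Buffer;

    fn main() {
        // Wrap an existing Vec without copying its contents
        let v: Vec<u32> = vec![1, 2, 3, 4];
        let buffer = Buffer::from_vec(v);
        assert_eq!(buffer.len(), 16); // Buffer lengths are in bytes

        // Recover the Vec, again without copying; this succeeds because the
        // buffer still has the Vec's layout and is neither shared nor sliced
        let back = buffer.into_vec::<u32>().unwrap();
        assert_eq!(back, vec![1, 2, 3, 4]);
    }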
diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs
index 6b63269d1..178139f81 100644
--- a/arrow-array/src/array/list_array.rs
+++ b/arrow-array/src/array/list_array.rs
@@ -829,6 +829,7 @@ mod tests {
#[test]
#[should_panic(expected = "memory is not aligned")]
+ #[allow(deprecated)]
fn test_primitive_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
@@ -845,6 +846,7 @@ mod tests {
// Different error messages, so skip for now
// https://github.com/apache/arrow-rs/issues/1545
#[cfg(not(feature = "force_validate"))]
+ #[allow(deprecated)]
fn test_list_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
diff --git a/arrow-buffer/src/alloc/mod.rs b/arrow-buffer/src/alloc/mod.rs
index 1493d839f..7600a28d8 100644
--- a/arrow-buffer/src/alloc/mod.rs
+++ b/arrow-buffer/src/alloc/mod.rs
@@ -45,6 +45,7 @@ fn dangling_ptr() -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
/// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
/// an unknown or non-zero value and is semantically similar to `malloc`.
+#[deprecated(note = "Use Vec")]
pub fn allocate_aligned(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
@@ -60,6 +61,7 @@ pub fn allocate_aligned(size: usize) -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
/// This is more performant than using [allocate_aligned] and setting all bytes to zero
/// and is semantically similar to `calloc`.
+#[deprecated(note = "Use Vec")]
pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
@@ -80,6 +82,7 @@ pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
/// * ptr must denote a block of memory currently allocated via this allocator,
///
/// * size must be the same size that was used to allocate that block of memory,
+#[deprecated(note = "Use Vec")]
pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
if size != 0 {
std::alloc::dealloc(
@@ -100,6 +103,8 @@ pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
///
/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
/// the rounded value must be less than usize::MAX).
+#[deprecated(note = "Use Vec")]
+#[allow(deprecated)]
pub unsafe fn reallocate(
ptr: NonNull<u8>,
old_size: usize,
@@ -132,19 +137,18 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}
/// Mode of deallocating memory regions
pub(crate) enum Deallocation {
- /// An allocation of the given capacity that needs to be deallocated using arrows's cache aligned allocator.
- /// See [allocate_aligned] and [free_aligned].
- Arrow(usize),
- /// An allocation from an external source like the FFI interface or a Rust Vec.
- /// Deallocation will happen
+ /// An allocation using [`std::alloc`]
+ Standard(Layout),
+ /// An allocation from an external source like the FFI interface
+ /// Deallocation will happen on `Allocation::drop`
Custom(Arc<dyn Allocation>),
}
impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
- Deallocation::Arrow(capacity) => {
- write!(f, "Deallocation::Arrow {{ capacity: {capacity} }}")
+ Deallocation::Standard(layout) => {
+ write!(f, "Deallocation::Standard {layout:?}")
}
Deallocation::Custom(_) => {
write!(f, "Deallocation::Custom {{ capacity: unknown }}")
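Replacing `Deallocation::Arrow(capacity)` with `Deallocation::Standard(Layout)` means regions are now released through `std::alloc::dealloc` with the stored layout instead of the custom aligned allocator. A minimal sketch of that standard-library contract (illustrative, not arrow-rs code):

    use std::alloc::{alloc, dealloc, Layout};

    fn main() {
        // An allocation described by a Layout...
        let layout = Layout::array::<u64>(4).unwrap();
        unsafe {
            let ptr = alloc(layout);
            assert!(!ptr.is_null());
            // ...must be freed with that same Layout, which is exactly the
            // information Deallocation::Standard(Layout) carries around
            dealloc(ptr, layout);
        }
    }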
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index cbfba1e05..5f42035c9 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use std::convert::AsRef;
+use std::alloc::Layout;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;
-use crate::alloc::{Allocation, Deallocation};
+use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bytes::Bytes, native::ArrowNativeType};
@@ -42,6 +42,8 @@ pub struct Buffer {
ptr: *const u8,
/// Byte length of the buffer.
+ ///
+ /// Must be less than or equal to `data.len()`
length: usize,
}
@@ -69,6 +71,22 @@ impl Buffer {
}
}
+ /// Create a [`Buffer`] from the provided `Vec` without copying
+ #[inline]
+ pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
+ // Safety
+ // Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
+ let ptr = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
+ let len = vec.len() * std::mem::size_of::<T>();
+ // Safety
+ // Vec guaranteed to have a valid layout matching that of `Layout::array`
+ // This is based on `RawVec::current_memory`
+ let layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
+ std::mem::forget(vec);
+ let b = unsafe { Bytes::new(ptr, len, Deallocation::Standard(layout)) };
+ Self::from_bytes(b)
+ }
+
/// Initializes a [Buffer] from a slice of items.
pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
let slice = items.as_ref();
@@ -78,7 +96,7 @@ impl Buffer {
buffer.into()
}
- /// Creates a buffer from an existing memory region (must already be byte-aligned), this
+ /// Creates a buffer from an existing aligned memory region (must already be byte-aligned), this
/// `Buffer` will free this piece of memory when dropped.
///
/// # Arguments
@@ -91,9 +109,11 @@ impl Buffer {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
+ #[deprecated(note = "Use From<Vec<T>>")]
pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
assert!(len <= capacity);
- Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
+ let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
+ Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
}
/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
@@ -253,7 +273,8 @@ impl Buffer {
}
/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
- /// Returns `Err` if this is shared or its allocation is from an external source.
+ /// Returns `Err` if this is shared or its allocation is from an external source or
+ /// it is not allocated with alignment [`ALIGNMENT`]
pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
let ptr = self.ptr;
let length = self.length;
@@ -269,6 +290,45 @@ impl Buffer {
length,
})
}
+
+ /// Returns `Vec` for mutating the buffer
+ ///
+ /// Returns `Err(self)` if this buffer does not have the same [`Layout`] as
+ /// the destination Vec or contains a non-zero offset
+ pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
+ let layout = match self.data.deallocation() {
+ Deallocation::Standard(l) => l,
+ _ => return Err(self), // Custom allocation
+ };
+
+ if self.ptr != self.data.as_ptr() {
+ return Err(self); // Data is offset
+ }
+
+ let v_capacity = layout.size() / std::mem::size_of::<T>();
+ match Layout::array::<T>(v_capacity) {
+ Ok(expected) if layout == &expected => {}
+ _ => return Err(self), // Incorrect layout
+ }
+
+ let length = self.length;
+ let ptr = self.ptr;
+ let v_len = self.length / std::mem::size_of::<T>();
+
+ Arc::try_unwrap(self.data)
+ .map(|bytes| unsafe {
+ let ptr = bytes.ptr().as_ptr() as _;
+ std::mem::forget(bytes);
+ // Safety
+ // Verified that bytes layout matches that of Vec
+ Vec::from_raw_parts(ptr, v_len, v_capacity)
+ })
+ .map_err(|bytes| Buffer {
+ data: bytes,
+ ptr,
+ length,
+ })
+ }
}
/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
@@ -378,6 +438,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {
#[cfg(test)]
mod tests {
+ use crate::i256;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::thread;
@@ -632,4 +693,125 @@ mod tests {
let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
buffer.slice_with_length(2, usize::MAX);
}
+
+ #[test]
+ fn test_vec_interop() {
+ // Test empty vec
+ let a: Vec<i128> = Vec::new();
+ let b = Buffer::from_vec(a);
+ b.into_vec::<i128>().unwrap();
+
+ // Test vec with capacity
+ let a: Vec<i128> = Vec::with_capacity(20);
+ let b = Buffer::from_vec(a);
+ let back = b.into_vec::<i128>().unwrap();
+ assert_eq!(back.len(), 0);
+ assert_eq!(back.capacity(), 20);
+
+ // Test vec with values
+ let mut a: Vec<i128> = Vec::with_capacity(3);
+ a.extend_from_slice(&[1, 2, 3]);
+ let b = Buffer::from_vec(a);
+ let back = b.into_vec::<i128>().unwrap();
+ assert_eq!(back.len(), 3);
+ assert_eq!(back.capacity(), 3);
+
+ // Test vec with values and spare capacity
+ let mut a: Vec<i128> = Vec::with_capacity(20);
+ a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
+ let b = Buffer::from_vec(a);
+ let back = b.into_vec::<i128>().unwrap();
+ assert_eq!(back.len(), 7);
+ assert_eq!(back.capacity(), 20);
+
+ // Test incorrect alignment
+ let a: Vec<i128> = Vec::new();
+ let b = Buffer::from_vec(a);
+ let b = b.into_vec::<i32>().unwrap_err();
+ b.into_vec::<i8>().unwrap_err();
+
+ // Test convert between types with same alignment
+ // This is an implementation quirk, but isn't harmful
+ // as ArrowNativeType are trivially transmutable
+ let a: Vec<i64> = vec![1, 2, 3, 4];
+ let b = Buffer::from_vec(a);
+ let back = b.into_vec::<u64>().unwrap();
+ assert_eq!(back.len(), 4);
+ assert_eq!(back.capacity(), 4);
+
+ // i256 has the same layout as i128 so this is valid
+ let mut b: Vec<i128> = Vec::with_capacity(4);
+ b.extend_from_slice(&[1, 2, 3, 4]);
+ let b = Buffer::from_vec(b);
+ let back = b.into_vec::<i256>().unwrap();
+ assert_eq!(back.len(), 2);
+ assert_eq!(back.capacity(), 2);
+
+ // Invalid layout
+ let b: Vec<i128> = vec![1, 2, 3];
+ let b = Buffer::from_vec(b);
+ b.into_vec::<i256>().unwrap_err();
+
+ // Invalid layout
+ let mut b: Vec<i128> = Vec::with_capacity(5);
+ b.extend_from_slice(&[1, 2, 3, 4]);
+ let b = Buffer::from_vec(b);
+ b.into_vec::<i256>().unwrap_err();
+
+ // Truncates length
+ // This is an implementation quirk, but isn't harmful
+ let mut b: Vec<i128> = Vec::with_capacity(4);
+ b.extend_from_slice(&[1, 2, 3]);
+ let b = Buffer::from_vec(b);
+ let back = b.into_vec::<i256>().unwrap();
+ assert_eq!(back.len(), 1);
+ assert_eq!(back.capacity(), 2);
+
+ // Cannot use aligned allocation
+ let b = Buffer::from(MutableBuffer::new(10));
+ let b = b.into_vec::<u8>().unwrap_err();
+ b.into_vec::<u64>().unwrap_err();
+
+ // Test slicing
+ let mut a: Vec<i128> = Vec::with_capacity(20);
+ a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
+ let b = Buffer::from_vec(a);
+ let slice = b.slice_with_length(0, 64);
+
+ // Shared reference fails
+ let slice = slice.into_vec::<i128>().unwrap_err();
+ drop(b);
+
+ // Succeeds as no outstanding shared reference
+ let back = slice.into_vec::<i128>().unwrap();
+ assert_eq!(&back, &[1, 4, 7, 8]);
+ assert_eq!(back.capacity(), 20);
+
+ // Slicing by non-multiple length truncates
+ let mut a: Vec<i128> = Vec::with_capacity(8);
+ a.extend_from_slice(&[1, 4, 7, 3]);
+
+ let b = Buffer::from_vec(a);
+ let slice = b.slice_with_length(0, 34);
+ drop(b);
+
+ let back = slice.into_vec::<i128>().unwrap();
+ assert_eq!(&back, &[1, 4]);
+ assert_eq!(back.capacity(), 8);
+
+ // Offset prevents conversion
+ let a: Vec<u32> = vec![1, 3, 4, 6];
+ let b = Buffer::from_vec(a).slice(2);
+ b.into_vec::<u32>().unwrap_err();
+
+ let b = MutableBuffer::new(16).into_buffer();
+ let b = b.into_vec::<u8>().unwrap_err(); // Invalid layout
+ let b = b.into_vec::<u32>().unwrap_err(); // Invalid layout
+ b.into_mutable().unwrap();
+
+ let b = Buffer::from_vec(vec![1_u32, 3, 5]);
+ let b = b.into_mutable().unwrap_err(); // Invalid layout
+ let b = b.into_vec::<u32>().unwrap();
+ assert_eq!(b, &[1, 3, 5]);
+ }
}
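The layout check inside `into_vec` only accepts buffers whose stored `Layout` round-trips through `Layout::array::<T>`. A standalone sketch of that check, using `[i128; 2]` as a stand-in for a 32-byte type such as `i256` (the helper name is illustrative, not part of the crate):

    use std::alloc::Layout;

    // Mirrors the compatibility test in Buffer::into_vec: the stored layout
    // must equal Layout::array::<T> for the capacity derived from its size
    fn layout_matches<T>(stored: &Layout) -> bool {
        let capacity = stored.size() / std::mem::size_of::<T>();
        match Layout::array::<T>(capacity) {
            Ok(expected) => stored == &expected,
            Err(_) => false,
        }
    }

    fn main() {
        // A Vec<i128> with capacity 3 occupies 48 bytes, aligned to 16
        let stored = Layout::array::<i128>(3).unwrap();
        assert!(layout_matches::<i128>(&stored));
        // 48 bytes is not a whole number of 32-byte elements, so conversion
        // to a wider type is rejected, as in the "Invalid layout" tests above
        assert!(!layout_matches::<[i128; 2]>(&stored));
    }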
diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs
index 2e6e2f1d7..250ac9f31 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -16,23 +16,28 @@
// under the License.
use super::Buffer;
-use crate::alloc::Deallocation;
+use crate::alloc::{Deallocation, ALIGNMENT};
use crate::{
alloc,
bytes::Bytes,
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
+use std::alloc::Layout;
use std::mem;
use std::ptr::NonNull;
/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
+///
/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned
/// along cache lines and in multiple of 64 bytes.
+///
/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
/// to insert many items, and `into` to convert it to [`Buffer`].
///
-/// For a safe, strongly typed API consider using `arrow::array::BufferBuilder`
+/// For a safe, strongly typed API consider using `Vec`
+///
+/// Note: this may be deprecated in a future release ([#1176](https://github.com/apache/arrow-rs/issues/1176))
///
/// # Example
///
@@ -62,6 +67,7 @@ impl MutableBuffer {
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
#[inline]
+ #[allow(deprecated)]
pub fn with_capacity(capacity: usize) -> Self {
let capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = alloc::allocate_aligned(capacity);
@@ -83,6 +89,7 @@ impl MutableBuffer {
/// let data = buffer.as_slice_mut();
/// assert_eq!(data[126], 0u8);
/// ```
+ #[allow(deprecated)]
pub fn from_len_zeroed(len: usize) -> Self {
let new_capacity = bit_util::round_upto_multiple_of_64(len);
let ptr = alloc::allocate_aligned_zeroed(new_capacity);
@@ -95,12 +102,14 @@ impl MutableBuffer {
/// Allocates a new [MutableBuffer] from given `Bytes`.
pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
- if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
- return Err(bytes);
- }
+ let capacity = match bytes.deallocation() {
+ Deallocation::Standard(layout) if layout.align() == ALIGNMENT => {
+ layout.size()
+ }
+ _ => return Err(bytes),
+ };
let len = bytes.len();
- let capacity = bytes.capacity();
let ptr = bytes.ptr();
mem::forget(bytes);
@@ -224,6 +233,7 @@ impl MutableBuffer {
/// buffer.shrink_to_fit();
/// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
/// ```
+ #[allow(deprecated)]
pub fn shrink_to_fit(&mut self) {
let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
if new_capacity < self.capacity {
@@ -300,9 +310,9 @@ impl MutableBuffer {
#[inline]
pub(super) fn into_buffer(self) -> Buffer {
- let bytes = unsafe {
- Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
- };
+ let layout = Layout::from_size_align(self.capacity, ALIGNMENT).unwrap();
+ let bytes =
+ unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(layout)) };
std::mem::forget(self);
Buffer::from_bytes(bytes)
}
@@ -448,6 +458,7 @@ impl MutableBuffer {
/// # Safety
/// `ptr` must be allocated for `old_capacity`.
#[cold]
+#[allow(deprecated)]
unsafe fn reallocate(
ptr: NonNull<u8>,
old_capacity: usize,
@@ -630,6 +641,7 @@ impl std::ops::DerefMut for MutableBuffer {
}
impl Drop for MutableBuffer {
+ #[allow(deprecated)]
fn drop(&mut self) {
unsafe { alloc::free_aligned(self.data, self.capacity) };
}
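Because `MutableBuffer` still allocates with the 64-byte `ALIGNMENT`, its layout never matches `Layout::array::<T>`, so the two allocation styles do not interconvert. A short sketch of the resulting behaviour, mirroring the tests in immutable.rs above:

    use arrow_buffer::{Buffer, MutableBuffer};

    fn main() {
        // Buffers that originate from MutableBuffer keep the 64-byte aligned
        // layout, so they cannot be converted into a Vec...
        let buffer: Buffer = MutableBuffer::from_len_zeroed(16).into();
        let buffer = buffer.into_vec::<u32>().unwrap_err();
        // ...but they can still be reclaimed as a MutableBuffer
        assert!(buffer.into_mutable().is_ok());

        // Conversely, a Buffer created from a Vec keeps the Vec's layout and
        // cannot be turned back into a MutableBuffer
        let buffer = Buffer::from_vec(vec![1_u32, 2, 3]);
        let buffer = buffer.into_mutable().unwrap_err();
        assert_eq!(buffer.into_vec::<u32>().unwrap(), vec![1, 2, 3]);
    }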
diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs
index e688e52fe..01a64633f 100644
--- a/arrow-buffer/src/buffer/scalar.rs
+++ b/arrow-buffer/src/buffer/scalar.rs
@@ -90,6 +90,15 @@ impl<T: ArrowNativeType> From<Buffer> for ScalarBuffer<T> {
}
}
+impl<T: ArrowNativeType> From<Vec<T>> for ScalarBuffer<T> {
+ fn from(value: Vec<T>) -> Self {
+ Self {
+ buffer: Buffer::from_vec(value),
+ phantom: Default::default(),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
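A short usage sketch of the new `From<Vec<T>>` impl for `ScalarBuffer` (assuming the type is reachable at `arrow_buffer::buffer::ScalarBuffer` and dereferences to a slice, as in the crate at this point):

    use arrow_buffer::buffer::ScalarBuffer;

    fn main() {
        // The new impl goes through Buffer::from_vec, i.e. without copying
        let values: ScalarBuffer<u32> = vec![1, 2, 3, 4].into();
        assert_eq!(&values[..], &[1, 2, 3, 4]);
    }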
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index 3320dfc26..2820fda78 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -23,10 +23,10 @@ use core::slice;
use std::ptr::NonNull;
use std::{fmt::Debug, fmt::Formatter};
-use crate::alloc;
use crate::alloc::Deallocation;
/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
+///
/// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's
/// global allocator nor u8 alignment.
///
@@ -53,7 +53,7 @@ impl Bytes {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
- /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
+ /// * `deallocation` - Type of allocation
///
/// # Safety
///
@@ -93,7 +93,7 @@ impl Bytes {
pub fn capacity(&self) -> usize {
match self.deallocation {
- Deallocation::Arrow(capacity) => capacity,
+ Deallocation::Standard(layout) => layout.size(),
// we cannot determine this in general,
// and thus we state that this is externally-owned memory
Deallocation::Custom(_) => 0,
@@ -115,9 +115,10 @@ impl Drop for Bytes {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
- Deallocation::Arrow(capacity) => {
- unsafe { alloc::free_aligned(self.ptr, *capacity) };
- }
+ Deallocation::Standard(layout) => match layout.size() {
+ 0 => {} // Nothing to do
+ _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
+ },
// The automatic drop implementation will free the memory once the reference count reaches zero
Deallocation::Custom(_allocation) => (),
}