This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 60ec869779 Support StringViewArray interop with python: fix lingering
C Data Interface issues for *ViewArray (#6368)
60ec869779 is described below
commit 60ec869779ebcf44a4f757aff6eaa22a7fd5e4ed
Author: Andrew Duffy <[email protected]>
AuthorDate: Thu Sep 12 09:26:57 2024 -0400
Support StringViewArray interop with python: fix lingering C Data Interface
issues for *ViewArray (#6368)
* fix lingering C Data Interface issues for *ViewArray
Fixes https://github.com/apache/arrow-rs/issues/6366
* report views length in elements -> bytes
* use pyarrow 17
* use only good versions
* fix support for View arrays in C FFI, add test
* update comment in github action
* more ffi test cases
* more byte_view tests for into_pyarrow
---
.github/workflows/integration.yml | 7 +-
arrow-array/src/ffi.rs | 168 ++++++++++++++++++++++++++++++++---
arrow-buffer/src/buffer/immutable.rs | 7 +-
arrow-data/src/ffi.rs | 20 ++++-
arrow/src/pyarrow.rs | 2 +-
arrow/tests/pyarrow.rs | 69 +++++++++++++-
6 files changed, 247 insertions(+), 26 deletions(-)
diff --git a/.github/workflows/integration.yml
b/.github/workflows/integration.yml
index 1937fafe3a..41edc1bb19 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -48,7 +48,6 @@ on:
- arrow/**
jobs:
-
integration:
name: Archery test With other arrows
runs-on: ubuntu-latest
@@ -118,9 +117,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- rust: [ stable ]
- # PyArrow 13 was the last version prior to introduction to Arrow
PyCapsules
- pyarrow: [ "13", "14" ]
+ rust: [stable]
+ # PyArrow 15 was the first version to introduce StringView/BinaryView
support
+ pyarrow: ["15", "16", "17"]
steps:
- uses: actions/checkout@v4
with:
diff --git a/arrow-array/src/ffi.rs b/arrow-array/src/ffi.rs
index 1d76ed62d3..a28b3f7461 100644
--- a/arrow-array/src/ffi.rs
+++ b/arrow-array/src/ffi.rs
@@ -193,6 +193,13 @@ fn bit_width(data_type: &DataType, i: usize) ->
Result<usize> {
"The datatype \"{data_type:?}\" expects 3 buffers, but
requested {i}. Please verify that the C data interface is correctly
implemented."
)))
}
+ // Variable-sized views: have 3 or more buffers.
+ // Buffer 1 are the u128 views
+ // Buffers 2...N-1 are u8 byte buffers
+ (DataType::Utf8View, 1) | (DataType::BinaryView,1) => u128::BITS as _,
+ (DataType::Utf8View, _) | (DataType::BinaryView, _) => {
+ u8::BITS as _
+ }
// type ids. UnionArray doesn't have null bitmap so buffer index
begins with 0.
(DataType::Union(_, _), 0) => i8::BITS as _,
// Only DenseUnion has 2nd buffer
@@ -300,7 +307,7 @@ impl<'a> ImportedArrowArray<'a> {
};
let data_layout = layout(&self.data_type);
- let buffers = self.buffers(data_layout.can_contain_null_mask)?;
+ let buffers = self.buffers(data_layout.can_contain_null_mask,
data_layout.variadic)?;
let null_bit_buffer = if data_layout.can_contain_null_mask {
self.null_bit_buffer()
@@ -373,13 +380,30 @@ impl<'a> ImportedArrowArray<'a> {
/// returns all buffers, as organized by Rust (i.e. null buffer is skipped
if it's present
/// in the spec of the type)
- fn buffers(&self, can_contain_null_mask: bool) -> Result<Vec<Buffer>> {
+ fn buffers(&self, can_contain_null_mask: bool, variadic: bool) ->
Result<Vec<Buffer>> {
// + 1: skip null buffer
let buffer_begin = can_contain_null_mask as usize;
- (buffer_begin..self.array.num_buffers())
- .map(|index| {
- let len = self.buffer_len(index, &self.data_type)?;
+ let buffer_end = self.array.num_buffers() - usize::from(variadic);
+
+ let variadic_buffer_lens = if variadic {
+ // Each views array has 1 (optional) null buffer, 1 views buffer,
1 lengths buffer.
+ // Rest are variadic.
+ let num_variadic_buffers =
+ self.array.num_buffers() - (2 +
usize::from(can_contain_null_mask));
+ if num_variadic_buffers == 0 {
+ &[]
+ } else {
+ let lengths = self.array.buffer(self.array.num_buffers() - 1);
+            // SAFETY: if lengths is non-null, then it must be valid for
up to num_variadic_buffers.
+ unsafe { std::slice::from_raw_parts(lengths.cast::<i64>(),
num_variadic_buffers) }
+ }
+ } else {
+ &[]
+ };
+ (buffer_begin..buffer_end)
+ .map(|index| {
+ let len = self.buffer_len(index, variadic_buffer_lens,
&self.data_type)?;
match unsafe { create_buffer(self.owner.clone(), self.array,
index, len) } {
Some(buf) => Ok(buf),
None if len == 0 => {
@@ -399,7 +423,12 @@ impl<'a> ImportedArrowArray<'a> {
/// Rust implementation uses fixed-sized buffers, which require knowledge
of their `len`.
/// for variable-sized buffers, such as the second buffer of a
stringArray, we need
/// to fetch offset buffer's len to build the second buffer.
- fn buffer_len(&self, i: usize, dt: &DataType) -> Result<usize> {
+ fn buffer_len(
+ &self,
+ i: usize,
+ variadic_buffer_lengths: &[i64],
+ dt: &DataType,
+ ) -> Result<usize> {
// Special handling for dictionary type as we only care about the key
type in the case.
let data_type = match dt {
DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(),
@@ -430,7 +459,7 @@ impl<'a> ImportedArrowArray<'a> {
}
// the len of the data buffer (buffer 2) equals the last value
of the offset buffer (buffer 1)
- let len = self.buffer_len(1, dt)?;
+ let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i32`, as Utf8 uses
`i32` offsets.
#[allow(clippy::cast_ptr_alignment)]
@@ -444,7 +473,7 @@ impl<'a> ImportedArrowArray<'a> {
}
// the len of the data buffer (buffer 2) equals the last value
of the offset buffer (buffer 1)
- let len = self.buffer_len(1, dt)?;
+ let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i64`, as Large uses
`i64` offsets.
#[allow(clippy::cast_ptr_alignment)]
@@ -452,6 +481,16 @@ impl<'a> ImportedArrowArray<'a> {
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i64>() - 1) }) as
usize
}
+ // View types: these have variadic buffers.
+ // Buffer 1 is the views buffer, which stores 1 u128 per length of
the array.
+ // Buffers 2..N-1 are the buffers holding the byte data. Their
lengths are variable.
+ // Buffer N is of length (N - 2) and stores i64 containing the
lengths of buffers 2..N-1
+ (DataType::Utf8View, 1) | (DataType::BinaryView, 1) => {
+ std::mem::size_of::<u128>() * length
+ }
+ (DataType::Utf8View, i) | (DataType::BinaryView, i) => {
+ variadic_buffer_lengths[i - 2] as usize
+ }
// buffer len of primitive types
_ => {
let bits = bit_width(data_type, i)?;
@@ -1229,18 +1268,18 @@ mod tests_from_ffi {
use arrow_data::ArrayData;
use arrow_schema::{DataType, Field};
- use crate::types::Int32Type;
+ use super::{ImportedArrowArray, Result};
+ use crate::builder::GenericByteViewBuilder;
+ use crate::types::{BinaryViewType, ByteViewType, Int32Type,
StringViewType};
use crate::{
array::{
Array, BooleanArray, DictionaryArray, FixedSizeBinaryArray,
FixedSizeListArray,
Int32Array, Int64Array, StringArray, StructArray, UInt32Array,
UInt64Array,
},
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
- make_array, ArrayRef, ListArray,
+ make_array, ArrayRef, GenericByteViewArray, ListArray,
};
- use super::{ImportedArrowArray, Result};
-
fn test_round_trip(expected: &ArrayData) -> Result<()> {
// here we export the array
let array = FFI_ArrowArray::new(expected);
@@ -1453,8 +1492,8 @@ mod tests_from_ffi {
owner: &array,
};
- let offset_buf_len = imported_array.buffer_len(1,
&imported_array.data_type)?;
- let data_buf_len = imported_array.buffer_len(2,
&imported_array.data_type)?;
+ let offset_buf_len = imported_array.buffer_len(1, &[],
&imported_array.data_type)?;
+ let data_buf_len = imported_array.buffer_len(2, &[],
&imported_array.data_type)?;
assert_eq!(offset_buf_len, 4);
assert_eq!(data_buf_len, 0);
@@ -1472,6 +1511,18 @@ mod tests_from_ffi {
StringArray::from(array)
}
+ fn roundtrip_byte_view_array<T: ByteViewType>(
+ array: GenericByteViewArray<T>,
+ ) -> GenericByteViewArray<T> {
+ let data = array.into_data();
+
+ let array = FFI_ArrowArray::new(&data);
+ let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap();
+
+ let array = unsafe { from_ffi(array, &schema) }.unwrap();
+ GenericByteViewArray::<T>::from(array)
+ }
+
fn extend_array(array: &dyn Array) -> ArrayRef {
let len = array.len();
let data = array.to_data();
@@ -1551,4 +1602,93 @@ mod tests_from_ffi {
&imported
);
}
+
+ /// Helper trait to allow us to use easily strings as either
BinaryViewType::Native or
+ /// StringViewType::Native scalars.
+ trait NativeFromStr {
+ fn from_str(value: &str) -> &Self;
+ }
+
+ impl NativeFromStr for str {
+ fn from_str(value: &str) -> &Self {
+ value
+ }
+ }
+
+ impl NativeFromStr for [u8] {
+ fn from_str(value: &str) -> &Self {
+ value.as_bytes()
+ }
+ }
+
+ #[test]
+ fn test_round_trip_byte_view() {
+ fn test_case<T>()
+ where
+ T: ByteViewType,
+ T::Native: NativeFromStr,
+ {
+ macro_rules! run_test_case {
+ ($array:expr) => {{
+ // round-trip through C Data Interface
+ let len = $array.len();
+ let imported = roundtrip_byte_view_array($array);
+ assert_eq!(imported.len(), len);
+
+ let copied = extend_array(&imported);
+ assert_eq!(
+ copied
+ .as_any()
+ .downcast_ref::<GenericByteViewArray<T>>()
+ .unwrap(),
+ &imported
+ );
+ }};
+ }
+
+ // Empty test case.
+ let empty = GenericByteViewBuilder::<T>::new().finish();
+ run_test_case!(empty);
+
+ // All inlined strings test case.
+ let mut all_inlined = GenericByteViewBuilder::<T>::new();
+ all_inlined.append_value(T::Native::from_str("inlined1"));
+ all_inlined.append_value(T::Native::from_str("inlined2"));
+ all_inlined.append_value(T::Native::from_str("inlined3"));
+ let all_inlined = all_inlined.finish();
+ assert_eq!(all_inlined.data_buffers().len(), 0);
+ run_test_case!(all_inlined);
+
+ // some inlined + non-inlined, 1 variadic buffer.
+ let mixed_one_variadic = {
+ let mut builder = GenericByteViewBuilder::<T>::new();
+ builder.append_value(T::Native::from_str("inlined"));
+ let block_id =
+
builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes()));
+ builder.try_append_view(block_id, 0, 25).unwrap();
+ builder.finish()
+ };
+ assert_eq!(mixed_one_variadic.data_buffers().len(), 1);
+ run_test_case!(mixed_one_variadic);
+
+ // inlined + non-inlined, 2 variadic buffers.
+ let mixed_two_variadic = {
+ let mut builder = GenericByteViewBuilder::<T>::new();
+ builder.append_value(T::Native::from_str("inlined"));
+ let block_id =
+
builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes()));
+ builder.try_append_view(block_id, 0, 25).unwrap();
+
+ let block_id = builder
+
.append_block(Buffer::from("another-non-inlined-string-buffer".as_bytes()));
+ builder.try_append_view(block_id, 0, 33).unwrap();
+ builder.finish()
+ };
+ assert_eq!(mixed_two_variadic.data_buffers().len(), 2);
+ run_test_case!(mixed_two_variadic);
+ }
+
+ test_case::<StringViewType>();
+ test_case::<BinaryViewType>();
+ }
}
diff --git a/arrow-buffer/src/buffer/immutable.rs
b/arrow-buffer/src/buffer/immutable.rs
index 7cd3552215..fef2f8008b 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -203,7 +203,9 @@ impl Buffer {
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
- "the offset of the new Buffer cannot exceed the existing length"
+ "the offset of the new Buffer cannot exceed the existing length:
offset={} length={}",
+ offset,
+ self.length
);
self.length -= offset;
// Safety:
@@ -221,7 +223,8 @@ impl Buffer {
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset.saturating_add(length) <= self.length,
- "the offset of the new Buffer cannot exceed the existing length"
+ "the offset of the new Buffer cannot exceed the existing length:
slice offset={offset} length={length} selflen={}",
+ self.length
);
// Safety:
// offset + length <= self.length
diff --git a/arrow-data/src/ffi.rs b/arrow-data/src/ffi.rs
index 3345595fac..cd283d3266 100644
--- a/arrow-data/src/ffi.rs
+++ b/arrow-data/src/ffi.rs
@@ -20,7 +20,7 @@
use crate::bit_mask::set_bits;
use crate::{layout, ArrayData};
use arrow_buffer::buffer::NullBuffer;
-use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer};
use arrow_schema::DataType;
use std::ffi::c_void;
@@ -121,7 +121,7 @@ impl FFI_ArrowArray {
pub fn new(data: &ArrayData) -> Self {
let data_layout = layout(data.data_type());
- let buffers = if data_layout.can_contain_null_mask {
+ let mut buffers = if data_layout.can_contain_null_mask {
// * insert the null buffer at the start
// * make all others `Option<Buffer>`.
std::iter::once(align_nulls(data.offset(), data.nulls()))
@@ -132,7 +132,7 @@ impl FFI_ArrowArray {
};
// `n_buffers` is the number of buffers by the spec.
- let n_buffers = {
+ let mut n_buffers = {
data_layout.buffers.len() + {
// If the layout has a null buffer by Arrow spec.
// Note that even the array doesn't have a null buffer because
it has
@@ -141,10 +141,22 @@ impl FFI_ArrowArray {
}
} as i64;
+ if data_layout.variadic {
+ // Save the lengths of all variadic buffers into a new buffer.
+ // The first buffer is `views`, and the rest are variadic.
+ let mut data_buffers_lengths = Vec::new();
+ for buffer in data.buffers().iter().skip(1) {
+ data_buffers_lengths.push(buffer.len() as i64);
+ n_buffers += 1;
+ }
+
+
buffers.push(Some(ScalarBuffer::from(data_buffers_lengths).into_inner()));
+ n_buffers += 1;
+ }
+
let buffers_ptr = buffers
.iter()
.flat_map(|maybe_buffer| match maybe_buffer {
- // note that `raw_data` takes into account the buffer's offset
Some(b) => Some(b.as_ptr() as *const c_void),
// This is for null buffer. We only put a null pointer for
// null buffer if by spec it can contain null mask.
diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs
index 336398cbf2..a7b5937998 100644
--- a/arrow/src/pyarrow.rs
+++ b/arrow/src/pyarrow.rs
@@ -354,7 +354,7 @@ impl FromPyArrow for RecordBatch {
validate_pycapsule(array_capsule, "arrow_array")?;
let schema_ptr = unsafe {
schema_capsule.reference::<FFI_ArrowSchema>() };
- let ffi_array = unsafe {
FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
+ let ffi_array = unsafe {
FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr)
}.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
diff --git a/arrow/tests/pyarrow.rs b/arrow/tests/pyarrow.rs
index a1c365c317..d9ebd0daa1 100644
--- a/arrow/tests/pyarrow.rs
+++ b/arrow/tests/pyarrow.rs
@@ -18,6 +18,8 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder};
+use arrow_array::{Array, BinaryViewArray, StringViewArray};
use pyo3::Python;
use std::sync::Arc;
@@ -27,7 +29,9 @@ fn test_to_pyarrow() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
- let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+ // The "very long string" will not be inlined, and force the creation of a
data buffer.
+ let c: ArrayRef = Arc::new(StringViewArray::from(vec!["short", "a very
long string"]));
+ let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c",
c)]).unwrap();
println!("input: {:?}", input);
let res = Python::with_gil(|py| {
@@ -40,3 +44,66 @@ fn test_to_pyarrow() {
assert_eq!(input, res);
}
+
+#[test]
+fn test_to_pyarrow_byte_view() {
+ pyo3::prepare_freethreaded_python();
+
+ for num_variadic_buffers in 0..=2 {
+ let string_view: ArrayRef =
Arc::new(string_view_column(num_variadic_buffers));
+ let binary_view: ArrayRef =
Arc::new(binary_view_column(num_variadic_buffers));
+
+ let input = RecordBatch::try_from_iter(vec![
+ ("string_view", string_view),
+ ("binary_view", binary_view),
+ ])
+ .unwrap();
+
+ println!("input: {:?}", input);
+ let res = Python::with_gil(|py| {
+ let py_input = input.to_pyarrow(py)?;
+ let records = RecordBatch::from_pyarrow_bound(py_input.bind(py))?;
+ let py_records = records.to_pyarrow(py)?;
+ RecordBatch::from_pyarrow_bound(py_records.bind(py))
+ })
+ .unwrap();
+
+ assert_eq!(input, res);
+ }
+}
+
+fn binary_view_column(num_variadic_buffers: usize) -> BinaryViewArray {
+ let long_scalar = b"but soft what light through yonder window
breaks".as_slice();
+ let mut builder =
BinaryViewBuilder::new().with_fixed_block_size(long_scalar.len() as u32);
+ // Make sure there is at least one non-inlined value.
+ builder.append_value("inlined".as_bytes());
+
+ for _ in 0..num_variadic_buffers {
+ builder.append_value(long_scalar);
+ }
+
+ let result = builder.finish();
+
+ assert_eq!(result.data_buffers().len(), num_variadic_buffers);
+ assert_eq!(result.len(), num_variadic_buffers + 1);
+
+ result
+}
+
+fn string_view_column(num_variadic_buffers: usize) -> StringViewArray {
+ let long_scalar = "but soft what light through yonder window breaks";
+ let mut builder =
StringViewBuilder::new().with_fixed_block_size(long_scalar.len() as u32);
+ // Make sure there is at least one non-inlined value.
+ builder.append_value("inlined");
+
+ for _ in 0..num_variadic_buffers {
+ builder.append_value(long_scalar);
+ }
+
+ let result = builder.finish();
+
+ assert_eq!(result.data_buffers().len(), num_variadic_buffers);
+ assert_eq!(result.len(), num_variadic_buffers + 1);
+
+ result
+}