This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new e919e992a feat: enable metadata import/export through C data interface
(#3944)
e919e992a is described below
commit e919e992a1154dc44dac43c8623be68b05fe6ca1
Author: Will Jones <[email protected]>
AuthorDate: Tue Mar 28 22:43:43 2023 -0700
feat: enable metadata import/export through C data interface (#3944)
* feat: enable metadata export through C data interface
* chore: clippy warnings
* Update arrow-schema/src/ffi.rs
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* make parsing more defensive.
* use IntoIterator
* handle integer overflow
---------
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
---
.../tests/test_sql.py | 6 +
arrow-schema/src/ffi.rs | 169 ++++++++++++++++++++-
arrow/src/ffi.rs | 27 +++-
3 files changed, 195 insertions(+), 7 deletions(-)
diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py
index 98564408d..f631f67cb 100644
--- a/arrow-pyarrow-integration-testing/tests/test_sql.py
+++ b/arrow-pyarrow-integration-testing/tests/test_sql.py
@@ -138,6 +138,12 @@ def test_field_roundtrip(pyarrow_type):
field = rust.round_trip_field(pyarrow_field)
assert field == pyarrow_field
+def test_field_metadata_roundtrip():
+ metadata = {"hello": "World! 😊", "x": "2"}
+ pyarrow_field = pa.field("test", pa.int32(), metadata=metadata)
+ field = rust.round_trip_field(pyarrow_field)
+ assert field == pyarrow_field
+ assert field.metadata == pyarrow_field.metadata
def test_schema_roundtrip():
pyarrow_fields = zip(string.ascii_lowercase, _supported_pyarrow_types)
diff --git a/arrow-schema/src/ffi.rs b/arrow-schema/src/ffi.rs
index 8e58e3158..058febbdd 100644
--- a/arrow-schema/src/ffi.rs
+++ b/arrow-schema/src/ffi.rs
@@ -36,7 +36,10 @@
use crate::{ArrowError, DataType, Field, Schema, TimeUnit, UnionMode};
use bitflags::bitflags;
-use std::ffi::{c_char, c_void, CStr, CString};
+use std::{
+ collections::HashMap,
+ ffi::{c_char, c_void, CStr, CString},
+};
bitflags! {
pub struct Flags: i64 {
@@ -74,6 +77,7 @@ pub struct FFI_ArrowSchema {
struct SchemaPrivateData {
children: Box<[*mut FFI_ArrowSchema]>,
dictionary: *mut FFI_ArrowSchema,
+ metadata: Option<Vec<u8>>,
}
// callback used to drop [FFI_ArrowSchema] when it is exported.
@@ -130,6 +134,7 @@ impl FFI_ArrowSchema {
let mut private_data = Box::new(SchemaPrivateData {
children: children_ptr,
dictionary: dictionary_ptr,
+ metadata: None,
});
// intentionally set from private_data (see https://github.com/apache/arrow-rs/issues/580)
@@ -152,6 +157,63 @@ impl FFI_ArrowSchema {
Ok(self)
}
+ pub fn with_metadata<I, S>(mut self, metadata: I) -> Result<Self, ArrowError>
+ where
+ I: IntoIterator<Item = (S, S)>,
+ S: AsRef<str>,
+ {
+ let metadata: Vec<(S, S)> = metadata.into_iter().collect();
+ // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
+ let new_metadata = if !metadata.is_empty() {
+ let mut metadata_serialized: Vec<u8> = Vec::new();
+ let num_entries: i32 = metadata.len().try_into().map_err(|_| {
+ ArrowError::CDataInterface(format!(
+ "metadata can only have {} entries, but {} were provided",
+ i32::MAX,
+ metadata.len()
+ ))
+ })?;
+ metadata_serialized.extend(num_entries.to_ne_bytes());
+
+ for (key, value) in metadata.into_iter() {
+ let key_len: i32 = key.as_ref().len().try_into().map_err(|_| {
+ ArrowError::CDataInterface(format!(
+ "metadata key can only have {} bytes, but {} were provided",
+ i32::MAX,
+ key.as_ref().len()
+ ))
+ })?;
+ let value_len: i32 = value.as_ref().len().try_into().map_err(|_| {
+ ArrowError::CDataInterface(format!(
+ "metadata value can only have {} bytes, but {} were provided",
+ i32::MAX,
+ value.as_ref().len()
+ ))
+ })?;
+
+ metadata_serialized.extend(key_len.to_ne_bytes());
+ metadata_serialized.extend_from_slice(key.as_ref().as_bytes());
+ metadata_serialized.extend(value_len.to_ne_bytes());
+ metadata_serialized.extend_from_slice(value.as_ref().as_bytes());
+ }
+
+ self.metadata = metadata_serialized.as_ptr() as *const c_char;
+ Some(metadata_serialized)
+ } else {
+ self.metadata = std::ptr::null_mut();
+ None
+ };
+
+ unsafe {
+ let mut private_data =
+ Box::from_raw(self.private_data as *mut SchemaPrivateData);
+ private_data.metadata = new_metadata;
+ self.private_data = Box::into_raw(private_data) as *mut c_void;
+ }
+
+ Ok(self)
+ }
+
pub fn empty() -> Self {
Self {
format: std::ptr::null_mut(),
@@ -212,6 +274,71 @@ impl FFI_ArrowSchema {
pub fn dictionary_ordered(&self) -> bool {
self.flags & 0b00000001 != 0
}
+
+ pub fn metadata(&self) -> Result<HashMap<String, String>, ArrowError> {
+ if self.metadata.is_null() {
+ Ok(HashMap::new())
+ } else {
+ let mut pos = 0;
+ let buffer: *const u8 = self.metadata as *const u8;
+
+ fn next_four_bytes(buffer: *const u8, pos: &mut isize) -> [u8; 4] {
+ let out = unsafe {
+ [
+ *buffer.offset(*pos),
+ *buffer.offset(*pos + 1),
+ *buffer.offset(*pos + 2),
+ *buffer.offset(*pos + 3),
+ ]
+ };
+ *pos += 4;
+ out
+ }
+
+ fn next_n_bytes(buffer: *const u8, pos: &mut isize, n: i32) -> &[u8] {
+ let out = unsafe {
+ std::slice::from_raw_parts(buffer.offset(*pos), n.try_into().unwrap())
+ };
+ *pos += isize::try_from(n).unwrap();
+ out
+ }
+
+ let num_entries = i32::from_ne_bytes(next_four_bytes(buffer, &mut pos));
+ if num_entries < 0 {
+ return Err(ArrowError::CDataInterface(
+ "Negative number of metadata entries".to_string(),
+ ));
+ }
+
+ let mut metadata = HashMap::with_capacity(
+ num_entries.try_into().expect("Too many metadata entries"),
+ );
+
+ for _ in 0..num_entries {
+ let key_length = i32::from_ne_bytes(next_four_bytes(buffer, &mut pos));
+ if key_length < 0 {
+ return Err(ArrowError::CDataInterface(
+ "Negative key length in metadata".to_string(),
+ ));
+ }
+ let key = String::from_utf8(
+ next_n_bytes(buffer, &mut pos, key_length).to_vec(),
+ )?;
+ let value_length = i32::from_ne_bytes(next_four_bytes(buffer, &mut pos));
+ if value_length < 0 {
+ return Err(ArrowError::CDataInterface(
+ "Negative value length in metadata".to_string(),
+ ));
+ }
+ let value = String::from_utf8(
+ next_n_bytes(buffer, &mut pos, value_length).to_vec(),
+ )?;
+ metadata.insert(key, value);
+ }
+
+ Ok(metadata)
+ }
+ }
}
impl Drop for FFI_ArrowSchema {
@@ -421,7 +548,8 @@ impl TryFrom<&FFI_ArrowSchema> for Field {
fn try_from(c_schema: &FFI_ArrowSchema) -> Result<Self, ArrowError> {
let dtype = DataType::try_from(c_schema)?;
- let field = Field::new(c_schema.name(), dtype, c_schema.nullable());
+ let mut field = Field::new(c_schema.name(), dtype, c_schema.nullable());
+ field.set_metadata(c_schema.metadata()?);
Ok(field)
}
}
@@ -433,7 +561,7 @@ impl TryFrom<&FFI_ArrowSchema> for Schema {
// interpret it as a struct type then extract its fields
let dtype = DataType::try_from(c_schema)?;
if let DataType::Struct(fields) = dtype {
- Ok(Schema::new(fields))
+ Ok(Schema::new(fields).with_metadata(c_schema.metadata()?))
} else {
Err(ArrowError::CDataInterface(
"Unable to interpret C data struct as a Schema".to_string(),
@@ -558,7 +686,8 @@ impl TryFrom<&Field> for FFI_ArrowSchema {
FFI_ArrowSchema::try_from(field.data_type())?
.with_name(field.name())?
- .with_flags(flags)
+ .with_flags(flags)?
+ .with_metadata(field.metadata())
}
}
@@ -567,7 +696,8 @@ impl TryFrom<&Schema> for FFI_ArrowSchema {
fn try_from(schema: &Schema) -> Result<Self, ArrowError> {
let dtype = DataType::Struct(schema.fields().clone());
- let c_schema = FFI_ArrowSchema::try_from(&dtype)?;
+ let c_schema =
+ FFI_ArrowSchema::try_from(&dtype)?.with_metadata(&schema.metadata)?;
Ok(c_schema)
}
}
@@ -655,7 +785,9 @@ mod tests {
Field::new("name", DataType::Utf8, false),
Field::new("address", DataType::Utf8, false),
Field::new("priority", DataType::UInt8, false),
- ]);
+ ])
+ .with_metadata([("hello".to_string(), "world".to_string())].into());
+
round_trip_schema(schema);
// test that we can interpret struct types as schema
@@ -700,4 +832,29 @@ mod tests {
let arrow_schema = FFI_ArrowSchema::try_from(schema).unwrap();
assert!(arrow_schema.child(0).dictionary_ordered());
}
+
+ #[test]
+ fn test_set_field_metadata() {
+ let metadata_cases: Vec<HashMap<String, String>> = vec![
+ [].into(),
+ [("key".to_string(), "value".to_string())].into(),
+ [
+ ("key".to_string(), "".to_string()),
+ ("ascii123".to_string(), "你好".to_string()),
+ ("".to_string(), "value".to_string()),
+ ]
+ .into(),
+ ];
+
+ let mut schema = FFI_ArrowSchema::try_new("b", vec![], None)
+ .unwrap()
+ .with_name("test")
+ .unwrap();
+
+ for metadata in metadata_cases {
+ schema = schema.with_metadata(&metadata).unwrap();
+ let field = Field::try_from(&schema).unwrap();
+ assert_eq!(field.metadata(), &metadata);
+ }
+ }
}
diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs
index 9d0ed0b85..9179b1279 100644
--- a/arrow/src/ffi.rs
+++ b/arrow/src/ffi.rs
@@ -497,7 +497,8 @@ mod tests {
use crate::datatypes::{Field, Int8Type};
use arrow_array::builder::UnionBuilder;
use arrow_array::types::{Float64Type, Int32Type};
- use arrow_array::{Float64Array, UnionArray};
+ use arrow_array::{Float64Array, StructArray, UnionArray};
+ use std::collections::HashMap;
use std::convert::TryFrom;
use std::mem::ManuallyDrop;
use std::ptr::addr_of_mut;
@@ -1092,6 +1093,30 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_struct_array() -> Result<()> {
+ let metadata: HashMap<String, String> =
+ [("Hello".to_string(), "World! 😊".to_string())].into();
+ let struct_array = StructArray::from(vec![(
+ Field::new("a", DataType::Int32, false).with_metadata(metadata),
+ Arc::new(Int32Array::from(vec![2, 4, 6])) as Arc<dyn Array>,
+ )]);
+
+ // export it
+ let array = ArrowArray::try_from(struct_array.data().clone())?;
+
+ // (simulate consumer) import it
+ let data = ArrayData::try_from(array)?;
+ let array = make_array(data);
+
+ // perform some operation
+ let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+ assert_eq!(array.data_type(), struct_array.data_type());
+ assert_eq!(array, &struct_array);
+
+ Ok(())
+ }
+
#[test]
fn test_union_sparse_array() -> Result<()> {
let mut builder = UnionBuilder::new_sparse();