This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 17058c76be6 IPC format support for StringViewArray and BinaryViewArray (#5525)
17058c76be6 is described below
commit 17058c76be6206b5f87ffa3fb1aaa10c255c2e67
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Mon Apr 1 14:30:56 2024 -0400
IPC format support for StringViewArray and BinaryViewArray (#5525)
* check in ipc format for view types
* update tests
* fix variadic counting
* fix linting, address comments
* Apply suggestions from code review
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
* address some review comments
* update comments
* Add tests and fix bugs with dict types
* make clippy happy
* update test cases
---------
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-ipc/src/convert.rs | 15 +++-
arrow-ipc/src/reader.rs | 197 ++++++++++++++++++++++++++++++++++++++++++-----
arrow-ipc/src/writer.rs | 111 ++++++++++++++++++++++++++
3 files changed, 304 insertions(+), 19 deletions(-)
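
Taken together, the patch teaches the IPC reader and writer to round-trip StringViewArray and BinaryViewArray. Before the diff itself, here is a minimal sketch of the usage this enables, adapted from the new tests below; it is not part of the patch, and it assumes the arrow-array, arrow-ipc, and arrow-schema crates as of this commit:

    use std::io::Cursor;
    use std::sync::Arc;

    use arrow_array::{RecordBatch, StringViewArray};
    use arrow_ipc::reader::FileReader;
    use arrow_ipc::writer::FileWriter;
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        // A Utf8View column mixing inline (<= 12 byte) and out-of-line values.
        let schema = Schema::new(vec![Field::new("v", DataType::Utf8View, true)]);
        let array = StringViewArray::from_iter(vec![
            Some("foo"),
            None,
            Some("a string long enough to need a variadic data buffer"),
        ]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        // Write the batch to an in-memory IPC file, then read it back.
        let mut buf = Vec::new();
        {
            let mut writer = FileWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        let mut reader = FileReader::try_new(Cursor::new(buf), None).unwrap();
        assert_eq!(reader.next().unwrap().unwrap(), batch);
    }
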
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 51e54215ea7..49da0efae3a 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -247,8 +247,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
}
}
crate::Type::Binary => DataType::Binary,
+ crate::Type::BinaryView => DataType::BinaryView,
crate::Type::LargeBinary => DataType::LargeBinary,
crate::Type::Utf8 => DataType::Utf8,
+ crate::Type::Utf8View => DataType::Utf8View,
crate::Type::LargeUtf8 => DataType::LargeUtf8,
crate::Type::FixedSizeBinary => {
let fsb = field.type_as_fixed_size_binary().unwrap();
@@ -548,7 +550,16 @@ pub(crate) fn get_fb_field_type<'a>(
.as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
- BinaryView | Utf8View => unimplemented!("unimplemented"),
+ BinaryView => FBFieldType {
+ type_type: crate::Type::BinaryView,
+            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
+ children: Some(fbb.create_vector(&empty_fields[..])),
+ },
+ Utf8View => FBFieldType {
+ type_type: crate::Type::Utf8View,
+ type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
+ children: Some(fbb.create_vector(&empty_fields[..])),
+ },
Utf8 => FBFieldType {
type_type: crate::Type::Utf8,
type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
@@ -921,7 +932,9 @@ mod tests {
true,
),
Field::new("utf8", DataType::Utf8, false),
+ Field::new("utf8_view", DataType::Utf8View, false),
Field::new("binary", DataType::Binary, false),
+ Field::new("binary_view", DataType::BinaryView, false),
            Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true),
Field::new_fixed_size_list(
"fixed_size_list[u8]",
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index dd0365da4bc..4591777c1e3 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -25,7 +25,7 @@ mod stream;
pub use stream::*;
use flatbuffers::{VectorIter, VerifierOptions};
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
@@ -64,6 +64,9 @@ fn read_buffer(
/// Coordinates reading arrays based on data types.
///
+/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView).
+/// When we encounter such a type, we pop from the front of the queue to get the number of buffers to read.
+///
/// Notes:
/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
@@ -71,7 +74,11 @@ fn read_buffer(
/// - check if the bit width of non-64-bit numbers is 64, and
/// - read the buffer as 64-bit (signed integer or float), and
/// - cast the 64-bit array to the appropriate data type
-fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, ArrowError> {
+fn create_array(
+ reader: &mut ArrayReader,
+ field: &Field,
+ variadic_counts: &mut VecDeque<i64>,
+) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
@@ -83,6 +90,18 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
reader.next_buffer()?,
],
),
+ BinaryView | Utf8View => {
+ let count = variadic_counts
+ .pop_front()
+ .ok_or(ArrowError::IpcError(format!(
+ "Missing variadic count for {data_type} column"
+ )))?;
+ let count = count + 2; // view and null buffer.
+ let buffers = (0..count)
+ .map(|_| reader.next_buffer())
+ .collect::<Result<Vec<_>, _>>()?;
+            create_primitive_array(reader.next_node(field)?, data_type, &buffers)
+ }
FixedSizeBinary(_) => create_primitive_array(
reader.next_node(field)?,
data_type,
@@ -91,13 +110,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
        List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
- let values = create_array(reader, list_field)?;
+ let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?];
- let values = create_array(reader, list_field)?;
+ let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
Struct(struct_fields) => {
@@ -109,7 +128,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
            // TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
- let child = create_array(reader, struct_field)?;
+                let child = create_array(reader, struct_field, variadic_counts)?;
struct_arrays.push((struct_field.clone(), child));
}
let null_count = struct_node.null_count() as usize;
@@ -123,8 +142,8 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
}
RunEndEncoded(run_ends_field, values_field) => {
let run_node = reader.next_node(field)?;
- let run_ends = create_array(reader, run_ends_field)?;
- let values = create_array(reader, values_field)?;
+        let run_ends = create_array(reader, run_ends_field, variadic_counts)?;
+ let values = create_array(reader, values_field, variadic_counts)?;
let run_array_length = run_node.length() as usize;
let data = ArrayData::builder(data_type.clone())
@@ -177,7 +196,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let mut ids = Vec::with_capacity(fields.len());
for (id, field) in fields.iter() {
- let child = create_array(reader, field)?;
+ let child = create_array(reader, field, variadic_counts)?;
children.push((field.as_ref().clone(), child));
ids.push(id);
}
@@ -230,6 +249,11 @@ fn create_primitive_array(
.null_bit_buffer(null_buffer)
.build_aligned()?
}
+ BinaryView | Utf8View => ArrayData::builder(data_type.clone())
+ .len(length)
+ .buffers(buffers[1..].to_vec())
+ .null_bit_buffer(null_buffer)
+ .build_aligned()?,
        _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
@@ -328,7 +352,11 @@ impl<'a> ArrayReader<'a> {
})
}
- fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> {
+ fn skip_field(
+ &mut self,
+ field: &Field,
+ variadic_count: &mut VecDeque<i64>,
+ ) -> Result<(), ArrowError> {
self.next_node(field)?;
match field.data_type() {
@@ -337,6 +365,18 @@ impl<'a> ArrayReader<'a> {
self.skip_buffer()
}
}
+ Utf8View | BinaryView => {
+ let count = variadic_count
+ .pop_front()
+ .ok_or(ArrowError::IpcError(format!(
+ "Missing variadic count for {} column",
+ field.data_type()
+ )))?;
+ let count = count + 2; // view and null buffer.
+ for _i in 0..count {
+ self.skip_buffer()
+ }
+ }
FixedSizeBinary(_) => {
self.skip_buffer();
self.skip_buffer();
@@ -344,23 +384,23 @@ impl<'a> ArrayReader<'a> {
List(list_field) | LargeList(list_field) | Map(list_field, _) => {
self.skip_buffer();
self.skip_buffer();
- self.skip_field(list_field)?;
+ self.skip_field(list_field, variadic_count)?;
}
FixedSizeList(list_field, _) => {
self.skip_buffer();
- self.skip_field(list_field)?;
+ self.skip_field(list_field, variadic_count)?;
}
Struct(struct_fields) => {
self.skip_buffer();
// skip for each field
for struct_field in struct_fields {
- self.skip_field(struct_field)?
+ self.skip_field(struct_field, variadic_count)?
}
}
RunEndEncoded(run_ends_field, values_field) => {
- self.skip_field(run_ends_field)?;
- self.skip_field(values_field)?;
+ self.skip_field(run_ends_field, variadic_count)?;
+ self.skip_field(values_field, variadic_count)?;
}
Dictionary(_, _) => {
self.skip_buffer(); // Nulls
@@ -375,7 +415,7 @@ impl<'a> ArrayReader<'a> {
};
for (_, field) in fields.iter() {
- self.skip_field(field)?
+ self.skip_field(field, variadic_count)?
}
}
Null => {} // No buffer increases
@@ -403,6 +443,10 @@ pub fn read_record_batch(
let field_nodes = batch.nodes().ok_or_else(|| {
        ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;
+
+    let mut variadic_counts: VecDeque<i64> =
+        batch.variadicBufferCounts().into_iter().flatten().collect();
+
let batch_compression = batch.compression();
let compression = batch_compression
.map(|batch_compression| batch_compression.codec().try_into())
@@ -425,12 +469,13 @@ pub fn read_record_batch(
for (idx, field) in schema.fields().iter().enumerate() {
// Create array for projected field
if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
- let child = create_array(&mut reader, field)?;
+            let child = create_array(&mut reader, field, &mut variadic_counts)?;
arrays.push((proj_idx, child));
} else {
- reader.skip_field(field)?;
+ reader.skip_field(field, &mut variadic_counts)?;
}
}
+ assert!(variadic_counts.is_empty());
arrays.sort_by_key(|t| t.0);
RecordBatch::try_new_with_options(
Arc::new(schema.project(projection)?),
@@ -441,9 +486,10 @@ pub fn read_record_batch(
let mut children = vec![];
// keep track of index as lists require more than one node
for field in schema.fields() {
- let child = create_array(&mut reader, field)?;
+            let child = create_array(&mut reader, field, &mut variadic_counts)?;
children.push(child);
}
+ assert!(variadic_counts.is_empty());
RecordBatch::try_new_with_options(schema, children, &options)
}
}
@@ -1759,6 +1805,121 @@ mod tests {
assert_eq!(input_batch, output_batch);
}
+ const LONG_TEST_STRING: &str =
+ "This is a long string to make sure binary view array handles it";
+
+ #[test]
+ fn test_roundtrip_view_types() {
+ let schema = Schema::new(vec![
+ Field::new("field_1", DataType::BinaryView, true),
+ Field::new("field_2", DataType::Utf8, true),
+ Field::new("field_3", DataType::Utf8View, true),
+ ]);
+ let bin_values: Vec<Option<&[u8]>> = vec![
+ Some(b"foo"),
+ None,
+ Some(b"bar"),
+ Some(LONG_TEST_STRING.as_bytes()),
+ ];
+ let utf8_values: Vec<Option<&str>> =
+ vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
+ let bin_view_array = BinaryViewArray::from_iter(bin_values);
+ let utf8_array = StringArray::from_iter(utf8_values.iter());
+ let utf8_view_array = StringViewArray::from_iter(utf8_values);
+ let record_batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(bin_view_array),
+ Arc::new(utf8_array),
+ Arc::new(utf8_view_array),
+ ],
+ )
+ .unwrap();
+
+ assert_eq!(record_batch, roundtrip_ipc(&record_batch));
+ assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
+
+ let sliced_batch = record_batch.slice(1, 2);
+ assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
+ assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
+ }
+
+ #[test]
+ fn test_roundtrip_view_types_nested_dict() {
+ let bin_values: Vec<Option<&[u8]>> = vec![
+ Some(b"foo"),
+ None,
+ Some(b"bar"),
+ Some(LONG_TEST_STRING.as_bytes()),
+ Some(b"field"),
+ ];
+ let utf8_values: Vec<Option<&str>> = vec![
+ Some("foo"),
+ None,
+ Some("bar"),
+ Some(LONG_TEST_STRING),
+ Some("field"),
+ ];
+ let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
+        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
+
+ let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
+        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
+ let keys_field = Arc::new(Field::new_dict(
+ "keys",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
+ true,
+ 1,
+ false,
+ ));
+
+        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
+        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
+ let values_field = Arc::new(Field::new_dict(
+ "values",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
+ true,
+ 2,
+ false,
+ ));
+ let entry_struct = StructArray::from(vec![
+ (keys_field, make_array(key_dict_array.into_data())),
+ (values_field, make_array(value_dict_array.into_data())),
+ ]);
+
+ let map_data_type = DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ entry_struct.data_type().clone(),
+ false,
+ )),
+ false,
+ );
+ let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
+ let map_data = ArrayData::builder(map_data_type)
+ .len(3)
+ .add_buffer(entry_offsets)
+ .add_child_data(entry_struct.into_data())
+ .build()
+ .unwrap();
+ let map_array = MapArray::from(map_data);
+
+        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
+        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "f1",
+ dict_dict_array.data_type().clone(),
+ false,
+ )]));
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
+ assert_eq!(batch, roundtrip_ipc(&batch));
+ assert_eq!(batch, roundtrip_ipc_stream(&batch));
+
+ let sliced_batch = batch.slice(1, 2);
+ assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
+ assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
+ }
+
#[test]
fn test_no_columns_batch() {
let schema = Arc::new(Schema::empty());
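
The reader changes above and the writer changes below hinge on one piece of arithmetic from the columnar spec: each view column's entry in variadicBufferCounts counts only its variadic data buffers. An ArrayData for a view array stores [views, data_0, ..., data_n-1] in buffers() with validity kept separately, while the IPC message body carries [validity, views, data_0, ..., data_n-1]. A small free-standing sketch of that bookkeeping (the helper names are illustrative, not code from the patch):

    // Writer side: the variadic count is all of the array's buffers minus the
    // views buffer (validity is not stored in `buffers()`).
    fn variadic_count(num_array_data_buffers: usize) -> i64 {
        num_array_data_buffers as i64 - 1
    }

    // Reader side: buffers to pull from the message body = validity + views +
    // the variadic data buffers, i.e. count + 2.
    fn buffers_to_read(variadic_count: i64) -> i64 {
        variadic_count + 2
    }

    fn main() {
        // A view array with one views buffer and three data buffers:
        let count = variadic_count(4);
        assert_eq!(count, 3);
        assert_eq!(buffers_to_read(count), 5);
    }
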
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 22edfbc2454..2a3474fe0fc 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -412,6 +412,8 @@ impl IpcDataGenerator {
let compression_codec: Option<CompressionCodec> =
batch_compression_type.map(TryInto::try_into).transpose()?;
+ let mut variadic_buffer_counts = vec![];
+
for array in batch.columns() {
let array_data = array.to_data();
offset = write_array_data(
@@ -425,6 +427,8 @@ impl IpcDataGenerator {
compression_codec,
write_options,
)?;
+
+            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
}
// pad the tail of body data
let len = arrow_data.len();
@@ -434,6 +438,12 @@ impl IpcDataGenerator {
// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
+ let variadic_buffer = if variadic_buffer_counts.is_empty() {
+ None
+ } else {
+ Some(fbb.create_vector(&variadic_buffer_counts))
+ };
+
let root = {
let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(batch.num_rows() as i64);
@@ -442,6 +452,10 @@ impl IpcDataGenerator {
if let Some(c) = compression {
batch_builder.add_compression(c);
}
+
+ if let Some(v) = variadic_buffer {
+ batch_builder.add_variadicBufferCounts(v);
+ }
let b = batch_builder.finish();
b.as_union_value()
};
@@ -501,6 +515,9 @@ impl IpcDataGenerator {
write_options,
)?;
+ let mut variadic_buffer_counts = vec![];
+ append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
+
// pad the tail of body data
let len = arrow_data.len();
let pad_len = pad_to_8(len as u32);
@@ -509,6 +526,11 @@ impl IpcDataGenerator {
// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
+ let variadic_buffer = if variadic_buffer_counts.is_empty() {
+ None
+ } else {
+ Some(fbb.create_vector(&variadic_buffer_counts))
+ };
let root = {
let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
@@ -518,6 +540,9 @@ impl IpcDataGenerator {
if let Some(c) = compression {
batch_builder.add_compression(c);
}
+ if let Some(v) = variadic_buffer {
+ batch_builder.add_variadicBufferCounts(v);
+ }
batch_builder.finish()
};
@@ -547,6 +572,25 @@ impl IpcDataGenerator {
}
}
+fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
+ match array.data_type() {
+ DataType::BinaryView | DataType::Utf8View => {
+            // The spec documents that the counts include only the variadic data buffers, not the view/null buffers.
+            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
+ counts.push(array.buffers().len() as i64 - 1);
+ }
+ DataType::Dictionary(_, _) => {
+ // Do nothing
+ // Dictionary types are handled in `encode_dictionaries`.
+ }
+ _ => {
+ for child in array.child_data() {
+ append_variadic_buffer_counts(counts, child)
+ }
+ }
+ }
+}
+
pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
match arr.data_type() {
DataType::RunEndEncoded(k, _) => match k.data_type() {
@@ -1249,6 +1293,22 @@ fn write_array_data(
compression_codec,
)?;
}
+ } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
+        // Slicing the views buffer is safe and easy,
+        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers.
+        //
+        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
+        // If users want to "compact" the arrays prior to sending them over IPC,
+        // they should consider the gc API suggested in #5513
+ for buffer in array_data.buffers() {
+ offset = write_buffer(
+ buffer.as_slice(),
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
+ }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
for buffer in [offsets, values] {
@@ -1804,6 +1864,57 @@ mod tests {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
}
+ #[test]
+ fn test_write_view_types() {
+ const LONG_TEST_STRING: &str =
+ "This is a long string to make sure binary view array handles it";
+ let schema = Schema::new(vec![
+ Field::new("field1", DataType::BinaryView, true),
+ Field::new("field2", DataType::Utf8View, true),
+ ]);
+ let values: Vec<Option<&[u8]>> = vec![
+ Some(b"foo"),
+ Some(b"bar"),
+ Some(LONG_TEST_STRING.as_bytes()),
+ ];
+ let binary_array = BinaryViewArray::from_iter(values);
+ let utf8_array =
+            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
+ let record_batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(binary_array), Arc::new(utf8_array)],
+ )
+ .unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+ {
+ let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
+ writer.write(&record_batch).unwrap();
+ writer.finish().unwrap();
+ }
+ file.rewind().unwrap();
+ {
+ let mut reader = FileReader::try_new(&file, None).unwrap();
+ let read_batch = reader.next().unwrap().unwrap();
+ read_batch
+ .columns()
+ .iter()
+ .zip(record_batch.columns())
+ .for_each(|(a, b)| {
+ assert_eq!(a, b);
+ });
+ }
+ file.rewind().unwrap();
+ {
+            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
+ let read_batch = reader.next().unwrap().unwrap();
+ assert_eq!(read_batch.num_columns(), 1);
+ let read_array = read_batch.column(0);
+ let write_array = record_batch.column(0);
+ assert_eq!(read_array, write_array);
+ }
+ }
+
#[test]
fn truncate_ipc_record_batch() {
fn create_batch(rows: usize) -> RecordBatch {