This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push: new b726b6face Add additional integration tests to arrow-avro (#7974) b726b6face is described below commit b726b6facec81e45f57459227d11bdd8e3098544 Author: nathaniel-d-ef <nathan...@elastiflow.com> AuthorDate: Tue Jul 22 16:40:27 2025 -0500 Add additional integration tests to arrow-avro (#7974) # Which issue does this PR close? Part of https://github.com/apache/arrow-rs/issues/4886 Completes the breaking down/porting of the changes in https://github.com/apache/arrow-rs/pull/6965. This PR will be closed upon merge of this PR. # Rationale for this change This change brings over the remaining integration tests present in the original PR, which validate the reader logic against the files from `testing/data/avro`. PRs containing this logic have already been merged (but are not yet released) which these tests now validate. # What changes are included in this PR? The following files are now read in: - alltypes_dictionary.avro - alltypes_nulls_plain.avro - binary.avro - dict-page-offset-zero.avro - avro/list_columns.avro - nested_lists.snappy.avro - single_nan.avro - datapage_v2.snappy.avro - nested_records.avro - repeated_no_annotation.avro # Are these changes tested? This PR consists of integration tests validating code merged recently into this crate. No changes in functionality are included. # Are there any user-facing changes? N/A --- arrow-avro/Cargo.toml | 1 + arrow-avro/src/reader/mod.rs | 603 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 601 insertions(+), 3 deletions(-) diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 383735e652..e2280b251f 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -58,6 +58,7 @@ crc = { version = "3.0", optional = true } uuid = "1.17" [dev-dependencies] +arrow-data = { workspace = true } rand = { version = "0.9.1", default-features = false, features = [ "std", "std_rng", diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 3bc7d94b7c..b98777d3d7 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -396,13 +396,15 @@ mod test { use crate::reader::vlq::VLQDecoder; use crate::reader::{read_header, Decoder, Reader, ReaderBuilder}; use crate::test_util::arrow_test_data; + use arrow::array::ArrayDataBuilder; use arrow_array::builder::{ - Float64Builder, Int32Builder, ListBuilder, MapBuilder, StringBuilder, StructBuilder, + ArrayBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int32Builder, Int64Builder, + ListBuilder, MapBuilder, StringBuilder, StructBuilder, }; - use arrow_array::types::{Int32Type, IntervalMonthDayNanoType}; use arrow_array::*; - use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema}; + use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; + use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema}; use bytes::{Buf, BufMut, Bytes}; use futures::executor::block_on; use futures::{stream, Stream, StreamExt, TryStreamExt}; @@ -599,6 +601,154 @@ mod test { } } + #[test] + fn test_alltypes_dictionary() { + let file = "avro/alltypes_dictionary.avro"; + let expected = RecordBatch::try_from_iter_with_nullable([ + ("id", Arc::new(Int32Array::from(vec![0, 1])) as _, true), + ( + "bool_col", + Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as _, + true, + ), + ( + "tinyint_col", + Arc::new(Int32Array::from(vec![0, 1])) as _, + true, + ), + ( + "smallint_col", + Arc::new(Int32Array::from(vec![0, 1])) as _, + true, + ), + ("int_col", Arc::new(Int32Array::from(vec![0, 1])) as _, true), + ( + "bigint_col", + Arc::new(Int64Array::from(vec![0, 10])) as _, + true, + ), + ( + "float_col", + Arc::new(Float32Array::from(vec![0.0, 1.1])) as _, + true, + ), + ( + "double_col", + Arc::new(Float64Array::from(vec![0.0, 10.1])) as _, + true, + ), + ( + "date_string_col", + Arc::new(BinaryArray::from_iter_values([b"01/01/09", b"01/01/09"])) as _, + true, + ), + ( + "string_col", + Arc::new(BinaryArray::from_iter_values([b"0", b"1"])) as _, + true, + ), + ( + "timestamp_col", + Arc::new( + TimestampMicrosecondArray::from_iter_values([ + 1230768000000000, // 2009-01-01T00:00:00.000 + 1230768060000000, // 2009-01-01T00:01:00.000 + ]) + .with_timezone("+00:00"), + ) as _, + true, + ), + ]) + .unwrap(); + let file_path = arrow_test_data(file); + let batch_large = read_file(&file_path, 8, false); + assert_eq!( + batch_large, expected, + "Decoded RecordBatch does not match for file {file}" + ); + let batch_small = read_file(&file_path, 3, false); + assert_eq!( + batch_small, expected, + "Decoded RecordBatch (batch size 3) does not match for file {file}" + ); + } + + #[test] + fn test_alltypes_nulls_plain() { + let file = "avro/alltypes_nulls_plain.avro"; + let expected = RecordBatch::try_from_iter_with_nullable([ + ( + "string_col", + Arc::new(StringArray::from(vec![None::<&str>])) as _, + true, + ), + ("int_col", Arc::new(Int32Array::from(vec![None])) as _, true), + ( + "bool_col", + Arc::new(BooleanArray::from(vec![None])) as _, + true, + ), + ( + "bigint_col", + Arc::new(Int64Array::from(vec![None])) as _, + true, + ), + ( + "float_col", + Arc::new(Float32Array::from(vec![None])) as _, + true, + ), + ( + "double_col", + Arc::new(Float64Array::from(vec![None])) as _, + true, + ), + ( + "bytes_col", + Arc::new(BinaryArray::from(vec![None::<&[u8]>])) as _, + true, + ), + ]) + .unwrap(); + let file_path = arrow_test_data(file); + let batch_large = read_file(&file_path, 8, false); + assert_eq!( + batch_large, expected, + "Decoded RecordBatch does not match for file {file}" + ); + let batch_small = read_file(&file_path, 3, false); + assert_eq!( + batch_small, expected, + "Decoded RecordBatch (batch size 3) does not match for file {file}" + ); + } + + #[test] + fn test_binary() { + let file = arrow_test_data("avro/binary.avro"); + let batch = read_file(&file, 8, false); + let expected = RecordBatch::try_from_iter_with_nullable([( + "foo", + Arc::new(BinaryArray::from_iter_values(vec![ + b"\x00".as_ref(), + b"\x01".as_ref(), + b"\x02".as_ref(), + b"\x03".as_ref(), + b"\x04".as_ref(), + b"\x05".as_ref(), + b"\x06".as_ref(), + b"\x07".as_ref(), + b"\x08".as_ref(), + b"\t".as_ref(), + b"\n".as_ref(), + b"\x0b".as_ref(), + ])) as Arc<dyn Array>, + true, + )]) + .unwrap(); + assert_eq!(batch, expected); + } + #[test] fn test_decode_stream_with_schema() { struct TestCase<'a> { @@ -725,6 +875,153 @@ mod test { } } + #[test] + fn test_dict_pages_offset_zero() { + let file = arrow_test_data("avro/dict-page-offset-zero.avro"); + let batch = read_file(&file, 32, false); + let num_rows = batch.num_rows(); + let expected_field = Int32Array::from(vec![Some(1552); num_rows]); + let expected = RecordBatch::try_from_iter_with_nullable([( + "l_partkey", + Arc::new(expected_field) as Arc<dyn Array>, + true, + )]) + .unwrap(); + assert_eq!(batch, expected); + } + + #[test] + fn test_list_columns() { + let file = arrow_test_data("avro/list_columns.avro"); + let mut int64_list_builder = ListBuilder::new(Int64Builder::new()); + { + { + let values = int64_list_builder.values(); + values.append_value(1); + values.append_value(2); + values.append_value(3); + } + int64_list_builder.append(true); + } + { + { + let values = int64_list_builder.values(); + values.append_null(); + values.append_value(1); + } + int64_list_builder.append(true); + } + { + { + let values = int64_list_builder.values(); + values.append_value(4); + } + int64_list_builder.append(true); + } + let int64_list = int64_list_builder.finish(); + let mut utf8_list_builder = ListBuilder::new(StringBuilder::new()); + { + { + let values = utf8_list_builder.values(); + values.append_value("abc"); + values.append_value("efg"); + values.append_value("hij"); + } + utf8_list_builder.append(true); + } + { + utf8_list_builder.append(false); + } + { + { + let values = utf8_list_builder.values(); + values.append_value("efg"); + values.append_null(); + values.append_value("hij"); + values.append_value("xyz"); + } + utf8_list_builder.append(true); + } + let utf8_list = utf8_list_builder.finish(); + let expected = RecordBatch::try_from_iter_with_nullable([ + ("int64_list", Arc::new(int64_list) as Arc<dyn Array>, true), + ("utf8_list", Arc::new(utf8_list) as Arc<dyn Array>, true), + ]) + .unwrap(); + let batch = read_file(&file, 8, false); + assert_eq!(batch, expected); + } + + #[test] + fn test_nested_lists() { + use arrow_data::ArrayDataBuilder; + let file = arrow_test_data("avro/nested_lists.snappy.avro"); + let inner_values = StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + Some("f"), + ]); + let inner_offsets = Buffer::from_slice_ref([0, 2, 3, 3, 4, 6, 8, 8, 9, 11, 13, 14, 14, 15]); + let inner_validity = [ + true, true, false, true, true, true, false, true, true, true, true, false, true, + ]; + let inner_null_buffer = Buffer::from_iter(inner_validity.iter().copied()); + let inner_field = Field::new("item", DataType::Utf8, true); + let inner_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(inner_field))) + .len(13) + .add_buffer(inner_offsets) + .add_child_data(inner_values.to_data()) + .null_bit_buffer(Some(inner_null_buffer)) + .build() + .unwrap(); + let inner_list_array = ListArray::from(inner_list_data); + let middle_offsets = Buffer::from_slice_ref([0, 2, 4, 6, 8, 11, 13]); + let middle_validity = [true; 6]; + let middle_null_buffer = Buffer::from_iter(middle_validity.iter().copied()); + let middle_field = Field::new("item", inner_list_array.data_type().clone(), true); + let middle_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(middle_field))) + .len(6) + .add_buffer(middle_offsets) + .add_child_data(inner_list_array.to_data()) + .null_bit_buffer(Some(middle_null_buffer)) + .build() + .unwrap(); + let middle_list_array = ListArray::from(middle_list_data); + let outer_offsets = Buffer::from_slice_ref([0, 2, 4, 6]); + let outer_null_buffer = Buffer::from_slice_ref([0b111]); // all 3 rows valid + let outer_field = Field::new("item", middle_list_array.data_type().clone(), true); + let outer_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(outer_field))) + .len(3) + .add_buffer(outer_offsets) + .add_child_data(middle_list_array.to_data()) + .null_bit_buffer(Some(outer_null_buffer)) + .build() + .unwrap(); + let a_expected = ListArray::from(outer_list_data); + let b_expected = Int32Array::from(vec![1, 1, 1]); + let expected = RecordBatch::try_from_iter_with_nullable([ + ("a", Arc::new(a_expected) as Arc<dyn Array>, true), + ("b", Arc::new(b_expected) as Arc<dyn Array>, true), + ]) + .unwrap(); + let left = read_file(&file, 8, false); + assert_eq!(left, expected, "Mismatch for batch size=8"); + let left_small = read_file(&file, 3, false); + assert_eq!(left_small, expected, "Mismatch for batch size=3"); + } + #[test] fn test_simple() { let tests = [ @@ -813,6 +1110,23 @@ mod test { } } + #[test] + fn test_single_nan() { + let file = arrow_test_data("avro/single_nan.avro"); + let actual = read_file(&file, 1, false); + use arrow_array::Float64Array; + let schema = Arc::new(Schema::new(vec![Field::new( + "mycol", + DataType::Float64, + true, + )])); + let col = Float64Array::from(vec![None]); + let expected = RecordBatch::try_new(schema, vec![Arc::new(col)]).unwrap(); + assert_eq!(actual, expected); + let actual2 = read_file(&file, 2, false); + assert_eq!(actual2, expected); + } + #[test] fn test_duration_uuid() { let batch = read_file("test/data/duration_uuid.avro", 4, false); @@ -874,6 +1188,289 @@ mod test { assert_eq!(&expected_uuid_array, uuid_array); } + #[test] + fn test_datapage_v2() { + let file = arrow_test_data("avro/datapage_v2.snappy.avro"); + let batch = read_file(&file, 8, false); + let a = StringArray::from(vec![ + Some("abc"), + Some("abc"), + Some("abc"), + None, + Some("abc"), + ]); + let b = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]); + let c = Float64Array::from(vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0), Some(2.0)]); + let d = BooleanArray::from(vec![ + Some(true), + Some(true), + Some(true), + Some(false), + Some(true), + ]); + let e_values = Int32Array::from(vec![ + Some(1), + Some(2), + Some(3), + Some(1), + Some(2), + Some(3), + Some(1), + Some(2), + ]); + let e_offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 3, 3, 3, 6, 8])); + let e_validity = Some(NullBuffer::from(vec![true, false, false, true, true])); + let field_e = Arc::new(Field::new("item", DataType::Int32, true)); + let e = ListArray::new(field_e, e_offsets, Arc::new(e_values), e_validity); + let expected = RecordBatch::try_from_iter_with_nullable([ + ("a", Arc::new(a) as Arc<dyn Array>, true), + ("b", Arc::new(b) as Arc<dyn Array>, true), + ("c", Arc::new(c) as Arc<dyn Array>, true), + ("d", Arc::new(d) as Arc<dyn Array>, true), + ("e", Arc::new(e) as Arc<dyn Array>, true), + ]) + .unwrap(); + assert_eq!(batch, expected); + } + + #[test] + fn test_nested_records() { + let f1_f1_1 = StringArray::from(vec!["aaa", "bbb"]); + let f1_f1_2 = Int32Array::from(vec![10, 20]); + let rounded_pi = (std::f64::consts::PI * 100.0).round() / 100.0; + let f1_f1_3_1 = Float64Array::from(vec![rounded_pi, rounded_pi]); + let f1_f1_3 = StructArray::from(vec![( + Arc::new(Field::new("f1_3_1", DataType::Float64, false)), + Arc::new(f1_f1_3_1) as Arc<dyn Array>, + )]); + let f1_expected = StructArray::from(vec![ + ( + Arc::new(Field::new("f1_1", DataType::Utf8, false)), + Arc::new(f1_f1_1) as Arc<dyn Array>, + ), + ( + Arc::new(Field::new("f1_2", DataType::Int32, false)), + Arc::new(f1_f1_2) as Arc<dyn Array>, + ), + ( + Arc::new(Field::new( + "f1_3", + DataType::Struct(Fields::from(vec![Field::new( + "f1_3_1", + DataType::Float64, + false, + )])), + false, + )), + Arc::new(f1_f1_3) as Arc<dyn Array>, + ), + ]); + + let f2_fields = vec![ + Field::new("f2_1", DataType::Boolean, false), + Field::new("f2_2", DataType::Float32, false), + ]; + let f2_struct_builder = StructBuilder::new( + f2_fields + .iter() + .map(|f| Arc::new(f.clone())) + .collect::<Vec<Arc<Field>>>(), + vec![ + Box::new(BooleanBuilder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>, + Box::new(Float32Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>, + ], + ); + let mut f2_list_builder = ListBuilder::new(f2_struct_builder); + { + let struct_builder = f2_list_builder.values(); + struct_builder.append(true); + { + let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap(); + b.append_value(true); + } + { + let b = struct_builder.field_builder::<Float32Builder>(1).unwrap(); + b.append_value(1.2_f32); + } + struct_builder.append(true); + { + let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap(); + b.append_value(true); + } + { + let b = struct_builder.field_builder::<Float32Builder>(1).unwrap(); + b.append_value(2.2_f32); + } + f2_list_builder.append(true); + } + { + let struct_builder = f2_list_builder.values(); + struct_builder.append(true); + { + let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap(); + b.append_value(false); + } + { + let b = struct_builder.field_builder::<Float32Builder>(1).unwrap(); + b.append_value(10.2_f32); + } + f2_list_builder.append(true); + } + + let list_array_with_nullable_items = f2_list_builder.finish(); + + let item_field = Arc::new(Field::new( + "item", + list_array_with_nullable_items.values().data_type().clone(), + false, + )); + let list_data_type = DataType::List(item_field); + + let f2_array_data = list_array_with_nullable_items + .to_data() + .into_builder() + .data_type(list_data_type) + .build() + .unwrap(); + let f2_expected = ListArray::from(f2_array_data); + + let mut f3_struct_builder = StructBuilder::new( + vec![Arc::new(Field::new("f3_1", DataType::Utf8, false))], + vec![Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>], + ); + f3_struct_builder.append(true); + { + let b = f3_struct_builder.field_builder::<StringBuilder>(0).unwrap(); + b.append_value("xyz"); + } + f3_struct_builder.append(false); + { + let b = f3_struct_builder.field_builder::<StringBuilder>(0).unwrap(); + b.append_null(); + } + let f3_expected = f3_struct_builder.finish(); + let f4_fields = [Field::new("f4_1", DataType::Int64, false)]; + let f4_struct_builder = StructBuilder::new( + f4_fields + .iter() + .map(|f| Arc::new(f.clone())) + .collect::<Vec<Arc<Field>>>(), + vec![Box::new(Int64Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>], + ); + let mut f4_list_builder = ListBuilder::new(f4_struct_builder); + { + let struct_builder = f4_list_builder.values(); + struct_builder.append(true); + { + let b = struct_builder.field_builder::<Int64Builder>(0).unwrap(); + b.append_value(200); + } + struct_builder.append(false); + { + let b = struct_builder.field_builder::<Int64Builder>(0).unwrap(); + b.append_null(); + } + f4_list_builder.append(true); + } + { + let struct_builder = f4_list_builder.values(); + struct_builder.append(false); + { + let b = struct_builder.field_builder::<Int64Builder>(0).unwrap(); + b.append_null(); + } + struct_builder.append(true); + { + let b = struct_builder.field_builder::<Int64Builder>(0).unwrap(); + b.append_value(300); + } + f4_list_builder.append(true); + } + let f4_expected = f4_list_builder.finish(); + + let expected = RecordBatch::try_from_iter_with_nullable([ + ("f1", Arc::new(f1_expected) as Arc<dyn Array>, false), + ("f2", Arc::new(f2_expected) as Arc<dyn Array>, false), + ("f3", Arc::new(f3_expected) as Arc<dyn Array>, true), + ("f4", Arc::new(f4_expected) as Arc<dyn Array>, false), + ]) + .unwrap(); + + let file = arrow_test_data("avro/nested_records.avro"); + let batch_large = read_file(&file, 8, false); + assert_eq!( + batch_large, expected, + "Decoded RecordBatch does not match expected data for nested records (batch size 8)" + ); + let batch_small = read_file(&file, 3, false); + assert_eq!( + batch_small, expected, + "Decoded RecordBatch does not match expected data for nested records (batch size 3)" + ); + } + + #[test] + fn test_repeated_no_annotation() { + let file = arrow_test_data("avro/repeated_no_annotation.avro"); + let batch_large = read_file(&file, 8, false); + use arrow_array::{Int32Array, Int64Array, ListArray, StringArray, StructArray}; + use arrow_buffer::Buffer; + use arrow_schema::{DataType, Field, Fields}; + let id_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let number_array = Int64Array::from(vec![ + Some(5555555555), + Some(1111111111), + Some(1111111111), + Some(2222222222), + Some(3333333333), + ]); + let kind_array = + StringArray::from(vec![None, Some("home"), Some("home"), None, Some("mobile")]); + let phone_fields = Fields::from(vec![ + Field::new("number", DataType::Int64, true), + Field::new("kind", DataType::Utf8, true), + ]); + let phone_struct_data = ArrayDataBuilder::new(DataType::Struct(phone_fields)) + .len(5) + .child_data(vec![number_array.into_data(), kind_array.into_data()]) + .build() + .unwrap(); + let phone_struct_array = StructArray::from(phone_struct_data); + let phone_list_offsets = Buffer::from_slice_ref([0, 0, 0, 0, 1, 2, 5]); + let phone_list_validity = Buffer::from_iter([false, false, true, true, true, true]); + let phone_item_field = Field::new("item", phone_struct_array.data_type().clone(), true); + let phone_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(phone_item_field))) + .len(6) + .add_buffer(phone_list_offsets) + .null_bit_buffer(Some(phone_list_validity)) + .child_data(vec![phone_struct_array.into_data()]) + .build() + .unwrap(); + let phone_list_array = ListArray::from(phone_list_data); + let phone_numbers_validity = Buffer::from_iter([false, false, true, true, true, true]); + let phone_numbers_field = Field::new("phone", phone_list_array.data_type().clone(), true); + let phone_numbers_struct_data = + ArrayDataBuilder::new(DataType::Struct(Fields::from(vec![phone_numbers_field]))) + .len(6) + .null_bit_buffer(Some(phone_numbers_validity)) + .child_data(vec![phone_list_array.into_data()]) + .build() + .unwrap(); + let phone_numbers_struct_array = StructArray::from(phone_numbers_struct_data); + let expected = arrow_array::RecordBatch::try_from_iter_with_nullable([ + ("id", Arc::new(id_array) as _, true), + ( + "phoneNumbers", + Arc::new(phone_numbers_struct_array) as _, + true, + ), + ]) + .unwrap(); + assert_eq!(batch_large, expected, "Mismatch for batch_size=8"); + let batch_small = read_file(&file, 3, false); + assert_eq!(batch_small, expected, "Mismatch for batch_size=3"); + } + #[test] fn test_nonnullable_impala() { let file = arrow_test_data("avro/nonnullable.impala.avro");