jecsand838 commented on code in PR #8492:
URL: https://github.com/apache/arrow-rs/pull/8492#discussion_r2404161572
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -6051,4 +6167,2159 @@ mod test {
"INACTIVE"
);
}
+
+ #[test]
+ fn comprehensive_e2e_test() {
+ let path = "test/data/comprehensive_e2e.avro";
+ let batch = read_file(path, 1024, false);
+ let schema = batch.schema();
+
+ #[inline]
+ fn tid_by_name(fields: &UnionFields, want: &str) -> i8 {
+ for (tid, f) in fields.iter() {
+ if f.name() == want {
+ return tid;
+ }
+ }
+ panic!("union child '{want}' not found");
+ }
+
+ #[inline]
+ fn tid_by_dt(fields: &UnionFields, pred: impl Fn(&DataType) -> bool)
-> i8 {
+ for (tid, f) in fields.iter() {
+ if pred(f.data_type()) {
+ return tid;
+ }
+ }
+ panic!("no union child matches predicate");
+ }
+
+ fn mk_dense_union(
+ fields: &UnionFields,
+ type_ids: Vec<i8>,
+ offsets: Vec<i32>,
+ provide: impl Fn(&Field) -> Option<ArrayRef>,
+ ) -> ArrayRef {
+ fn empty_child_for(dt: &DataType) -> Arc<dyn Array> {
+ match dt {
+ DataType::Null => Arc::new(NullArray::new(0)),
+ DataType::Boolean =>
Arc::new(BooleanArray::from(Vec::<bool>::new())),
+ DataType::Int32 =>
Arc::new(Int32Array::from(Vec::<i32>::new())),
+ DataType::Int64 =>
Arc::new(Int64Array::from(Vec::<i64>::new())),
+ DataType::Float32 =>
Arc::new(Float32Array::from(Vec::<f32>::new())),
+ DataType::Float64 =>
Arc::new(Float64Array::from(Vec::<f64>::new())),
+ DataType::Binary =>
Arc::new(BinaryArray::from(Vec::<&[u8]>::new())),
+ DataType::Utf8 =>
Arc::new(StringArray::from(Vec::<&str>::new())),
+ DataType::Date32 =>
Arc::new(Date32Array::from(Vec::<i32>::new())),
+ DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+
Arc::new(Time32MillisecondArray::from(Vec::<i32>::new()))
+ }
+ DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+
Arc::new(Time64MicrosecondArray::from(Vec::<i64>::new()))
+ }
+ DataType::Timestamp(arrow_schema::TimeUnit::Millisecond,
tz) => {
+ let a =
TimestampMillisecondArray::from(Vec::<i64>::new());
+ Arc::new(if let Some(tz) = tz {
+ a.with_timezone(tz.clone())
+ } else {
+ a
+ })
+ }
+ DataType::Timestamp(arrow_schema::TimeUnit::Microsecond,
tz) => {
+ let a =
TimestampMicrosecondArray::from(Vec::<i64>::new());
+ Arc::new(if let Some(tz) = tz {
+ a.with_timezone(tz.clone())
+ } else {
+ a
+ })
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => Arc::new(
+
IntervalMonthDayNanoArray::from(Vec::<IntervalMonthDayNano>::new()),
+ ),
+ DataType::FixedSizeBinary(sz) => Arc::new(
+ FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+ std::iter::empty::<Option<Vec<u8>>>(),
+ *sz,
+ )
+ .unwrap(),
+ ),
+ DataType::Dictionary(_, _) => {
+ let keys = Int32Array::from(Vec::<i32>::new());
+ let values =
Arc::new(StringArray::from(Vec::<&str>::new()));
+ Arc::new(DictionaryArray::<Int32Type>::try_new(keys,
values).unwrap())
+ }
+ DataType::Struct(fields) => {
+ let children: Vec<ArrayRef> = fields
+ .iter()
+ .map(|f| empty_child_for(f.data_type()) as
ArrayRef)
+ .collect();
+ Arc::new(StructArray::new(fields.clone(), children,
None))
+ }
+ DataType::List(field) => {
+ let offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0]));
+ Arc::new(
+ ListArray::try_new(
+ field.clone(),
+ offsets,
+ empty_child_for(field.data_type()),
+ None,
+ )
+ .unwrap(),
+ )
+ }
+ DataType::Map(entry_field, is_sorted) => {
+ let (key_field, val_field) = match
entry_field.data_type() {
+ DataType::Struct(fs) => (fs[0].clone(),
fs[1].clone()),
+ other => panic!("unexpected map entries type:
{other:?}"),
+ };
+ let keys = StringArray::from(Vec::<&str>::new());
+ let vals: ArrayRef = match val_field.data_type() {
+ DataType::Null => Arc::new(NullArray::new(0)) as
ArrayRef,
+ DataType::Boolean => {
+
Arc::new(BooleanArray::from(Vec::<bool>::new())) as ArrayRef
+ }
+ DataType::Int32 => {
+ Arc::new(Int32Array::from(Vec::<i32>::new()))
as ArrayRef
+ }
+ DataType::Int64 => {
+ Arc::new(Int64Array::from(Vec::<i64>::new()))
as ArrayRef
+ }
+ DataType::Float32 => {
+
Arc::new(Float32Array::from(Vec::<f32>::new())) as ArrayRef
+ }
+ DataType::Float64 => {
+
Arc::new(Float64Array::from(Vec::<f64>::new())) as ArrayRef
+ }
+ DataType::Utf8 => {
+
Arc::new(StringArray::from(Vec::<&str>::new())) as ArrayRef
+ }
+ DataType::Binary => {
+
Arc::new(BinaryArray::from(Vec::<&[u8]>::new())) as ArrayRef
+ }
+ DataType::Union(uf, _) => {
+ let children: Vec<ArrayRef> = uf
+ .iter()
+ .map(|(_, f)|
empty_child_for(f.data_type()))
+ .collect();
+ Arc::new(
+ UnionArray::try_new(
+ uf.clone(),
+
ScalarBuffer::<i8>::from(Vec::<i8>::new()),
+
Some(ScalarBuffer::<i32>::from(Vec::<i32>::new())),
+ children,
+ )
+ .unwrap(),
+ ) as ArrayRef
+ }
+ other => panic!("unsupported map value type:
{other:?}"),
+ };
+ let entries = StructArray::new(
+ Fields::from(vec![
+ key_field.as_ref().clone(),
+ val_field.as_ref().clone(),
+ ]),
+ vec![Arc::new(keys) as ArrayRef, vals],
+ None,
+ );
+ let offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0]));
+ Arc::new(MapArray::new(
+ entry_field.clone(),
+ offsets,
+ entries,
+ None,
+ *is_sorted,
+ ))
+ }
+ other => panic!("empty_child_for: unhandled type
{other:?}"),
+ }
+ }
+ let children: Vec<ArrayRef> = fields
+ .iter()
+ .map(|(_, f)| provide(f).unwrap_or_else(||
empty_child_for(f.data_type())))
+ .collect();
+ Arc::new(
+ UnionArray::try_new(
+ fields.clone(),
+ ScalarBuffer::<i8>::from(type_ids),
+ Some(ScalarBuffer::<i32>::from(offsets)),
+ children,
+ )
+ .unwrap(),
+ ) as ArrayRef
+ }
+
/// Decodes a hexadecimal UUID string into its 16 raw bytes.
///
/// `-` separators are skipped wherever they appear; every other character must
/// be a hex digit. Digits are consumed most-significant nibble first.
///
/// # Panics
/// Panics if a non-hex character is found, or if the input does not contain
/// exactly 32 hex digits (including the case of a trailing unpaired digit).
#[inline]
fn uuid16_from_str(s: &str) -> [u8; 16] {
    let mut out = [0u8; 16];
    let mut nibbles = 0usize;
    for ch in s.chars().filter(|&c| c != '-') {
        // `to_digit(16)` yields a value below 16, so the narrowing cast is lossless.
        let v = ch.to_digit(16).expect("invalid hex digit in UUID") as u8;
        // Fail with the intended message instead of an out-of-bounds index.
        assert!(nibbles < 32, "UUID must decode to 16 bytes");
        let byte = &mut out[nibbles / 2];
        *byte = (*byte << 4) | v;
        nibbles += 1;
    }
    assert!(nibbles % 2 == 0, "UUID has an odd number of hex digits");
    assert_eq!(nibbles / 2, 16, "UUID must decode to 16 bytes");
    out
}
+ let date_a: i32 = 19_000; // 2022-01-08
+ let time_ms_a: i32 = 12 * 3_600_000 + 34 * 60_000 + 56_000 + 789;
+ let time_us_eod: i64 = 86_400_000_000 - 1;
+ let ts_ms_2024_01_01: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z
+ let ts_us_2024_01_01: i64 = ts_ms_2024_01_01 * 1_000;
+ let dur_small = IntervalMonthDayNanoType::make_value(1, 2,
3_000_000_000);
+ let dur_zero = IntervalMonthDayNanoType::make_value(0, 0, 0);
+ let dur_large =
+ IntervalMonthDayNanoType::make_value(12, 31, ((86_400_000 - 1) as
i64) * 1_000_000);
+ let dur_2years = IntervalMonthDayNanoType::make_value(24, 0, 0);
+ let uuid1 = uuid16_from_str("fe7bc30b-4ce8-4c5e-b67c-2234a2d38e66");
+ let uuid2 = uuid16_from_str("0826cc06-d2e3-4599-b4ad-af5fa6905cdb");
+
+ #[inline]
+ fn push_like(
+ reader_schema: &arrow_schema::Schema,
+ name: &str,
+ arr: ArrayRef,
+ fields: &mut Vec<FieldRef>,
+ cols: &mut Vec<ArrayRef>,
+ ) {
+ let src = reader_schema
+ .field_with_name(name)
+ .unwrap_or_else(|_| panic!("source schema missing field
'{name}'"));
+ let mut f = Field::new(name, arr.data_type().clone(),
src.is_nullable());
+ let md = src.metadata();
+ if !md.is_empty() {
+ f = f.with_metadata(md.clone());
+ }
+ fields.push(Arc::new(f));
+ cols.push(arr);
+ }
+
+ let mut fields: Vec<FieldRef> = Vec::new();
+ let mut columns: Vec<ArrayRef> = Vec::new();
+ push_like(
+ schema.as_ref(),
+ "id",
+ Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "flag",
+ Arc::new(BooleanArray::from(vec![true, false, true, false])) as
ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "ratio_f32",
+ Arc::new(Float32Array::from(vec![1.25f32, -0.0, 3.5, 9.75])) as
ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "ratio_f64",
+ Arc::new(Float64Array::from(vec![2.5f64, -1.0, 7.0, -2.25])) as
ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "count_i32",
+ Arc::new(Int32Array::from(vec![7, -1, 0, 123])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "count_i64",
+ Arc::new(Int64Array::from(vec![
+ 7_000_000_000i64,
+ -2,
+ 0,
+ -9_876_543_210i64,
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "opt_i32_nullfirst",
+ Arc::new(Int32Array::from(vec![None, Some(42), None, Some(0)])) as
ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "opt_str_nullsecond",
+ Arc::new(StringArray::from(vec![
+ Some("alpha"),
+ None,
+ Some("s3"),
+ Some(""),
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ {
+ let uf = match schema
+ .field_with_name("tri_union_prim")
+ .unwrap()
+ .data_type()
+ {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("tri_union_prim should be dense union, got
{other:?}"),
+ };
+ let tid_i = tid_by_name(&uf, "int");
+ let tid_s = tid_by_name(&uf, "string");
+ let tid_b = tid_by_name(&uf, "boolean");
+ let tids = vec![tid_i, tid_s, tid_b, tid_s];
+ let offs = vec![0, 0, 0, 1];
+ let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+ DataType::Int32 => Some(Arc::new(Int32Array::from(vec![0])) as
ArrayRef),
+ DataType::Utf8 => Some(Arc::new(StringArray::from(vec!["hi",
""])) as ArrayRef),
+ DataType::Boolean =>
Some(Arc::new(BooleanArray::from(vec![true])) as ArrayRef),
+ _ => None,
+ });
+ push_like(
+ schema.as_ref(),
+ "tri_union_prim",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+
+ push_like(
+ schema.as_ref(),
+ "str_utf8",
+ Arc::new(StringArray::from(vec!["hello", "", "world", "✓
unicode"])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "raw_bytes",
+ Arc::new(BinaryArray::from(vec![
+ b"\x00\x01".as_ref(),
+ b"".as_ref(),
+ b"\xFF\x00".as_ref(),
+ b"\x10\x20\x30\x40".as_ref(),
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ {
+ let it = [
+ Some(*b"0123456789ABCDEF"),
+ Some([0u8; 16]),
+ Some(*b"ABCDEFGHIJKLMNOP"),
+ Some([0xAA; 16]),
+ ]
+ .into_iter();
+ let arr =
+
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap())
+ as ArrayRef;
+ push_like(
+ schema.as_ref(),
+ "fx16_plain",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ #[cfg(feature = "small_decimals")]
+ let dec10_2 = Arc::new(
+ Decimal64Array::from_iter_values([123456i64, -1, 0,
9_999_999_999i64])
+ .with_precision_and_scale(10, 2)
+ .unwrap(),
+ ) as ArrayRef;
+ #[cfg(not(feature = "small_decimals"))]
+ let dec10_2 = Arc::new(
+ Decimal128Array::from_iter_values([123456i128, -1, 0,
9_999_999_999i128])
+ .with_precision_and_scale(10, 2)
+ .unwrap(),
+ ) as ArrayRef;
+ push_like(
+ schema.as_ref(),
+ "dec_bytes_s10_2",
+ dec10_2,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ #[cfg(feature = "small_decimals")]
+ let dec20_4 = Arc::new(
+ Decimal128Array::from_iter_values([1_234_567_891_234i128,
-420_000i128, 0, -1i128])
+ .with_precision_and_scale(20, 4)
+ .unwrap(),
+ ) as ArrayRef;
+ #[cfg(not(feature = "small_decimals"))]
+ let dec20_4 = Arc::new(
+ Decimal128Array::from_iter_values([1_234_567_891_234i128,
-420_000i128, 0, -1i128])
+ .with_precision_and_scale(20, 4)
+ .unwrap(),
+ ) as ArrayRef;
+ push_like(
+ schema.as_ref(),
+ "dec_fix_s20_4",
+ dec20_4,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let it = [Some(uuid1), Some(uuid2), Some(uuid1),
Some(uuid2)].into_iter();
+ let arr =
+
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap())
+ as ArrayRef;
+ push_like(schema.as_ref(), "uuid_str", arr, &mut fields, &mut
columns);
+ }
+ push_like(
+ schema.as_ref(),
+ "d_date",
+ Arc::new(Date32Array::from(vec![date_a, 0, 1, 365])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "t_millis",
+ Arc::new(Time32MillisecondArray::from(vec![
+ time_ms_a,
+ 0,
+ 1,
+ 86_400_000 - 1,
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "t_micros",
+ Arc::new(Time64MicrosecondArray::from(vec![
+ time_us_eod,
+ 0,
+ 1,
+ 1_000_000,
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ {
+ let a = TimestampMillisecondArray::from(vec![
+ ts_ms_2024_01_01,
+ -1,
+ ts_ms_2024_01_01 + 123,
+ 0,
+ ])
+ .with_timezone("+00:00");
+ push_like(
+ schema.as_ref(),
+ "ts_millis_utc",
+ Arc::new(a) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let a = TimestampMicrosecondArray::from(vec![
+ ts_us_2024_01_01,
+ 1,
+ ts_us_2024_01_01 + 456,
+ 0,
+ ])
+ .with_timezone("+00:00");
+ push_like(
+ schema.as_ref(),
+ "ts_micros_utc",
+ Arc::new(a) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ push_like(
+ schema.as_ref(),
+ "ts_millis_local",
+ Arc::new(TimestampMillisecondArray::from(vec![
+ ts_ms_2024_01_01 + 86_400_000,
+ 0,
+ ts_ms_2024_01_01 + 789,
+ 123_456_789,
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ push_like(
+ schema.as_ref(),
+ "ts_micros_local",
+ Arc::new(TimestampMicrosecondArray::from(vec![
+ ts_us_2024_01_01 + 123_456,
+ 0,
+ ts_us_2024_01_01 + 101_112,
+ 987_654_321,
+ ])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ {
+ let v = vec![dur_small, dur_zero, dur_large, dur_2years];
+ push_like(
+ schema.as_ref(),
+ "interval_mdn",
+ Arc::new(IntervalMonthDayNanoArray::from(v)) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let keys = Int32Array::from(vec![1, 2, 3, 0]); // NEW, PROCESSING,
DONE, UNKNOWN
+ let values = Arc::new(StringArray::from(vec![
+ "UNKNOWN",
+ "NEW",
+ "PROCESSING",
+ "DONE",
+ ])) as ArrayRef;
+ let dict = DictionaryArray::<Int32Type>::try_new(keys,
values).unwrap();
+ push_like(
+ schema.as_ref(),
+ "status",
+ Arc::new(dict) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let list_field = match
schema.field_with_name("arr_union").unwrap().data_type() {
+ DataType::List(f) => f.clone(),
+ other => panic!("arr_union should be List, got {other:?}"),
+ };
+ let uf = match list_field.data_type() {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("arr_union item should be union, got
{other:?}"),
+ };
+ let tid_l = tid_by_name(&uf, "long");
+ let tid_s = tid_by_name(&uf, "string");
+ let tid_n = tid_by_name(&uf, "null");
+ let type_ids = vec![
+ tid_l, tid_s, tid_n, tid_l, tid_n, tid_s, tid_l, tid_l, tid_s,
tid_n, tid_l,
+ ];
+ let offsets = vec![0, 0, 0, 1, 1, 1, 2, 3, 2, 2, 4];
+ let values = mk_dense_union(&uf, type_ids, offsets, |f| match
f.data_type() {
+ DataType::Int64 => {
+ Some(Arc::new(Int64Array::from(vec![1i64, -3, 0, -1, 0]))
as ArrayRef)
+ }
+ DataType::Utf8 => {
+ Some(Arc::new(StringArray::from(vec!["x", "z", "end"])) as
ArrayRef)
+ }
+ DataType::Null => Some(Arc::new(NullArray::new(3)) as
ArrayRef),
+ _ => None,
+ });
+ let list_offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4, 7, 8, 11]));
+ let arr = Arc::new(ListArray::try_new(list_field, list_offsets,
values, None).unwrap())
+ as ArrayRef;
+ push_like(schema.as_ref(), "arr_union", arr, &mut fields, &mut
columns);
+ }
+ {
+ let (entry_field, entries_fields, uf, is_sorted) =
+ match schema.field_with_name("map_union").unwrap().data_type()
{
+ DataType::Map(entry_field, is_sorted) => {
+ let fs = match entry_field.data_type() {
+ DataType::Struct(fs) => fs.clone(),
+ other => panic!("map entries must be struct, got
{other:?}"),
+ };
+ let val_f = fs[1].clone();
+ let uf = match val_f.data_type() {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("map value must be union, got
{other:?}"),
+ };
+ (entry_field.clone(), fs, uf, *is_sorted)
+ }
+ other => panic!("map_union should be Map, got {other:?}"),
+ };
+ let keys = StringArray::from(vec!["a", "b", "c", "neg", "pi",
"ok"]);
+ let moff = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3,
4, 4, 6]));
+ let tid_null = tid_by_name(&uf, "null");
+ let tid_d = tid_by_name(&uf, "double");
+ let tid_s = tid_by_name(&uf, "string");
+ let type_ids = vec![tid_d, tid_null, tid_s, tid_d, tid_d, tid_s];
+ let offsets = vec![0, 0, 0, 1, 2, 1];
+ let pi_5dp = (std::f64::consts::PI * 100_000.0).trunc() /
100_000.0;
+ let vals = mk_dense_union(&uf, type_ids, offsets, |f| match
f.data_type() {
+ DataType::Float64 => {
+ Some(Arc::new(Float64Array::from(vec![1.5f64, -0.5,
pi_5dp])) as ArrayRef)
+ }
+ DataType::Utf8 => {
+ Some(Arc::new(StringArray::from(vec!["yes", "true"])) as
ArrayRef)
+ }
+ DataType::Null => Some(Arc::new(NullArray::new(2)) as
ArrayRef),
+ _ => None,
+ });
+ let entries = StructArray::new(
+ entries_fields.clone(),
+ vec![Arc::new(keys) as ArrayRef, vals],
+ None,
+ );
+ let map =
+ Arc::new(MapArray::new(entry_field, moff, entries, None,
is_sorted)) as ArrayRef;
+ push_like(schema.as_ref(), "map_union", map, &mut fields, &mut
columns);
+ }
+ {
+ let fs = match
schema.field_with_name("address").unwrap().data_type() {
+ DataType::Struct(fs) => fs.clone(),
+ other => panic!("address should be Struct, got {other:?}"),
+ };
+ let street = Arc::new(StringArray::from(vec![
+ "100 Main",
+ "",
+ "42 Galaxy Way",
+ "End Ave",
+ ])) as ArrayRef;
+ let zip = Arc::new(Int32Array::from(vec![12345, 0, 42424, 1])) as
ArrayRef;
+ let country = Arc::new(StringArray::from(vec!["US", "CA", "US",
"GB"])) as ArrayRef;
+ let arr = Arc::new(StructArray::new(fs, vec![street, zip,
country], None)) as ArrayRef;
+ push_like(schema.as_ref(), "address", arr, &mut fields, &mut
columns);
+ }
+ {
+ let fs = match
schema.field_with_name("maybe_auth").unwrap().data_type() {
+ DataType::Struct(fs) => fs.clone(),
+ other => panic!("maybe_auth should be Struct, got {other:?}"),
+ };
+ let user =
+ Arc::new(StringArray::from(vec!["alice", "bob", "carol",
"dave"])) as ArrayRef;
+ let token_values: Vec<Option<&[u8]>> = vec![
+ None, // row 1: null
+ Some(b"\x01\x02\x03".as_ref()), // row 2: bytes
+ None, // row 3: null
+ Some(b"".as_ref()), // row 4: empty bytes
+ ];
+ let token = Arc::new(BinaryArray::from(token_values)) as ArrayRef;
+ let arr = Arc::new(StructArray::new(fs, vec![user, token], None))
as ArrayRef;
+ push_like(
+ schema.as_ref(),
+ "maybe_auth",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let uf = match schema
+ .field_with_name("union_enum_record_array_map")
+ .unwrap()
+ .data_type()
+ {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("union_enum_record_array_map should be union,
got {other:?}"),
+ };
+ let mut tid_enum: Option<i8> = None;
+ let mut tid_rec_a: Option<i8> = None;
+ let mut tid_array: Option<i8> = None;
+ let mut tid_map: Option<i8> = None;
+ let mut map_entry_field: Option<FieldRef> = None;
+ let mut map_sorted: bool = false;
+ for (tid, f) in uf.iter() {
+ match f.data_type() {
+ DataType::Dictionary(_, _) => tid_enum = Some(tid),
+ DataType::Struct(childs)
+ if childs.len() == 2
+ && childs[0].name() == "a"
+ && childs[1].name() == "b" =>
+ {
+ tid_rec_a = Some(tid)
+ }
+ DataType::List(item) if matches!(item.data_type(),
DataType::Int64) => {
+ tid_array = Some(tid)
+ }
+ DataType::Map(ef, is_sorted) => {
+ tid_map = Some(tid);
+ map_entry_field = Some(ef.clone());
+ map_sorted = *is_sorted;
+ }
+ _ => {}
+ }
+ }
+ let (tid_enum, tid_rec_a, tid_array, tid_map) = (
+ tid_enum.unwrap(),
+ tid_rec_a.unwrap(),
+ tid_array.unwrap(),
+ tid_map.unwrap(),
+ );
+ let tids = vec![tid_enum, tid_rec_a, tid_array, tid_map];
+ let offs = vec![0, 0, 0, 0];
+ let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+ DataType::Dictionary(_, _) => {
+ let keys = Int32Array::from(vec![0i32]);
+ let values =
+ Arc::new(StringArray::from(vec!["RED", "GREEN",
"BLUE"])) as ArrayRef;
+ Some(
+ Arc::new(DictionaryArray::<Int32Type>::try_new(keys,
values).unwrap())
+ as ArrayRef,
+ )
+ }
+ DataType::Struct(fs)
+ if fs.len() == 2 && fs[0].name() == "a" && fs[1].name() ==
"b" =>
+ {
+ let a = Int32Array::from(vec![7]);
+ let b = StringArray::from(vec!["rec"]);
+ Some(Arc::new(StructArray::new(
+ fs.clone(),
+ vec![Arc::new(a), Arc::new(b)],
+ None,
+ )) as ArrayRef)
+ }
+ DataType::List(field) => {
+ let values = Int64Array::from(vec![1i64, 2, 3]);
+ let offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3]));
+ Some(Arc::new(
+ ListArray::try_new(field.clone(), offsets,
Arc::new(values), None).unwrap(),
+ ) as ArrayRef)
+ }
+ DataType::Map(_, _) => {
+ let entry_field = map_entry_field.clone().unwrap();
+ let (key_field, val_field) = match entry_field.data_type()
{
+ DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()),
+ _ => unreachable!(),
+ };
+ let keys = StringArray::from(vec!["k"]);
+ let vals = StringArray::from(vec!["v"]);
+ let entries = StructArray::new(
+ Fields::from(vec![key_field.as_ref().clone(),
val_field.as_ref().clone()]),
+ vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as
ArrayRef],
+ None,
+ );
+ let offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 1]));
+ Some(Arc::new(MapArray::new(
+ entry_field.clone(),
+ offsets,
+ entries,
+ None,
+ map_sorted,
+ )) as ArrayRef)
+ }
+ _ => None,
+ });
+ push_like(
+ schema.as_ref(),
+ "union_enum_record_array_map",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let uf = match schema
+ .field_with_name("union_date_or_fixed4")
+ .unwrap()
+ .data_type()
+ {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("union_date_or_fixed4 should be union, got
{other:?}"),
+ };
+ let tid_date = tid_by_dt(&uf, |dt| matches!(dt, DataType::Date32));
+ let tid_fx4 = tid_by_dt(&uf, |dt| matches!(dt,
DataType::FixedSizeBinary(4)));
+ let tids = vec![tid_date, tid_fx4, tid_date, tid_fx4];
+ let offs = vec![0, 0, 1, 1];
+ let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+ DataType::Date32 =>
Some(Arc::new(Date32Array::from(vec![date_a, 0])) as ArrayRef),
+ DataType::FixedSizeBinary(4) => {
+ let it = [Some(*b"\x00\x11\x22\x33"),
Some(*b"ABCD")].into_iter();
+ Some(Arc::new(
+
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 4).unwrap(),
+ ) as ArrayRef)
+ }
+ _ => None,
+ });
+ push_like(
+ schema.as_ref(),
+ "union_date_or_fixed4",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let uf = match schema
+ .field_with_name("union_interval_or_string")
+ .unwrap()
+ .data_type()
+ {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("union_interval_or_string should be union, got
{other:?}"),
+ };
+ let tid_dur = tid_by_dt(&uf, |dt| {
+ matches!(dt, DataType::Interval(IntervalUnit::MonthDayNano))
+ });
+ let tid_str = tid_by_dt(&uf, |dt| matches!(dt, DataType::Utf8));
+ let tids = vec![tid_dur, tid_str, tid_dur, tid_str];
+ let offs = vec![0, 0, 1, 1];
+ let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+ DataType::Interval(IntervalUnit::MonthDayNano) =>
Some(Arc::new(
+ IntervalMonthDayNanoArray::from(vec![dur_small,
dur_large]),
+ )
+ as ArrayRef),
+ DataType::Utf8 => Some(Arc::new(StringArray::from(vec![
+ "duration-as-text",
+ "iso-8601-period-P1Y",
+ ])) as ArrayRef),
+ _ => None,
+ });
+ push_like(
+ schema.as_ref(),
+ "union_interval_or_string",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let uf = match schema
+ .field_with_name("union_uuid_or_fixed10")
+ .unwrap()
+ .data_type()
+ {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("union_uuid_or_fixed10 should be union, got
{other:?}"),
+ };
+ let tid_uuid = tid_by_dt(&uf, |dt| matches!(dt,
DataType::FixedSizeBinary(16)));
+ let tid_fx10 = tid_by_dt(&uf, |dt| matches!(dt,
DataType::FixedSizeBinary(10)));
+ let tids = vec![tid_uuid, tid_fx10, tid_uuid, tid_fx10];
+ let offs = vec![0, 0, 1, 1];
+ let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+ DataType::FixedSizeBinary(16) => {
+ let it = [Some(uuid1), Some(uuid2)].into_iter();
+ Some(Arc::new(
+
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap(),
+ ) as ArrayRef)
+ }
+ DataType::FixedSizeBinary(10) => {
+ let fx10_a = [0xAAu8; 10];
+ let fx10_b = [0x00u8, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
0x77, 0x88, 0x99];
+ let it = [Some(fx10_a), Some(fx10_b)].into_iter();
+ Some(Arc::new(
+
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 10).unwrap(),
+ ) as ArrayRef)
+ }
+ _ => None,
+ });
+ push_like(
+ schema.as_ref(),
+ "union_uuid_or_fixed10",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let list_field = match schema
+ .field_with_name("array_records_with_union")
+ .unwrap()
+ .data_type()
+ {
+ DataType::List(f) => f.clone(),
+ other => panic!("array_records_with_union should be List, got
{other:?}"),
+ };
+ let kv_fields = match list_field.data_type() {
+ DataType::Struct(fs) => fs.clone(),
+ other => panic!("array_records_with_union items must be
Struct, got {other:?}"),
+ };
+ let val_field = kv_fields
+ .iter()
+ .find(|f| f.name() == "val")
+ .unwrap()
+ .clone();
+ let uf = match val_field.data_type() {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("KV.val should be union, got {other:?}"),
+ };
+ let keys = Arc::new(StringArray::from(vec!["k1", "k2", "k", "k3",
"x"])) as ArrayRef;
+ let tid_null = tid_by_name(&uf, "null");
+ let tid_i = tid_by_name(&uf, "int");
+ let tid_l = tid_by_name(&uf, "long");
+ let type_ids = vec![tid_i, tid_null, tid_l, tid_null, tid_i];
+ let offsets = vec![0, 0, 0, 1, 1];
+ let vals = mk_dense_union(&uf, type_ids, offsets, |f| match
f.data_type() {
+ DataType::Int32 => Some(Arc::new(Int32Array::from(vec![5,
-5])) as ArrayRef),
+ DataType::Int64 =>
Some(Arc::new(Int64Array::from(vec![99i64])) as ArrayRef),
+ DataType::Null => Some(Arc::new(NullArray::new(2)) as
ArrayRef),
+ _ => None,
+ });
+ let values_struct =
+ Arc::new(StructArray::new(kv_fields.clone(), vec![keys, vals],
None)) as ArrayRef;
+ let list_offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 5]));
+ let arr = Arc::new(
+ ListArray::try_new(list_field, list_offsets, values_struct,
None).unwrap(),
+ ) as ArrayRef;
+ push_like(
+ schema.as_ref(),
+ "array_records_with_union",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ {
+ let uf = match schema
+ .field_with_name("union_map_or_array_int")
+ .unwrap()
+ .data_type()
+ {
+ DataType::Union(f, UnionMode::Dense) => f.clone(),
+ other => panic!("union_map_or_array_int should be union, got
{other:?}"),
+ };
+ let tid_map = tid_by_dt(&uf, |dt| matches!(dt, DataType::Map(_,
_)));
+ let tid_list = tid_by_dt(&uf, |dt| matches!(dt,
DataType::List(_)));
+ let map_child: ArrayRef = {
+ let (entry_field, is_sorted) = match uf
+ .iter()
+ .find(|(tid, _)| *tid == tid_map)
+ .unwrap()
+ .1
+ .data_type()
+ {
+ DataType::Map(ef, is_sorted) => (ef.clone(), *is_sorted),
+ _ => unreachable!(),
+ };
+ let (key_field, val_field) = match entry_field.data_type() {
+ DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()),
+ _ => unreachable!(),
+ };
+ let keys = StringArray::from(vec!["x", "y", "only"]);
+ let vals = Int32Array::from(vec![1, 2, 10]);
+ let entries = StructArray::new(
+ Fields::from(vec![key_field.as_ref().clone(),
val_field.as_ref().clone()]),
+ vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as
ArrayRef],
+ None,
+ );
+ let moff = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0,
2, 3]));
+ Arc::new(MapArray::new(entry_field, moff, entries, None,
is_sorted)) as ArrayRef
+ };
+ let list_child: ArrayRef = {
+ let list_field = match uf
+ .iter()
+ .find(|(tid, _)| *tid == tid_list)
+ .unwrap()
+ .1
+ .data_type()
+ {
+ DataType::List(f) => f.clone(),
+ _ => unreachable!(),
+ };
+ let values = Int32Array::from(vec![1, 2, 3, 0]);
+ let offsets =
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 4]));
+ Arc::new(ListArray::try_new(list_field, offsets,
Arc::new(values), None).unwrap())
+ as ArrayRef
+ };
+ let tids = vec![tid_map, tid_list, tid_map, tid_list];
+ let offs = vec![0, 0, 1, 1];
+ let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+ DataType::Map(_, _) => Some(map_child.clone()),
+ DataType::List(_) => Some(list_child.clone()),
+ _ => None,
+ });
+ push_like(
+ schema.as_ref(),
+ "union_map_or_array_int",
+ arr,
+ &mut fields,
+ &mut columns,
+ );
+ }
+ push_like(
+ schema.as_ref(),
+ "renamed_with_default",
+ Arc::new(Int32Array::from(vec![100, 42, 7, 42])) as ArrayRef,
+ &mut fields,
+ &mut columns,
+ );
+ {
+ let fs = match
schema.field_with_name("person").unwrap().data_type() {
+ DataType::Struct(fs) => fs.clone(),
+ other => panic!("person should be Struct, got {other:?}"),
+ };
+ let name =
+ Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol",
"Dave"])) as ArrayRef;
+ let age = Arc::new(Int32Array::from(vec![30, 0, 25, 41])) as
ArrayRef;
+ let arr = Arc::new(StructArray::new(fs, vec![name, age], None)) as
ArrayRef;
+ push_like(schema.as_ref(), "person", arr, &mut fields, &mut
columns);
+ }
+ let expected =
+ RecordBatch::try_new(Arc::new(Schema::new(Fields::from(fields))),
columns).unwrap();
+ assert_eq!(
+ expected, batch,
+ "entire RecordBatch mismatch (schema, all columns, all rows)"
+ );
+ }
+
+ #[test]
+ fn comprehensive_e2e_resolution_test() {
Review Comment:
100%. I'll probably come back around at some point and enhance the
maintainability of these tests. I just wanted to throw in every edge case I
could think of.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]