jecsand838 commented on code in PR #8492:
URL: https://github.com/apache/arrow-rs/pull/8492#discussion_r2404161572


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -6051,4 +6167,2159 @@ mod test {
             "INACTIVE"
         );
     }
+
+    #[test]
+    fn comprehensive_e2e_test() {
+        let path = "test/data/comprehensive_e2e.avro";
+        let batch = read_file(path, 1024, false);
+        let schema = batch.schema();
+
+        /// Returns the type id of the first union child whose field name equals `want`.
+        ///
+        /// # Panics
+        /// Panics when no child of `fields` carries that name.
+        #[inline]
+        fn tid_by_name(fields: &UnionFields, want: &str) -> i8 {
+            fields
+                .iter()
+                .find(|(_, child)| child.name() == want)
+                .map(|(type_id, _)| type_id)
+                .unwrap_or_else(|| panic!("union child '{want}' not found"))
+        }
+
+        /// Returns the type id of the first union child whose data type satisfies `pred`.
+        ///
+        /// # Panics
+        /// Panics when `pred` rejects every child of `fields`.
+        #[inline]
+        fn tid_by_dt(fields: &UnionFields, pred: impl Fn(&DataType) -> bool) -> i8 {
+            fields
+                .iter()
+                .find_map(|(type_id, child)| pred(child.data_type()).then_some(type_id))
+                .unwrap_or_else(|| panic!("no union child matches predicate"))
+        }
+
+        /// Builds a dense `UnionArray` over `fields` from explicit `type_ids` and `offsets`.
+        ///
+        /// `provide` is asked once per union child, in `fields` order, for that child's
+        /// values. When it returns `None`, a zero-length array of the child's declared
+        /// data type is used instead, so callers only supply the branches that rows
+        /// actually reference.
+        ///
+        /// # Panics
+        /// Panics if `UnionArray::try_new` rejects the assembled parts.
+        fn mk_dense_union(
+            fields: &UnionFields,
+            type_ids: Vec<i8>,
+            offsets: Vec<i32>,
+            provide: impl Fn(&Field) -> Option<ArrayRef>,
+        ) -> ArrayRef {
+            // `new_empty_array` derives the empty child from the declared data type, so
+            // nested, dictionary and timezone-carrying types keep their exact type without
+            // a hand-maintained per-type table.
+            let children: Vec<ArrayRef> = fields
+                .iter()
+                .map(|(_, f)| {
+                    provide(f).unwrap_or_else(|| arrow_array::new_empty_array(f.data_type()))
+                })
+                .collect();
+            Arc::new(
+                UnionArray::try_new(
+                    fields.clone(),
+                    ScalarBuffer::<i8>::from(type_ids),
+                    Some(ScalarBuffer::<i32>::from(offsets)),
+                    children,
+                )
+                .unwrap(),
+            ) as ArrayRef
+        }
+
+        /// Decodes a textual UUID into its 16 raw bytes.
+        ///
+        /// Every `-` in `s` is skipped; the remaining characters are read as hex digits,
+        /// two per output byte with the high nibble first.
+        ///
+        /// # Panics
+        /// Panics on a non-hex character, or when `s` does not contain exactly 32 hex
+        /// digits (too few, too many, or an unpaired trailing digit).
+        #[inline]
+        fn uuid16_from_str(s: &str) -> [u8; 16] {
+            let mut out = [0u8; 16];
+            let mut nibbles = 0usize;
+            for ch in s.chars().filter(|&c| c != '-') {
+                let v = ch.to_digit(16).expect("invalid hex digit in UUID") as u8;
+                // Fail with the intended message instead of an index-out-of-bounds panic.
+                assert!(nibbles < 32, "UUID must decode to 16 bytes");
+                // Even nibble positions fill the high half of a byte, odd ones the low half.
+                out[nibbles / 2] |= if nibbles % 2 == 0 { v << 4 } else { v };
+                nibbles += 1;
+            }
+            // Counting nibbles (not bytes) also rejects a dangling 33rd digit.
+            assert_eq!(nibbles, 32, "UUID must decode to 16 bytes");
+            out
+        }
+        let date_a: i32 = 19_000; // 2022-01-08
+        let time_ms_a: i32 = 12 * 3_600_000 + 34 * 60_000 + 56_000 + 789;
+        let time_us_eod: i64 = 86_400_000_000 - 1;
+        let ts_ms_2024_01_01: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z
+        let ts_us_2024_01_01: i64 = ts_ms_2024_01_01 * 1_000;
+        let dur_small = IntervalMonthDayNanoType::make_value(1, 2, 
3_000_000_000);
+        let dur_zero = IntervalMonthDayNanoType::make_value(0, 0, 0);
+        let dur_large =
+            IntervalMonthDayNanoType::make_value(12, 31, ((86_400_000 - 1) as 
i64) * 1_000_000);
+        let dur_2years = IntervalMonthDayNanoType::make_value(24, 0, 0);
+        let uuid1 = uuid16_from_str("fe7bc30b-4ce8-4c5e-b67c-2234a2d38e66");
+        let uuid2 = uuid16_from_str("0826cc06-d2e3-4599-b4ad-af5fa6905cdb");
+
+        /// Appends one expected column, modelled on the same-named field of `reader_schema`.
+        ///
+        /// The pushed field takes its data type from `arr` and its nullability and
+        /// metadata from the source field; `arr` itself is pushed onto `cols`.
+        ///
+        /// # Panics
+        /// Panics when `reader_schema` has no field called `name`.
+        #[inline]
+        fn push_like(
+            reader_schema: &arrow_schema::Schema,
+            name: &str,
+            arr: ArrayRef,
+            fields: &mut Vec<FieldRef>,
+            cols: &mut Vec<ArrayRef>,
+        ) {
+            let template = match reader_schema.field_with_name(name) {
+                Ok(found) => found,
+                Err(_) => panic!("source schema missing field '{name}'"),
+            };
+            let expected = Field::new(name, arr.data_type().clone(), template.is_nullable());
+            // Metadata is copied only when the source field actually carries some.
+            let expected = if template.metadata().is_empty() {
+                expected
+            } else {
+                expected.with_metadata(template.metadata().clone())
+            };
+            fields.push(Arc::new(expected));
+            cols.push(arr);
+        }
+
+        let mut fields: Vec<FieldRef> = Vec::new();
+        let mut columns: Vec<ArrayRef> = Vec::new();
+        push_like(
+            schema.as_ref(),
+            "id",
+            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "flag",
+            Arc::new(BooleanArray::from(vec![true, false, true, false])) as 
ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "ratio_f32",
+            Arc::new(Float32Array::from(vec![1.25f32, -0.0, 3.5, 9.75])) as 
ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "ratio_f64",
+            Arc::new(Float64Array::from(vec![2.5f64, -1.0, 7.0, -2.25])) as 
ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "count_i32",
+            Arc::new(Int32Array::from(vec![7, -1, 0, 123])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "count_i64",
+            Arc::new(Int64Array::from(vec![
+                7_000_000_000i64,
+                -2,
+                0,
+                -9_876_543_210i64,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "opt_i32_nullfirst",
+            Arc::new(Int32Array::from(vec![None, Some(42), None, Some(0)])) as 
ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "opt_str_nullsecond",
+            Arc::new(StringArray::from(vec![
+                Some("alpha"),
+                None,
+                Some("s3"),
+                Some(""),
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let uf = match schema
+                .field_with_name("tri_union_prim")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("tri_union_prim should be dense union, got 
{other:?}"),
+            };
+            let tid_i = tid_by_name(&uf, "int");
+            let tid_s = tid_by_name(&uf, "string");
+            let tid_b = tid_by_name(&uf, "boolean");
+            let tids = vec![tid_i, tid_s, tid_b, tid_s];
+            let offs = vec![0, 0, 0, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Int32 => Some(Arc::new(Int32Array::from(vec![0])) as 
ArrayRef),
+                DataType::Utf8 => Some(Arc::new(StringArray::from(vec!["hi", 
""])) as ArrayRef),
+                DataType::Boolean => 
Some(Arc::new(BooleanArray::from(vec![true])) as ArrayRef),
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "tri_union_prim",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+
+        push_like(
+            schema.as_ref(),
+            "str_utf8",
+            Arc::new(StringArray::from(vec!["hello", "", "world", "✓ 
unicode"])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "raw_bytes",
+            Arc::new(BinaryArray::from(vec![
+                b"\x00\x01".as_ref(),
+                b"".as_ref(),
+                b"\xFF\x00".as_ref(),
+                b"\x10\x20\x30\x40".as_ref(),
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let it = [
+                Some(*b"0123456789ABCDEF"),
+                Some([0u8; 16]),
+                Some(*b"ABCDEFGHIJKLMNOP"),
+                Some([0xAA; 16]),
+            ]
+            .into_iter();
+            let arr =
+                
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap())
+                    as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "fx16_plain",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            #[cfg(feature = "small_decimals")]
+            let dec10_2 = Arc::new(
+                Decimal64Array::from_iter_values([123456i64, -1, 0, 
9_999_999_999i64])
+                    .with_precision_and_scale(10, 2)
+                    .unwrap(),
+            ) as ArrayRef;
+            #[cfg(not(feature = "small_decimals"))]
+            let dec10_2 = Arc::new(
+                Decimal128Array::from_iter_values([123456i128, -1, 0, 
9_999_999_999i128])
+                    .with_precision_and_scale(10, 2)
+                    .unwrap(),
+            ) as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "dec_bytes_s10_2",
+                dec10_2,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            #[cfg(feature = "small_decimals")]
+            let dec20_4 = Arc::new(
+                Decimal128Array::from_iter_values([1_234_567_891_234i128, 
-420_000i128, 0, -1i128])
+                    .with_precision_and_scale(20, 4)
+                    .unwrap(),
+            ) as ArrayRef;
+            #[cfg(not(feature = "small_decimals"))]
+            let dec20_4 = Arc::new(
+                Decimal128Array::from_iter_values([1_234_567_891_234i128, 
-420_000i128, 0, -1i128])
+                    .with_precision_and_scale(20, 4)
+                    .unwrap(),
+            ) as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "dec_fix_s20_4",
+                dec20_4,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let it = [Some(uuid1), Some(uuid2), Some(uuid1), 
Some(uuid2)].into_iter();
+            let arr =
+                
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap())
+                    as ArrayRef;
+            push_like(schema.as_ref(), "uuid_str", arr, &mut fields, &mut 
columns);
+        }
+        push_like(
+            schema.as_ref(),
+            "d_date",
+            Arc::new(Date32Array::from(vec![date_a, 0, 1, 365])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "t_millis",
+            Arc::new(Time32MillisecondArray::from(vec![
+                time_ms_a,
+                0,
+                1,
+                86_400_000 - 1,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "t_micros",
+            Arc::new(Time64MicrosecondArray::from(vec![
+                time_us_eod,
+                0,
+                1,
+                1_000_000,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let a = TimestampMillisecondArray::from(vec![
+                ts_ms_2024_01_01,
+                -1,
+                ts_ms_2024_01_01 + 123,
+                0,
+            ])
+            .with_timezone("+00:00");
+            push_like(
+                schema.as_ref(),
+                "ts_millis_utc",
+                Arc::new(a) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let a = TimestampMicrosecondArray::from(vec![
+                ts_us_2024_01_01,
+                1,
+                ts_us_2024_01_01 + 456,
+                0,
+            ])
+            .with_timezone("+00:00");
+            push_like(
+                schema.as_ref(),
+                "ts_micros_utc",
+                Arc::new(a) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        push_like(
+            schema.as_ref(),
+            "ts_millis_local",
+            Arc::new(TimestampMillisecondArray::from(vec![
+                ts_ms_2024_01_01 + 86_400_000,
+                0,
+                ts_ms_2024_01_01 + 789,
+                123_456_789,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "ts_micros_local",
+            Arc::new(TimestampMicrosecondArray::from(vec![
+                ts_us_2024_01_01 + 123_456,
+                0,
+                ts_us_2024_01_01 + 101_112,
+                987_654_321,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let v = vec![dur_small, dur_zero, dur_large, dur_2years];
+            push_like(
+                schema.as_ref(),
+                "interval_mdn",
+                Arc::new(IntervalMonthDayNanoArray::from(v)) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let keys = Int32Array::from(vec![1, 2, 3, 0]); // NEW, PROCESSING, 
DONE, UNKNOWN
+            let values = Arc::new(StringArray::from(vec![
+                "UNKNOWN",
+                "NEW",
+                "PROCESSING",
+                "DONE",
+            ])) as ArrayRef;
+            let dict = DictionaryArray::<Int32Type>::try_new(keys, 
values).unwrap();
+            push_like(
+                schema.as_ref(),
+                "status",
+                Arc::new(dict) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let list_field = match 
schema.field_with_name("arr_union").unwrap().data_type() {
+                DataType::List(f) => f.clone(),
+                other => panic!("arr_union should be List, got {other:?}"),
+            };
+            let uf = match list_field.data_type() {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("arr_union item should be union, got 
{other:?}"),
+            };
+            let tid_l = tid_by_name(&uf, "long");
+            let tid_s = tid_by_name(&uf, "string");
+            let tid_n = tid_by_name(&uf, "null");
+            let type_ids = vec![
+                tid_l, tid_s, tid_n, tid_l, tid_n, tid_s, tid_l, tid_l, tid_s, 
tid_n, tid_l,
+            ];
+            let offsets = vec![0, 0, 0, 1, 1, 1, 2, 3, 2, 2, 4];
+            let values = mk_dense_union(&uf, type_ids, offsets, |f| match 
f.data_type() {
+                DataType::Int64 => {
+                    Some(Arc::new(Int64Array::from(vec![1i64, -3, 0, -1, 0])) 
as ArrayRef)
+                }
+                DataType::Utf8 => {
+                    Some(Arc::new(StringArray::from(vec!["x", "z", "end"])) as 
ArrayRef)
+                }
+                DataType::Null => Some(Arc::new(NullArray::new(3)) as 
ArrayRef),
+                _ => None,
+            });
+            let list_offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4, 7, 8, 11]));
+            let arr = Arc::new(ListArray::try_new(list_field, list_offsets, 
values, None).unwrap())
+                as ArrayRef;
+            push_like(schema.as_ref(), "arr_union", arr, &mut fields, &mut 
columns);
+        }
+        {
+            let (entry_field, entries_fields, uf, is_sorted) =
+                match schema.field_with_name("map_union").unwrap().data_type() 
{
+                    DataType::Map(entry_field, is_sorted) => {
+                        let fs = match entry_field.data_type() {
+                            DataType::Struct(fs) => fs.clone(),
+                            other => panic!("map entries must be struct, got 
{other:?}"),
+                        };
+                        let val_f = fs[1].clone();
+                        let uf = match val_f.data_type() {
+                            DataType::Union(f, UnionMode::Dense) => f.clone(),
+                            other => panic!("map value must be union, got 
{other:?}"),
+                        };
+                        (entry_field.clone(), fs, uf, *is_sorted)
+                    }
+                    other => panic!("map_union should be Map, got {other:?}"),
+                };
+            let keys = StringArray::from(vec!["a", "b", "c", "neg", "pi", 
"ok"]);
+            let moff = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 
4, 4, 6]));
+            let tid_null = tid_by_name(&uf, "null");
+            let tid_d = tid_by_name(&uf, "double");
+            let tid_s = tid_by_name(&uf, "string");
+            let type_ids = vec![tid_d, tid_null, tid_s, tid_d, tid_d, tid_s];
+            let offsets = vec![0, 0, 0, 1, 2, 1];
+            let pi_5dp = (std::f64::consts::PI * 100_000.0).trunc() / 
100_000.0;
+            let vals = mk_dense_union(&uf, type_ids, offsets, |f| match 
f.data_type() {
+                DataType::Float64 => {
+                    Some(Arc::new(Float64Array::from(vec![1.5f64, -0.5, 
pi_5dp])) as ArrayRef)
+                }
+                DataType::Utf8 => {
+                    Some(Arc::new(StringArray::from(vec!["yes", "true"])) as 
ArrayRef)
+                }
+                DataType::Null => Some(Arc::new(NullArray::new(2)) as 
ArrayRef),
+                _ => None,
+            });
+            let entries = StructArray::new(
+                entries_fields.clone(),
+                vec![Arc::new(keys) as ArrayRef, vals],
+                None,
+            );
+            let map =
+                Arc::new(MapArray::new(entry_field, moff, entries, None, 
is_sorted)) as ArrayRef;
+            push_like(schema.as_ref(), "map_union", map, &mut fields, &mut 
columns);
+        }
+        {
+            let fs = match 
schema.field_with_name("address").unwrap().data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("address should be Struct, got {other:?}"),
+            };
+            let street = Arc::new(StringArray::from(vec![
+                "100 Main",
+                "",
+                "42 Galaxy Way",
+                "End Ave",
+            ])) as ArrayRef;
+            let zip = Arc::new(Int32Array::from(vec![12345, 0, 42424, 1])) as 
ArrayRef;
+            let country = Arc::new(StringArray::from(vec!["US", "CA", "US", 
"GB"])) as ArrayRef;
+            let arr = Arc::new(StructArray::new(fs, vec![street, zip, 
country], None)) as ArrayRef;
+            push_like(schema.as_ref(), "address", arr, &mut fields, &mut 
columns);
+        }
+        {
+            let fs = match 
schema.field_with_name("maybe_auth").unwrap().data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("maybe_auth should be Struct, got {other:?}"),
+            };
+            let user =
+                Arc::new(StringArray::from(vec!["alice", "bob", "carol", 
"dave"])) as ArrayRef;
+            let token_values: Vec<Option<&[u8]>> = vec![
+                None,                           // row 1: null
+                Some(b"\x01\x02\x03".as_ref()), // row 2: bytes
+                None,                           // row 3: null
+                Some(b"".as_ref()),             // row 4: empty bytes
+            ];
+            let token = Arc::new(BinaryArray::from(token_values)) as ArrayRef;
+            let arr = Arc::new(StructArray::new(fs, vec![user, token], None)) 
as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "maybe_auth",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let uf = match schema
+                .field_with_name("union_enum_record_array_map")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("union_enum_record_array_map should be union, 
got {other:?}"),
+            };
+            let mut tid_enum: Option<i8> = None;
+            let mut tid_rec_a: Option<i8> = None;
+            let mut tid_array: Option<i8> = None;
+            let mut tid_map: Option<i8> = None;
+            let mut map_entry_field: Option<FieldRef> = None;
+            let mut map_sorted: bool = false;
+            for (tid, f) in uf.iter() {
+                match f.data_type() {
+                    DataType::Dictionary(_, _) => tid_enum = Some(tid),
+                    DataType::Struct(childs)
+                        if childs.len() == 2
+                            && childs[0].name() == "a"
+                            && childs[1].name() == "b" =>
+                    {
+                        tid_rec_a = Some(tid)
+                    }
+                    DataType::List(item) if matches!(item.data_type(), 
DataType::Int64) => {
+                        tid_array = Some(tid)
+                    }
+                    DataType::Map(ef, is_sorted) => {
+                        tid_map = Some(tid);
+                        map_entry_field = Some(ef.clone());
+                        map_sorted = *is_sorted;
+                    }
+                    _ => {}
+                }
+            }
+            let (tid_enum, tid_rec_a, tid_array, tid_map) = (
+                tid_enum.unwrap(),
+                tid_rec_a.unwrap(),
+                tid_array.unwrap(),
+                tid_map.unwrap(),
+            );
+            let tids = vec![tid_enum, tid_rec_a, tid_array, tid_map];
+            let offs = vec![0, 0, 0, 0];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Dictionary(_, _) => {
+                    let keys = Int32Array::from(vec![0i32]);
+                    let values =
+                        Arc::new(StringArray::from(vec!["RED", "GREEN", 
"BLUE"])) as ArrayRef;
+                    Some(
+                        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, 
values).unwrap())
+                            as ArrayRef,
+                    )
+                }
+                DataType::Struct(fs)
+                    if fs.len() == 2 && fs[0].name() == "a" && fs[1].name() == 
"b" =>
+                {
+                    let a = Int32Array::from(vec![7]);
+                    let b = StringArray::from(vec!["rec"]);
+                    Some(Arc::new(StructArray::new(
+                        fs.clone(),
+                        vec![Arc::new(a), Arc::new(b)],
+                        None,
+                    )) as ArrayRef)
+                }
+                DataType::List(field) => {
+                    let values = Int64Array::from(vec![1i64, 2, 3]);
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3]));
+                    Some(Arc::new(
+                        ListArray::try_new(field.clone(), offsets, 
Arc::new(values), None).unwrap(),
+                    ) as ArrayRef)
+                }
+                DataType::Map(_, _) => {
+                    let entry_field = map_entry_field.clone().unwrap();
+                    let (key_field, val_field) = match entry_field.data_type() 
{
+                        DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()),
+                        _ => unreachable!(),
+                    };
+                    let keys = StringArray::from(vec!["k"]);
+                    let vals = StringArray::from(vec!["v"]);
+                    let entries = StructArray::new(
+                        Fields::from(vec![key_field.as_ref().clone(), 
val_field.as_ref().clone()]),
+                        vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as 
ArrayRef],
+                        None,
+                    );
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 1]));
+                    Some(Arc::new(MapArray::new(
+                        entry_field.clone(),
+                        offsets,
+                        entries,
+                        None,
+                        map_sorted,
+                    )) as ArrayRef)
+                }
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "union_enum_record_array_map",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let uf = match schema
+                .field_with_name("union_date_or_fixed4")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("union_date_or_fixed4 should be union, got 
{other:?}"),
+            };
+            let tid_date = tid_by_dt(&uf, |dt| matches!(dt, DataType::Date32));
+            let tid_fx4 = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(4)));
+            let tids = vec![tid_date, tid_fx4, tid_date, tid_fx4];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Date32 => 
Some(Arc::new(Date32Array::from(vec![date_a, 0])) as ArrayRef),
+                DataType::FixedSizeBinary(4) => {
+                    let it = [Some(*b"\x00\x11\x22\x33"), 
Some(*b"ABCD")].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 4).unwrap(),
+                    ) as ArrayRef)
+                }
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "union_date_or_fixed4",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let uf = match schema
+                .field_with_name("union_interval_or_string")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("union_interval_or_string should be union, got 
{other:?}"),
+            };
+            let tid_dur = tid_by_dt(&uf, |dt| {
+                matches!(dt, DataType::Interval(IntervalUnit::MonthDayNano))
+            });
+            let tid_str = tid_by_dt(&uf, |dt| matches!(dt, DataType::Utf8));
+            let tids = vec![tid_dur, tid_str, tid_dur, tid_str];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Interval(IntervalUnit::MonthDayNano) => 
Some(Arc::new(
+                    IntervalMonthDayNanoArray::from(vec![dur_small, 
dur_large]),
+                )
+                    as ArrayRef),
+                DataType::Utf8 => Some(Arc::new(StringArray::from(vec![
+                    "duration-as-text",
+                    "iso-8601-period-P1Y",
+                ])) as ArrayRef),
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "union_interval_or_string",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let uf = match schema
+                .field_with_name("union_uuid_or_fixed10")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("union_uuid_or_fixed10 should be union, got 
{other:?}"),
+            };
+            let tid_uuid = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(16)));
+            let tid_fx10 = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(10)));
+            let tids = vec![tid_uuid, tid_fx10, tid_uuid, tid_fx10];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::FixedSizeBinary(16) => {
+                    let it = [Some(uuid1), Some(uuid2)].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap(),
+                    ) as ArrayRef)
+                }
+                DataType::FixedSizeBinary(10) => {
+                    let fx10_a = [0xAAu8; 10];
+                    let fx10_b = [0x00u8, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 
0x77, 0x88, 0x99];
+                    let it = [Some(fx10_a), Some(fx10_b)].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 10).unwrap(),
+                    ) as ArrayRef)
+                }
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "union_uuid_or_fixed10",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let list_field = match schema
+                .field_with_name("array_records_with_union")
+                .unwrap()
+                .data_type()
+            {
+                DataType::List(f) => f.clone(),
+                other => panic!("array_records_with_union should be List, got 
{other:?}"),
+            };
+            let kv_fields = match list_field.data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("array_records_with_union items must be 
Struct, got {other:?}"),
+            };
+            let val_field = kv_fields
+                .iter()
+                .find(|f| f.name() == "val")
+                .unwrap()
+                .clone();
+            let uf = match val_field.data_type() {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("KV.val should be union, got {other:?}"),
+            };
+            let keys = Arc::new(StringArray::from(vec!["k1", "k2", "k", "k3", 
"x"])) as ArrayRef;
+            let tid_null = tid_by_name(&uf, "null");
+            let tid_i = tid_by_name(&uf, "int");
+            let tid_l = tid_by_name(&uf, "long");
+            let type_ids = vec![tid_i, tid_null, tid_l, tid_null, tid_i];
+            let offsets = vec![0, 0, 0, 1, 1];
+            let vals = mk_dense_union(&uf, type_ids, offsets, |f| match 
f.data_type() {
+                DataType::Int32 => Some(Arc::new(Int32Array::from(vec![5, 
-5])) as ArrayRef),
+                DataType::Int64 => 
Some(Arc::new(Int64Array::from(vec![99i64])) as ArrayRef),
+                DataType::Null => Some(Arc::new(NullArray::new(2)) as 
ArrayRef),
+                _ => None,
+            });
+            let values_struct =
+                Arc::new(StructArray::new(kv_fields.clone(), vec![keys, vals], 
None)) as ArrayRef;
+            let list_offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 5]));
+            let arr = Arc::new(
+                ListArray::try_new(list_field, list_offsets, values_struct, 
None).unwrap(),
+            ) as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "array_records_with_union",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let uf = match schema
+                .field_with_name("union_map_or_array_int")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("union_map_or_array_int should be union, got 
{other:?}"),
+            };
+            let tid_map = tid_by_dt(&uf, |dt| matches!(dt, DataType::Map(_, 
_)));
+            let tid_list = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::List(_)));
+            let map_child: ArrayRef = {
+                let (entry_field, is_sorted) = match uf
+                    .iter()
+                    .find(|(tid, _)| *tid == tid_map)
+                    .unwrap()
+                    .1
+                    .data_type()
+                {
+                    DataType::Map(ef, is_sorted) => (ef.clone(), *is_sorted),
+                    _ => unreachable!(),
+                };
+                let (key_field, val_field) = match entry_field.data_type() {
+                    DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()),
+                    _ => unreachable!(),
+                };
+                let keys = StringArray::from(vec!["x", "y", "only"]);
+                let vals = Int32Array::from(vec![1, 2, 10]);
+                let entries = StructArray::new(
+                    Fields::from(vec![key_field.as_ref().clone(), 
val_field.as_ref().clone()]),
+                    vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as 
ArrayRef],
+                    None,
+                );
+                let moff = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 
2, 3]));
+                Arc::new(MapArray::new(entry_field, moff, entries, None, 
is_sorted)) as ArrayRef
+            };
+            let list_child: ArrayRef = {
+                let list_field = match uf
+                    .iter()
+                    .find(|(tid, _)| *tid == tid_list)
+                    .unwrap()
+                    .1
+                    .data_type()
+                {
+                    DataType::List(f) => f.clone(),
+                    _ => unreachable!(),
+                };
+                let values = Int32Array::from(vec![1, 2, 3, 0]);
+                let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 4]));
+                Arc::new(ListArray::try_new(list_field, offsets, 
Arc::new(values), None).unwrap())
+                    as ArrayRef
+            };
+            let tids = vec![tid_map, tid_list, tid_map, tid_list];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Map(_, _) => Some(map_child.clone()),
+                DataType::List(_) => Some(list_child.clone()),
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "union_map_or_array_int",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        push_like(
+            schema.as_ref(),
+            "renamed_with_default",
+            Arc::new(Int32Array::from(vec![100, 42, 7, 42])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let fs = match 
schema.field_with_name("person").unwrap().data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("person should be Struct, got {other:?}"),
+            };
+            let name =
+                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", 
"Dave"])) as ArrayRef;
+            let age = Arc::new(Int32Array::from(vec![30, 0, 25, 41])) as 
ArrayRef;
+            let arr = Arc::new(StructArray::new(fs, vec![name, age], None)) as 
ArrayRef;
+            push_like(schema.as_ref(), "person", arr, &mut fields, &mut 
columns);
+        }
+        let expected =
+            RecordBatch::try_new(Arc::new(Schema::new(Fields::from(fields))), 
columns).unwrap();
+        assert_eq!(
+            expected, batch,
+            "entire RecordBatch mismatch (schema, all columns, all rows)"
+        );
+    }
+
+    #[test]
+    fn comprehensive_e2e_resolution_test() {

Review Comment:
   100%. I'll probably come back around at some point and enhance the 
maintainability of these tests. I just wanted to throw in every edge case I 
could think of.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to