This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new f7ea0aa815 Add arrow-avro Reader support for Dense Union and Union 
resolution (Part 2) (#8349)
f7ea0aa815 is described below

commit f7ea0aa815d24ab1cf66bfebe92c4c85f891e4d1
Author: Connor Sanders <[email protected]>
AuthorDate: Wed Sep 24 05:34:38 2025 -0500

    Add arrow-avro Reader support for Dense Union and Union resolution (Part 2) 
(#8349)
    
    # Which issue does this PR close?
    
    This work continues arrow-avro schema resolution support and aligns
    behavior with the Avro spec.
    
    - **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
    the reader/decoder, including schema resolution and type promotion.
    - **Follow-ups/Context**: #8348 (Add arrow-avro Reader support for Dense
    Union and Union resolution (Part 1)), #8293 (Add projection with default
    values support to RecordDecoder), #8124 (schema resolution & type
    promotion for the decoder), #8223 (enum mapping for schema resolution).
    These previous efforts established the foundations that this PR extends
    to Union types and Union resolution.
    
    # Rationale for this change
    
    `arrow-avro` lacked end‑to‑end support for Avro unions and Arrow `Union`
    schemas. Many Avro datasets rely on unions (i.e.., `["null","string"]`,
    tagged unions of different records), and without schema‐level resolution
    and JSON encoding the crate could not interoperate cleanly. This PR
    complete the initial Decoder support for Union types and Union
    resolution.
    
    # What changes are included in this PR?
    
    * Decoder support for Dense Union decoding and Union resolution.
    
    # Are these changes tested?
    
    Yes,
    New detailed end to end integration tests have been added to
    `reader/mod.rs` and unit tests covering the new Union and Union
    resolution functionality are included in the `reader/record.rs` file.
    
    # Are there any user-facing changes?
    
    N/A
    
    ---------
    
    Co-authored-by: Ryan Johnson <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-avro/src/codec.rs                |   18 +
 arrow-avro/src/reader/mod.rs           | 1607 +++++++++++++++++++++++++++++++-
 arrow-avro/src/reader/record.rs        |  909 +++++++++++++++++-
 arrow-avro/test/data/README.md         |   57 +-
 arrow-avro/test/data/union_fields.avro |  Bin 0 -> 3430 bytes
 5 files changed, 2570 insertions(+), 21 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 64fc0488e3..9e2e6ea7bd 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -29,6 +29,8 @@ use arrow_schema::{DECIMAL32_MAX_PRECISION, 
DECIMAL64_MAX_PRECISION};
 use indexmap::IndexMap;
 use serde_json::Value;
 use std::collections::{HashMap, HashSet};
+use std::fmt;
+use std::fmt::Display;
 use std::sync::Arc;
 use strum_macros::AsRefStr;
 
@@ -117,6 +119,22 @@ pub(crate) enum Promotion {
     BytesToString,
 }
 
+impl Display for Promotion {
+    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Direct => write!(formatter, "Direct"),
+            Self::IntToLong => write!(formatter, "Int->Long"),
+            Self::IntToFloat => write!(formatter, "Int->Float"),
+            Self::IntToDouble => write!(formatter, "Int->Double"),
+            Self::LongToFloat => write!(formatter, "Long->Float"),
+            Self::LongToDouble => write!(formatter, "Long->Double"),
+            Self::FloatToDouble => write!(formatter, "Float->Double"),
+            Self::StringToBytes => write!(formatter, "String->Bytes"),
+            Self::BytesToString => write!(formatter, "Bytes->String"),
+        }
+    }
+}
+
 /// Information required to resolve a writer union against a reader union (or 
single type).
 #[derive(Debug, Clone, PartialEq)]
 pub struct ResolvedUnion {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 56a7bef17e..c9e4b1d229 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -19,6 +19,19 @@
 //!
 //! Facilities to read Apache Avro–encoded data into Arrow's `RecordBatch` 
format.
 //!
+//! ### Limitations
+//!
+//!- **Avro unions with > 127 branches are not supported.**
+//!  When decoding Avro unions to Arrow `UnionArray`, Arrow stores the union
+//!  type identifiers in an **8‑bit signed** buffer (`i8`). This implies a
+//!  practical limit of **127** distinct branch ids. Inputs that resolve to
+//!  more than 127 branches will return an error. If you truly need more,
+//!  model the schema as a **union of unions**, per the Arrow format spec.
+//!
+//!  See: Arrow Columnar Format — Dense Union (“types buffer: 8‑bit signed;
+//!  a union with more than 127 possible types can be modeled as a union of
+//!  unions”).
+//!
 //! This module exposes three layers of the API surface, from highest to 
lowest-level:
 //!
 //! * [`ReaderBuilder`](crate::reader::ReaderBuilder): configures how Avro is 
read (batch size, strict union handling,
@@ -1289,14 +1302,19 @@ mod test {
         ArrayBuilder, BooleanBuilder, Float32Builder, Float64Builder, 
Int32Builder, Int64Builder,
         ListBuilder, MapBuilder, StringBuilder, StructBuilder,
     };
+    use arrow_array::cast::AsArray;
     use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
     use arrow_array::*;
-    use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
-    use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, 
Schema};
+    use arrow_buffer::{
+        i256, Buffer, IntervalMonthDayNano, NullBuffer, OffsetBuffer, 
ScalarBuffer,
+    };
+    use arrow_schema::{
+        ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, 
UnionFields, UnionMode,
+    };
     use bytes::{Buf, BufMut, Bytes};
     use futures::executor::block_on;
     use futures::{stream, Stream, StreamExt, TryStreamExt};
-    use serde_json::Value;
+    use serde_json::{json, Value};
     use std::collections::HashMap;
     use std::fs;
     use std::fs::File;
@@ -2734,6 +2752,1589 @@ mod test {
         }
     }
 
+    #[test]
+    fn test_union_fields_avro_nullable_and_general_unions() {
+        let path = "test/data/union_fields.avro";
+        let batch = read_file(path, 1024, false);
+        let schema = batch.schema();
+        let idx = schema.index_of("nullable_int_nullfirst").unwrap();
+        let a = batch.column(idx).as_primitive::<Int32Type>();
+        assert_eq!(a.len(), 4);
+        assert!(a.is_null(0));
+        assert_eq!(a.value(1), 42);
+        assert!(a.is_null(2));
+        assert_eq!(a.value(3), 0);
+        let idx = schema.index_of("nullable_string_nullsecond").unwrap();
+        let s = batch
+            .column(idx)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("nullable_string_nullsecond should be Utf8");
+        assert_eq!(s.len(), 4);
+        assert_eq!(s.value(0), "s1");
+        assert!(s.is_null(1));
+        assert_eq!(s.value(2), "s3");
+        assert!(s.is_valid(3)); // empty string, not null
+        assert_eq!(s.value(3), "");
+        let idx = schema.index_of("union_prim").unwrap();
+        let u = batch
+            .column(idx)
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("union_prim should be Union");
+        let fields = match u.data_type() {
+            DataType::Union(fields, mode) => {
+                assert!(matches!(mode, UnionMode::Dense), "expect dense 
unions");
+                fields
+            }
+            other => panic!("expected Union, got {other:?}"),
+        };
+        let tid_by_name = |name: &str| -> i8 {
+            for (tid, f) in fields.iter() {
+                if f.name() == name {
+                    return tid;
+                }
+            }
+            panic!("union child '{name}' not found");
+        };
+        let expected_type_ids = vec![
+            tid_by_name("long"),
+            tid_by_name("int"),
+            tid_by_name("float"),
+            tid_by_name("double"),
+        ];
+        let type_ids: Vec<i8> = u.type_ids().iter().copied().collect();
+        assert_eq!(
+            type_ids, expected_type_ids,
+            "branch selection for union_prim rows"
+        );
+        let longs = u
+            .child(tid_by_name("long"))
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(longs.len(), 1);
+        let ints = u
+            .child(tid_by_name("int"))
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(ints.len(), 1);
+        let floats = u
+            .child(tid_by_name("float"))
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        assert_eq!(floats.len(), 1);
+        let doubles = u
+            .child(tid_by_name("double"))
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        assert_eq!(doubles.len(), 1);
+        let idx = schema.index_of("union_bytes_vs_string").unwrap();
+        let u = batch
+            .column(idx)
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("union_bytes_vs_string should be Union");
+        let fields = match u.data_type() {
+            DataType::Union(fields, _) => fields,
+            other => panic!("expected Union, got {other:?}"),
+        };
+        let tid_by_name = |name: &str| -> i8 {
+            for (tid, f) in fields.iter() {
+                if f.name() == name {
+                    return tid;
+                }
+            }
+            panic!("union child '{name}' not found");
+        };
+        let tid_bytes = tid_by_name("bytes");
+        let tid_string = tid_by_name("string");
+        let type_ids: Vec<i8> = u.type_ids().iter().copied().collect();
+        assert_eq!(
+            type_ids,
+            vec![tid_bytes, tid_string, tid_string, tid_bytes],
+            "branch selection for bytes/string union"
+        );
+        let s_child = u
+            .child(tid_string)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(s_child.len(), 2);
+        assert_eq!(s_child.value(0), "hello");
+        assert_eq!(s_child.value(1), "world");
+        let b_child = u
+            .child(tid_bytes)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        assert_eq!(b_child.len(), 2);
+        assert_eq!(b_child.value(0), &[0x00, 0xFF, 0x7F]);
+        assert_eq!(b_child.value(1), b""); // previously: &[]
+        let idx = schema.index_of("union_enum_records_array_map").unwrap();
+        let u = batch
+            .column(idx)
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("union_enum_records_array_map should be Union");
+        let fields = match u.data_type() {
+            DataType::Union(fields, _) => fields,
+            other => panic!("expected Union, got {other:?}"),
+        };
+        let mut tid_enum: Option<i8> = None;
+        let mut tid_rec_a: Option<i8> = None;
+        let mut tid_rec_b: Option<i8> = None;
+        let mut tid_array: Option<i8> = None;
+        let mut tid_map: Option<i8> = None;
+        for (tid, f) in fields.iter() {
+            match f.data_type() {
+                DataType::Dictionary(_, _) => tid_enum = Some(tid),
+                DataType::Struct(childs) => {
+                    if childs.len() == 2 && childs[0].name() == "a" && 
childs[1].name() == "b" {
+                        tid_rec_a = Some(tid);
+                    } else if childs.len() == 2
+                        && childs[0].name() == "x"
+                        && childs[1].name() == "y"
+                    {
+                        tid_rec_b = Some(tid);
+                    }
+                }
+                DataType::List(_) => tid_array = Some(tid),
+                DataType::Map(_, _) => tid_map = Some(tid),
+                _ => {}
+            }
+        }
+        let (tid_enum, tid_rec_a, tid_rec_b, tid_array) = (
+            tid_enum.expect("enum child"),
+            tid_rec_a.expect("RecA child"),
+            tid_rec_b.expect("RecB child"),
+            tid_array.expect("array<long> child"),
+        );
+        let type_ids: Vec<i8> = u.type_ids().iter().copied().collect();
+        assert_eq!(
+            type_ids,
+            vec![tid_enum, tid_rec_a, tid_rec_b, tid_array],
+            "branch selection for complex union"
+        );
+        let dict = u
+            .child(tid_enum)
+            .as_any()
+            .downcast_ref::<DictionaryArray<Int32Type>>()
+            .unwrap();
+        assert_eq!(dict.len(), 1);
+        assert!(dict.is_valid(0));
+        let rec_a = u
+            .child(tid_rec_a)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        assert_eq!(rec_a.len(), 1);
+        let a_val = rec_a
+            .column_by_name("a")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(a_val.value(0), 7);
+        let b_val = rec_a
+            .column_by_name("b")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(b_val.value(0), "x");
+        // RecB row: {"x": 123456789, "y": b"\xFF\x00"}
+        let rec_b = u
+            .child(tid_rec_b)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let x_val = rec_b
+            .column_by_name("x")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(x_val.value(0), 123_456_789_i64);
+        let y_val = rec_b
+            .column_by_name("y")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        assert_eq!(y_val.value(0), &[0xFF, 0x00]);
+        let arr = u
+            .child(tid_array)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+        assert_eq!(arr.len(), 1);
+        let first_values = arr.value(0);
+        let longs = 
first_values.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(longs.len(), 3);
+        assert_eq!(longs.value(0), 1);
+        assert_eq!(longs.value(1), 2);
+        assert_eq!(longs.value(2), 3);
+        let idx = schema.index_of("union_date_or_fixed4").unwrap();
+        let u = batch
+            .column(idx)
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("union_date_or_fixed4 should be Union");
+        let fields = match u.data_type() {
+            DataType::Union(fields, _) => fields,
+            other => panic!("expected Union, got {other:?}"),
+        };
+        let mut tid_date: Option<i8> = None;
+        let mut tid_fixed: Option<i8> = None;
+        for (tid, f) in fields.iter() {
+            match f.data_type() {
+                DataType::Date32 => tid_date = Some(tid),
+                DataType::FixedSizeBinary(4) => tid_fixed = Some(tid),
+                _ => {}
+            }
+        }
+        let (tid_date, tid_fixed) = (tid_date.expect("date"), 
tid_fixed.expect("fixed(4)"));
+        let type_ids: Vec<i8> = u.type_ids().iter().copied().collect();
+        assert_eq!(
+            type_ids,
+            vec![tid_date, tid_fixed, tid_date, tid_fixed],
+            "branch selection for date/fixed4 union"
+        );
+        let dates = u
+            .child(tid_date)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        assert_eq!(dates.len(), 2);
+        assert_eq!(dates.value(0), 19_000); // ~2022‑01‑15
+        assert_eq!(dates.value(1), 0); // epoch
+        let fixed = u
+            .child(tid_fixed)
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .unwrap();
+        assert_eq!(fixed.len(), 2);
+        assert_eq!(fixed.value(0), b"ABCD");
+        assert_eq!(fixed.value(1), &[0x00, 0x11, 0x22, 0x33]);
+    }
+
+    #[test]
+    fn test_union_schema_resolution_all_type_combinations() {
+        let path = "test/data/union_fields.avro";
+        let baseline = read_file(path, 1024, false);
+        let baseline_schema = baseline.schema();
+        let mut root = load_writer_schema_json(path);
+        assert_eq!(root["type"], "record", "writer schema must be a record");
+        let fields = root
+            .get_mut("fields")
+            .and_then(|f| f.as_array_mut())
+            .expect("record has fields");
+        fn is_named_type(obj: &Value, ty: &str, nm: &str) -> bool {
+            obj.get("type").and_then(|v| v.as_str()) == Some(ty)
+                && obj.get("name").and_then(|v| v.as_str()) == Some(nm)
+        }
+        fn is_logical(obj: &Value, prim: &str, lt: &str) -> bool {
+            obj.get("type").and_then(|v| v.as_str()) == Some(prim)
+                && obj.get("logicalType").and_then(|v| v.as_str()) == Some(lt)
+        }
+        fn find_first(arr: &[Value], pred: impl Fn(&Value) -> bool) -> 
Option<Value> {
+            arr.iter().find(|v| pred(v)).cloned()
+        }
+        fn prim(s: &str) -> Value {
+            Value::String(s.to_string())
+        }
+        for f in fields.iter_mut() {
+            let Some(name) = f.get("name").and_then(|n| n.as_str()) else {
+                continue;
+            };
+            match name {
+                // Flip null ordering – should not affect values
+                "nullable_int_nullfirst" => {
+                    f["type"] = json!(["int", "null"]);
+                }
+                "nullable_string_nullsecond" => {
+                    f["type"] = json!(["null", "string"]);
+                }
+                "union_prim" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let long = prim("long");
+                    let double = prim("double");
+                    let string = prim("string");
+                    let bytes = prim("bytes");
+                    let boolean = prim("boolean");
+                    assert!(orig.contains(&long));
+                    assert!(orig.contains(&double));
+                    assert!(orig.contains(&string));
+                    assert!(orig.contains(&bytes));
+                    assert!(orig.contains(&boolean));
+                    f["type"] = json!([long, double, string, bytes, boolean]);
+                }
+                "union_bytes_vs_string" => {
+                    f["type"] = json!(["string", "bytes"]);
+                }
+                "union_fixed_dur_decfix" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let fx8 = find_first(&orig, |o| is_named_type(o, "fixed", 
"Fx8")).unwrap();
+                    let dur12 = find_first(&orig, |o| is_named_type(o, 
"fixed", "Dur12")).unwrap();
+                    let decfix16 =
+                        find_first(&orig, |o| is_named_type(o, "fixed", 
"DecFix16")).unwrap();
+                    f["type"] = json!([decfix16, dur12, fx8]);
+                }
+                "union_enum_records_array_map" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let enum_color = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("enum")
+                    })
+                    .unwrap();
+                    let rec_a = find_first(&orig, |o| is_named_type(o, 
"record", "RecA")).unwrap();
+                    let rec_b = find_first(&orig, |o| is_named_type(o, 
"record", "RecB")).unwrap();
+                    let arr = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("array")
+                    })
+                    .unwrap();
+                    let map = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("map")
+                    })
+                    .unwrap();
+                    f["type"] = json!([arr, map, rec_b, rec_a, enum_color]);
+                }
+                "union_date_or_fixed4" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let date = find_first(&orig, |o| is_logical(o, "int", 
"date")).unwrap();
+                    let fx4 = find_first(&orig, |o| is_named_type(o, "fixed", 
"Fx4")).unwrap();
+                    f["type"] = json!([fx4, date]);
+                }
+                "union_time_millis_or_enum" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let time_ms =
+                        find_first(&orig, |o| is_logical(o, "int", 
"time-millis")).unwrap();
+                    let en = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("enum")
+                    })
+                    .unwrap();
+                    f["type"] = json!([en, time_ms]);
+                }
+                "union_time_micros_or_string" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let time_us =
+                        find_first(&orig, |o| is_logical(o, "long", 
"time-micros")).unwrap();
+                    f["type"] = json!(["string", time_us]);
+                }
+                "union_ts_millis_utc_or_array" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let ts_ms =
+                        find_first(&orig, |o| is_logical(o, "long", 
"timestamp-millis")).unwrap();
+                    let arr = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("array")
+                    })
+                    .unwrap();
+                    f["type"] = json!([arr, ts_ms]);
+                }
+                "union_ts_micros_local_or_bytes" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let lts_us =
+                        find_first(&orig, |o| is_logical(o, "long", 
"local-timestamp-micros"))
+                            .unwrap();
+                    f["type"] = json!(["bytes", lts_us]);
+                }
+                "union_uuid_or_fixed10" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let uuid = find_first(&orig, |o| is_logical(o, "string", 
"uuid")).unwrap();
+                    let fx10 = find_first(&orig, |o| is_named_type(o, "fixed", 
"Fx10")).unwrap();
+                    f["type"] = json!([fx10, uuid]);
+                }
+                "union_dec_bytes_or_dec_fixed" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let dec_bytes = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("bytes")
+                            && o.get("logicalType").and_then(|v| v.as_str()) 
== Some("decimal")
+                    })
+                    .unwrap();
+                    let dec_fix = find_first(&orig, |o| {
+                        is_named_type(o, "fixed", "DecFix20")
+                            && o.get("logicalType").and_then(|v| v.as_str()) 
== Some("decimal")
+                    })
+                    .unwrap();
+                    f["type"] = json!([dec_fix, dec_bytes]);
+                }
+                "union_null_bytes_string" => {
+                    f["type"] = json!(["bytes", "string", "null"]);
+                }
+                "array_of_union" => {
+                    let obj = f
+                        .get_mut("type")
+                        .expect("array type")
+                        .as_object_mut()
+                        .unwrap();
+                    obj.insert("items".to_string(), json!(["string", "long"]));
+                }
+                "map_of_union" => {
+                    let obj = f
+                        .get_mut("type")
+                        .expect("map type")
+                        .as_object_mut()
+                        .unwrap();
+                    obj.insert("values".to_string(), json!(["double", 
"null"]));
+                }
+                "record_with_union_field" => {
+                    let rec = f
+                        .get_mut("type")
+                        .expect("record type")
+                        .as_object_mut()
+                        .unwrap();
+                    let rec_fields = 
rec.get_mut("fields").unwrap().as_array_mut().unwrap();
+                    let mut found = false;
+                    for rf in rec_fields.iter_mut() {
+                        if rf.get("name").and_then(|v| v.as_str()) == 
Some("u") {
+                            rf["type"] = json!(["string", "long"]); // rely on 
int→long promotion
+                            found = true;
+                            break;
+                        }
+                    }
+                    assert!(found, "field 'u' expected in HasUnion");
+                }
+                "union_ts_micros_utc_or_map" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let ts_us =
+                        find_first(&orig, |o| is_logical(o, "long", 
"timestamp-micros")).unwrap();
+                    let map = find_first(&orig, |o| {
+                        o.get("type").and_then(|v| v.as_str()) == Some("map")
+                    })
+                    .unwrap();
+                    f["type"] = json!([map, ts_us]);
+                }
+                "union_ts_millis_local_or_string" => {
+                    let orig = f["type"].as_array().unwrap().clone();
+                    let lts_ms =
+                        find_first(&orig, |o| is_logical(o, "long", 
"local-timestamp-millis"))
+                            .unwrap();
+                    f["type"] = json!(["string", lts_ms]);
+                }
+                "union_bool_or_string" => {
+                    f["type"] = json!(["string", "boolean"]);
+                }
+                _ => {}
+            }
+        }
+        let reader_schema = AvroSchema::new(root.to_string());
+        let resolved = read_alltypes_with_reader_schema(path, reader_schema);
+
+        fn branch_token(dt: &DataType) -> String {
+            match dt {
+                DataType::Null => "null".into(),
+                DataType::Boolean => "boolean".into(),
+                DataType::Int32 => "int".into(),
+                DataType::Int64 => "long".into(),
+                DataType::Float32 => "float".into(),
+                DataType::Float64 => "double".into(),
+                DataType::Binary => "bytes".into(),
+                DataType::Utf8 => "string".into(),
+                DataType::Date32 => "date".into(),
+                DataType::Time32(arrow_schema::TimeUnit::Millisecond) => 
"time-millis".into(),
+                DataType::Time64(arrow_schema::TimeUnit::Microsecond) => 
"time-micros".into(),
+                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) 
=> if tz.is_some() {
+                    "timestamp-millis"
+                } else {
+                    "local-timestamp-millis"
+                }
+                .into(),
+                DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) 
=> if tz.is_some() {
+                    "timestamp-micros"
+                } else {
+                    "local-timestamp-micros"
+                }
+                .into(),
+                DataType::Interval(IntervalUnit::MonthDayNano) => 
"duration".into(),
+                DataType::FixedSizeBinary(n) => format!("fixed{n}"),
+                DataType::Dictionary(_, _) => "enum".into(),
+                DataType::Decimal128(p, s) => format!("decimal({p},{s})"),
+                DataType::Decimal256(p, s) => format!("decimal({p},{s})"),
+                #[cfg(feature = "small_decimals")]
+                DataType::Decimal64(p, s) => format!("decimal({p},{s})"),
+                DataType::Struct(fields) => {
+                    if fields.len() == 2 && fields[0].name() == "a" && 
fields[1].name() == "b" {
+                        "record:RecA".into()
+                    } else if fields.len() == 2
+                        && fields[0].name() == "x"
+                        && fields[1].name() == "y"
+                    {
+                        "record:RecB".into()
+                    } else {
+                        "record".into()
+                    }
+                }
+                DataType::List(_) => "array".into(),
+                DataType::Map(_, _) => "map".into(),
+                other => format!("{other:?}"),
+            }
+        }
+
+        fn union_tokens(u: &UnionArray) -> (Vec<i8>, HashMap<i8, String>) {
+            let fields = match u.data_type() {
+                DataType::Union(fields, _) => fields,
+                other => panic!("expected Union, got {other:?}"),
+            };
+            let mut dict: HashMap<i8, String> = 
HashMap::with_capacity(fields.len());
+            for (tid, f) in fields.iter() {
+                dict.insert(tid, branch_token(f.data_type()));
+            }
+            let ids: Vec<i8> = u.type_ids().iter().copied().collect();
+            (ids, dict)
+        }
+
+        fn expected_token(field_name: &str, writer_token: &str) -> String {
+            match field_name {
+                "union_prim" => match writer_token {
+                    "int" => "long".into(),
+                    "float" => "double".into(),
+                    other => other.into(),
+                },
+                "record_with_union_field.u" => match writer_token {
+                    "int" => "long".into(),
+                    other => other.into(),
+                },
+                _ => writer_token.into(),
+            }
+        }
+
+        fn get_union<'a>(
+            rb: &'a RecordBatch,
+            schema: arrow_schema::SchemaRef,
+            fname: &str,
+        ) -> &'a UnionArray {
+            let idx = schema.index_of(fname).unwrap();
+            rb.column(idx)
+                .as_any()
+                .downcast_ref::<UnionArray>()
+                .unwrap_or_else(|| panic!("{fname} should be a Union"))
+        }
+
+        fn assert_union_equivalent(field_name: &str, u_writer: &UnionArray, 
u_reader: &UnionArray) {
+            let (ids_w, dict_w) = union_tokens(u_writer);
+            let (ids_r, dict_r) = union_tokens(u_reader);
+            assert_eq!(
+                ids_w.len(),
+                ids_r.len(),
+                "{field_name}: row count mismatch between baseline and 
resolved"
+            );
+            for (i, (id_w, id_r)) in 
ids_w.iter().zip(ids_r.iter()).enumerate() {
+                let w_tok = dict_w.get(id_w).unwrap();
+                let want = expected_token(field_name, w_tok);
+                let got = dict_r.get(id_r).unwrap();
+                assert_eq!(
+                    got, &want,
+                    "{field_name}: row {i} resolved to wrong union branch 
(writer={w_tok}, expected={want}, got={got})"
+                );
+            }
+        }
+
+        for (fname, dt) in [
+            ("nullable_int_nullfirst", DataType::Int32),
+            ("nullable_string_nullsecond", DataType::Utf8),
+        ] {
+            let idx_b = baseline_schema.index_of(fname).unwrap();
+            let idx_r = resolved.schema().index_of(fname).unwrap();
+            let col_b = baseline.column(idx_b);
+            let col_r = resolved.column(idx_r);
+            assert_eq!(
+                col_b.data_type(),
+                &dt,
+                "baseline {fname} should decode as non-union with nullability"
+            );
+            assert_eq!(
+                col_b.as_ref(),
+                col_r.as_ref(),
+                "{fname}: values must be identical regardless of null-branch 
order"
+            );
+        }
+        let union_fields = [
+            "union_prim",
+            "union_bytes_vs_string",
+            "union_fixed_dur_decfix",
+            "union_enum_records_array_map",
+            "union_date_or_fixed4",
+            "union_time_millis_or_enum",
+            "union_time_micros_or_string",
+            "union_ts_millis_utc_or_array",
+            "union_ts_micros_local_or_bytes",
+            "union_uuid_or_fixed10",
+            "union_dec_bytes_or_dec_fixed",
+            "union_null_bytes_string",
+            "union_ts_micros_utc_or_map",
+            "union_ts_millis_local_or_string",
+            "union_bool_or_string",
+        ];
+        for fname in union_fields {
+            let u_b = get_union(&baseline, baseline_schema.clone(), fname);
+            let u_r = get_union(&resolved, resolved.schema(), fname);
+            assert_union_equivalent(fname, u_b, u_r);
+        }
+        {
+            let fname = "array_of_union";
+            let idx_b = baseline_schema.index_of(fname).unwrap();
+            let idx_r = resolved.schema().index_of(fname).unwrap();
+            let arr_b = baseline
+                .column(idx_b)
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .expect("array_of_union should be a List");
+            let arr_r = resolved
+                .column(idx_r)
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .expect("array_of_union should be a List");
+            assert_eq!(
+                arr_b.value_offsets(),
+                arr_r.value_offsets(),
+                "{fname}: list offsets changed after resolution"
+            );
+            let u_b = arr_b
+                .values()
+                .as_any()
+                .downcast_ref::<UnionArray>()
+                .expect("array items should be Union");
+            let u_r = arr_r
+                .values()
+                .as_any()
+                .downcast_ref::<UnionArray>()
+                .expect("array items should be Union");
+            let (ids_b, dict_b) = union_tokens(u_b);
+            let (ids_r, dict_r) = union_tokens(u_r);
+            assert_eq!(ids_b.len(), ids_r.len(), "{fname}: values length 
mismatch");
+            for (i, (id_b, id_r)) in 
ids_b.iter().zip(ids_r.iter()).enumerate() {
+                let w_tok = dict_b.get(id_b).unwrap();
+                let got = dict_r.get(id_r).unwrap();
+                assert_eq!(
+                    got, w_tok,
+                    "{fname}: value {i} resolved to wrong branch 
(writer={w_tok}, got={got})"
+                );
+            }
+        }
+        {
+            let fname = "map_of_union";
+            let idx_b = baseline_schema.index_of(fname).unwrap();
+            let idx_r = resolved.schema().index_of(fname).unwrap();
+            let map_b = baseline
+                .column(idx_b)
+                .as_any()
+                .downcast_ref::<MapArray>()
+                .expect("map_of_union should be a Map");
+            let map_r = resolved
+                .column(idx_r)
+                .as_any()
+                .downcast_ref::<MapArray>()
+                .expect("map_of_union should be a Map");
+            assert_eq!(
+                map_b.value_offsets(),
+                map_r.value_offsets(),
+                "{fname}: map value offsets changed after resolution"
+            );
+            let ent_b = map_b.entries();
+            let ent_r = map_r.entries();
+            let val_b_any = ent_b.column(1).as_ref();
+            let val_r_any = ent_r.column(1).as_ref();
+            let b_union = val_b_any.as_any().downcast_ref::<UnionArray>();
+            let r_union = val_r_any.as_any().downcast_ref::<UnionArray>();
+            if let (Some(u_b), Some(u_r)) = (b_union, r_union) {
+                assert_union_equivalent(fname, u_b, u_r);
+            } else {
+                assert_eq!(
+                    val_b_any.data_type(),
+                    val_r_any.data_type(),
+                    "{fname}: value data types differ after resolution"
+                );
+                assert_eq!(
+                    val_b_any, val_r_any,
+                    "{fname}: value arrays differ after resolution (nullable 
value column case)"
+                );
+                let value_nullable = |m: &MapArray| -> bool {
+                    match m.data_type() {
+                        DataType::Map(entries_field, _sorted) => match 
entries_field.data_type() {
+                            DataType::Struct(fields) => {
+                                assert_eq!(fields.len(), 2, "entries struct 
must have 2 fields");
+                                assert_eq!(fields[0].name(), "key");
+                                assert_eq!(fields[1].name(), "value");
+                                fields[1].is_nullable()
+                            }
+                            other => panic!("Map entries field must be Struct, 
got {other:?}"),
+                        },
+                        other => panic!("expected Map data type, got 
{other:?}"),
+                    }
+                };
+                assert!(
+                    value_nullable(map_b),
+                    "{fname}: baseline Map value field should be nullable per 
Arrow spec"
+                );
+                assert!(
+                    value_nullable(map_r),
+                    "{fname}: resolved Map value field should be nullable per 
Arrow spec"
+                );
+            }
+        }
+        {
+            let fname = "record_with_union_field";
+            let idx_b = baseline_schema.index_of(fname).unwrap();
+            let idx_r = resolved.schema().index_of(fname).unwrap();
+            let rec_b = baseline
+                .column(idx_b)
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .expect("record_with_union_field should be a Struct");
+            let rec_r = resolved
+                .column(idx_r)
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .expect("record_with_union_field should be a Struct");
+            let u_b = rec_b
+                .column_by_name("u")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<UnionArray>()
+                .expect("field 'u' should be Union (baseline)");
+            let u_r = rec_r
+                .column_by_name("u")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<UnionArray>()
+                .expect("field 'u' should be Union (resolved)");
+            assert_union_equivalent("record_with_union_field.u", u_b, u_r);
+        }
+    }
+
+    #[test]
+    fn test_union_fields_end_to_end_expected_arrays() {
+        fn tid_by_name(fields: &UnionFields, want: &str) -> i8 {
+            for (tid, f) in fields.iter() {
+                if f.name() == want {
+                    return tid;
+                }
+            }
+            panic!("union child '{want}' not found")
+        }
+
+        fn tid_by_dt(fields: &UnionFields, pred: impl Fn(&DataType) -> bool) 
-> i8 {
+            for (tid, f) in fields.iter() {
+                if pred(f.data_type()) {
+                    return tid;
+                }
+            }
+            panic!("no union child matches predicate")
+        }
+
+        fn uuid16_from_str(s: &str) -> [u8; 16] {
+            fn hex(b: u8) -> u8 {
+                match b {
+                    b'0'..=b'9' => b - b'0',
+                    b'a'..=b'f' => b - b'a' + 10,
+                    b'A'..=b'F' => b - b'A' + 10,
+                    _ => panic!("invalid hex"),
+                }
+            }
+            let mut out = [0u8; 16];
+            let bytes = s.as_bytes();
+            let (mut i, mut j) = (0, 0);
+            while i < bytes.len() {
+                if bytes[i] == b'-' {
+                    i += 1;
+                    continue;
+                }
+                let hi = hex(bytes[i]);
+                let lo = hex(bytes[i + 1]);
+                out[j] = (hi << 4) | lo;
+                j += 1;
+                i += 2;
+            }
+            assert_eq!(j, 16, "uuid must decode to 16 bytes");
+            out
+        }
+
+        fn empty_child_for(dt: &DataType) -> Arc<dyn Array> {
+            match dt {
+                DataType::Null => Arc::new(NullArray::new(0)),
+                DataType::Boolean => 
Arc::new(BooleanArray::from(Vec::<bool>::new())),
+                DataType::Int32 => 
Arc::new(Int32Array::from(Vec::<i32>::new())),
+                DataType::Int64 => 
Arc::new(Int64Array::from(Vec::<i64>::new())),
+                DataType::Float32 => 
Arc::new(arrow_array::Float32Array::from(Vec::<f32>::new())),
+                DataType::Float64 => 
Arc::new(arrow_array::Float64Array::from(Vec::<f64>::new())),
+                DataType::Binary => 
Arc::new(BinaryArray::from(Vec::<&[u8]>::new())),
+                DataType::Utf8 => 
Arc::new(StringArray::from(Vec::<&str>::new())),
+                DataType::Date32 => 
Arc::new(arrow_array::Date32Array::from(Vec::<i32>::new())),
+                DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                    Arc::new(Time32MillisecondArray::from(Vec::<i32>::new()))
+                }
+                DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                    Arc::new(Time64MicrosecondArray::from(Vec::<i64>::new()))
+                }
+                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) 
=> {
+                    let a = TimestampMillisecondArray::from(Vec::<i64>::new());
+                    Arc::new(if let Some(tz) = tz {
+                        a.with_timezone(tz.clone())
+                    } else {
+                        a
+                    })
+                }
+                DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) 
=> {
+                    let a = TimestampMicrosecondArray::from(Vec::<i64>::new());
+                    Arc::new(if let Some(tz) = tz {
+                        a.with_timezone(tz.clone())
+                    } else {
+                        a
+                    })
+                }
+                DataType::Interval(IntervalUnit::MonthDayNano) => {
+                    
Arc::new(arrow_array::IntervalMonthDayNanoArray::from(Vec::<
+                        IntervalMonthDayNano,
+                    >::new(
+                    )))
+                }
+                DataType::FixedSizeBinary(n) => 
Arc::new(FixedSizeBinaryArray::new_null(*n, 0)),
+                DataType::Dictionary(k, v) => {
+                    assert_eq!(**k, DataType::Int32, "expect int32 keys for 
enums");
+                    let keys = Int32Array::from(Vec::<i32>::new());
+                    let values = match v.as_ref() {
+                        DataType::Utf8 => {
+                            Arc::new(StringArray::from(Vec::<&str>::new())) as 
ArrayRef
+                        }
+                        other => panic!("unexpected dictionary value type 
{other:?}"),
+                    };
+                    Arc::new(DictionaryArray::<Int32Type>::try_new(keys, 
values).unwrap())
+                }
+                DataType::List(field) => {
+                    let values: ArrayRef = match field.data_type() {
+                        DataType::Int32 => {
+                            Arc::new(Int32Array::from(Vec::<i32>::new())) as 
ArrayRef
+                        }
+                        DataType::Int64 => {
+                            Arc::new(Int64Array::from(Vec::<i64>::new())) as 
ArrayRef
+                        }
+                        DataType::Utf8 => {
+                            Arc::new(StringArray::from(Vec::<&str>::new())) as 
ArrayRef
+                        }
+                        DataType::Union(_, _) => {
+                            let (uf, _) = if let DataType::Union(f, m) = 
field.data_type() {
+                                (f.clone(), m)
+                            } else {
+                                unreachable!()
+                            };
+                            let children: Vec<ArrayRef> = uf
+                                .iter()
+                                .map(|(_, f)| empty_child_for(f.data_type()))
+                                .collect();
+                            Arc::new(
+                                UnionArray::try_new(
+                                    uf.clone(),
+                                    ScalarBuffer::<i8>::from(Vec::<i8>::new()),
+                                    
Some(ScalarBuffer::<i32>::from(Vec::<i32>::new())),
+                                    children,
+                                )
+                                .unwrap(),
+                            ) as ArrayRef
+                        }
+                        other => panic!("unsupported list item type: 
{other:?}"),
+                    };
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0]));
+                    Arc::new(ListArray::try_new(field.clone(), offsets, 
values, None).unwrap())
+                }
+                DataType::Map(entry_field, ordered) => {
+                    let DataType::Struct(childs) = entry_field.data_type() 
else {
+                        panic!("map entries must be struct")
+                    };
+                    let key_field = &childs[0];
+                    let val_field = &childs[1];
+                    assert_eq!(key_field.data_type(), &DataType::Utf8);
+                    let keys = StringArray::from(Vec::<&str>::new());
+                    let vals: ArrayRef = match val_field.data_type() {
+                        DataType::Float64 => {
+                            
Arc::new(arrow_array::Float64Array::from(Vec::<f64>::new())) as ArrayRef
+                        }
+                        DataType::Int64 => {
+                            Arc::new(Int64Array::from(Vec::<i64>::new())) as 
ArrayRef
+                        }
+                        DataType::Utf8 => {
+                            Arc::new(StringArray::from(Vec::<&str>::new())) as 
ArrayRef
+                        }
+                        DataType::Union(uf, _) => {
+                            let ch: Vec<ArrayRef> = uf
+                                .iter()
+                                .map(|(_, f)| empty_child_for(f.data_type()))
+                                .collect();
+                            Arc::new(
+                                UnionArray::try_new(
+                                    uf.clone(),
+                                    ScalarBuffer::<i8>::from(Vec::<i8>::new()),
+                                    
Some(ScalarBuffer::<i32>::from(Vec::<i32>::new())),
+                                    ch,
+                                )
+                                .unwrap(),
+                            ) as ArrayRef
+                        }
+                        other => panic!("unsupported map value type: 
{other:?}"),
+                    };
+                    let entries = StructArray::new(
+                        Fields::from(vec![key_field.as_ref().clone(), 
val_field.as_ref().clone()]),
+                        vec![Arc::new(keys) as ArrayRef, vals],
+                        None,
+                    );
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0]));
+                    Arc::new(MapArray::new(
+                        entry_field.clone(),
+                        offsets,
+                        entries,
+                        None,
+                        *ordered,
+                    ))
+                }
+                other => panic!("empty_child_for: unhandled type {other:?}"),
+            }
+        }
+
+        fn mk_dense_union(
+            fields: &UnionFields,
+            type_ids: Vec<i8>,
+            offsets: Vec<i32>,
+            provide: impl Fn(&Field) -> Option<ArrayRef>,
+        ) -> ArrayRef {
+            let children: Vec<ArrayRef> = fields
+                .iter()
+                .map(|(_, f)| provide(f).unwrap_or_else(|| 
empty_child_for(f.data_type())))
+                .collect();
+
+            Arc::new(
+                UnionArray::try_new(
+                    fields.clone(),
+                    ScalarBuffer::<i8>::from(type_ids),
+                    Some(ScalarBuffer::<i32>::from(offsets)),
+                    children,
+                )
+                .unwrap(),
+            ) as ArrayRef
+        }
+
+        // Dates / times / timestamps from the Avro content block:
+        let date_a: i32 = 19_000;
+        let time_ms_a: i32 = 13 * 3_600_000 + 45 * 60_000 + 30_000 + 123;
+        let time_us_b: i64 = 23 * 3_600_000_000 + 59 * 60_000_000 + 59 * 
1_000_000 + 999_999;
+        let ts_ms_2024_01_01: i64 = 1_704_067_200_000;
+        let ts_us_2024_01_01: i64 = ts_ms_2024_01_01 * 1000;
+        // Fixed / bytes-like values:
+        let fx8_a: [u8; 8] = *b"ABCDEFGH";
+        let fx4_abcd: [u8; 4] = *b"ABCD";
+        let fx4_misc: [u8; 4] = [0x00, 0x11, 0x22, 0x33];
+        let fx10_ascii: [u8; 10] = *b"0123456789";
+        let fx10_aa: [u8; 10] = [0xAA; 10];
+        // Duration logical values as MonthDayNano:
+        let dur_a = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000_000);
+        let dur_b = IntervalMonthDayNanoType::make_value(12, 31, 999_000_000);
+        // UUID logical values (stored as 16-byte FixedSizeBinary in Arrow):
+        let uuid1 = uuid16_from_str("fe7bc30b-4ce8-4c5e-b67c-2234a2d38e66");
+        let uuid2 = uuid16_from_str("0826cc06-d2e3-4599-b4ad-af5fa6905cdb");
+        // Decimals from Avro content:
+        let dec_b_scale2_pos: i128 = 123_456; // "1234.56" bytes-decimal -> 
(precision=10, scale=2)
+        let dec_fix16_neg: i128 = -101; // "-1.01" fixed(16) decimal(10,2)
+        let dec_fix20_s4: i128 = 1_234_567_891_234; // "123456789.1234" 
fixed(20) decimal(20,4)
+        let dec_fix20_s4_neg: i128 = -123; // "-0.0123" fixed(20) decimal(20,4)
+        let path = "test/data/union_fields.avro";
+        let actual = read_file(path, 1024, false);
+        let schema = actual.schema();
+        // Helper to fetch union metadata for a column
+        let get_union = |name: &str| -> (UnionFields, UnionMode) {
+            let idx = schema.index_of(name).unwrap();
+            match schema.field(idx).data_type() {
+                DataType::Union(f, m) => (f.clone(), *m),
+                other => panic!("{name} should be a Union, got {other:?}"),
+            }
+        };
+        let mut expected_cols: Vec<ArrayRef> = 
Vec::with_capacity(schema.fields().len());
+        // 1) ["null","int"]: Int32 (nullable)
+        expected_cols.push(Arc::new(Int32Array::from(vec![
+            None,
+            Some(42),
+            None,
+            Some(0),
+        ])));
+        // 2) ["string","null"]: Utf8 (nullable)
+        expected_cols.push(Arc::new(StringArray::from(vec![
+            Some("s1"),
+            None,
+            Some("s3"),
+            Some(""),
+        ])));
+        // 3) union_prim: 
["boolean","int","long","float","double","bytes","string"]
+        {
+            let (uf, mode) = get_union("union_prim");
+            assert!(matches!(mode, UnionMode::Dense));
+            let tids = vec![
+                tid_by_name(&uf, "long"),
+                tid_by_name(&uf, "int"),
+                tid_by_name(&uf, "float"),
+                tid_by_name(&uf, "double"),
+            ];
+            let offs = vec![0, 0, 0, 0];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match 
f.name().as_str() {
+                "int" => Some(Arc::new(Int32Array::from(vec![-1])) as 
ArrayRef),
+                "long" => 
Some(Arc::new(Int64Array::from(vec![1_234_567_890_123i64])) as ArrayRef),
+                "float" => {
+                    
Some(Arc::new(arrow_array::Float32Array::from(vec![1.25f32])) as ArrayRef)
+                }
+                "double" => {
+                    
Some(Arc::new(arrow_array::Float64Array::from(vec![-2.5f64])) as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 4) union_bytes_vs_string: ["bytes","string"]
+        {
+            let (uf, _) = get_union("union_bytes_vs_string");
+            let tids = vec![
+                tid_by_name(&uf, "bytes"),
+                tid_by_name(&uf, "string"),
+                tid_by_name(&uf, "string"),
+                tid_by_name(&uf, "bytes"),
+            ];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match 
f.name().as_str() {
+                "bytes" => Some(
+                    Arc::new(BinaryArray::from(vec![&[0x00, 0xFF, 0x7F][..], 
&[][..]])) as ArrayRef,
+                ),
+                "string" => Some(Arc::new(StringArray::from(vec!["hello", 
"world"])) as ArrayRef),
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 5) union_fixed_dur_decfix: [Fx8, Dur12, DecFix16(decimal(10,2))]
+        {
+            let (uf, _) = get_union("union_fixed_dur_decfix");
+            let tid_fx8 = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(8)));
+            let tid_dur = tid_by_dt(&uf, |dt| {
+                matches!(
+                    dt,
+                    
DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
+                )
+            });
+            let tid_dec = tid_by_dt(&uf, |dt| match dt {
+                #[cfg(feature = "small_decimals")]
+                DataType::Decimal64(10, 2) => true,
+                DataType::Decimal128(10, 2) | DataType::Decimal256(10, 2) => 
true,
+                _ => false,
+            });
+            let tids = vec![tid_fx8, tid_dur, tid_dec, tid_dur];
+            let offs = vec![0, 0, 0, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::FixedSizeBinary(8) => {
+                    let it = [Some(fx8_a)].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 8).unwrap(),
+                    ) as ArrayRef)
+                }
+                DataType::Interval(IntervalUnit::MonthDayNano) => {
+                    
Some(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(vec![
+                        dur_a, dur_b,
+                    ])) as ArrayRef)
+                }
+                #[cfg(feature = "small_decimals")]
+                DataType::Decimal64(10, 2) => {
+                    let a = 
arrow_array::Decimal64Array::from_iter_values([dec_fix16_neg as i64]);
+                    Some(Arc::new(a.with_precision_and_scale(10, 2).unwrap()) 
as ArrayRef)
+                }
+                DataType::Decimal128(10, 2) => {
+                    let a = 
arrow_array::Decimal128Array::from_iter_values([dec_fix16_neg]);
+                    Some(Arc::new(a.with_precision_and_scale(10, 2).unwrap()) 
as ArrayRef)
+                }
+                DataType::Decimal256(10, 2) => {
+                    let a = 
arrow_array::Decimal256Array::from_iter_values([i256::from_i128(
+                        dec_fix16_neg,
+                    )]);
+                    Some(Arc::new(a.with_precision_and_scale(10, 2).unwrap()) 
as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 6) union_enum_records_array_map: [enum ColorU, record RecA, record 
RecB, array<long>, map<string>]
+        {
+            let (uf, _) = get_union("union_enum_records_array_map");
+            let tid_enum = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::Dictionary(_, _)));
+            let tid_reca = tid_by_dt(&uf, |dt| {
+                if let DataType::Struct(fs) = dt {
+                    fs.len() == 2 && fs[0].name() == "a" && fs[1].name() == "b"
+                } else {
+                    false
+                }
+            });
+            let tid_recb = tid_by_dt(&uf, |dt| {
+                if let DataType::Struct(fs) = dt {
+                    fs.len() == 2 && fs[0].name() == "x" && fs[1].name() == "y"
+                } else {
+                    false
+                }
+            });
+            let tid_arr = tid_by_dt(&uf, |dt| matches!(dt, DataType::List(_)));
+            let tids = vec![tid_enum, tid_reca, tid_recb, tid_arr];
+            let offs = vec![0, 0, 0, 0];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Dictionary(_, _) => {
+                    let keys = Int32Array::from(vec![0i32]); // "RED"
+                    let values =
+                        Arc::new(StringArray::from(vec!["RED", "GREEN", 
"BLUE"])) as ArrayRef;
+                    Some(
+                        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, 
values).unwrap())
+                            as ArrayRef,
+                    )
+                }
+                DataType::Struct(fs)
+                    if fs.len() == 2 && fs[0].name() == "a" && fs[1].name() == 
"b" =>
+                {
+                    let a = Int32Array::from(vec![7]);
+                    let b = StringArray::from(vec!["x"]);
+                    Some(Arc::new(StructArray::new(
+                        fs.clone(),
+                        vec![Arc::new(a), Arc::new(b)],
+                        None,
+                    )) as ArrayRef)
+                }
+                DataType::Struct(fs)
+                    if fs.len() == 2 && fs[0].name() == "x" && fs[1].name() == 
"y" =>
+                {
+                    let x = Int64Array::from(vec![123_456_789i64]);
+                    let y = BinaryArray::from(vec![&[0xFF, 0x00][..]]);
+                    Some(Arc::new(StructArray::new(
+                        fs.clone(),
+                        vec![Arc::new(x), Arc::new(y)],
+                        None,
+                    )) as ArrayRef)
+                }
+                DataType::List(field) => {
+                    let values = Int64Array::from(vec![1i64, 2, 3]);
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3]));
+                    Some(Arc::new(
+                        ListArray::try_new(field.clone(), offsets, 
Arc::new(values), None).unwrap(),
+                    ) as ArrayRef)
+                }
+                DataType::Map(_, _) => None,
+                other => panic!("unexpected child {other:?}"),
+            });
+            expected_cols.push(arr);
+        }
+        // 7) union_date_or_fixed4: [date32, fixed(4)]
+        {
+            let (uf, _) = get_union("union_date_or_fixed4");
+            let tid_date = tid_by_dt(&uf, |dt| matches!(dt, DataType::Date32));
+            let tid_fx4 = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(4)));
+            let tids = vec![tid_date, tid_fx4, tid_date, tid_fx4];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Date32 => {
+                    Some(Arc::new(arrow_array::Date32Array::from(vec![date_a, 
0])) as ArrayRef)
+                }
+                DataType::FixedSizeBinary(4) => {
+                    let it = [Some(fx4_abcd), Some(fx4_misc)].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 4).unwrap(),
+                    ) as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 8) union_time_millis_or_enum: [time-millis, enum OnOff]
+        {
+            let (uf, _) = get_union("union_time_millis_or_enum");
+            let tid_ms = tid_by_dt(&uf, |dt| {
+                matches!(dt, 
DataType::Time32(arrow_schema::TimeUnit::Millisecond))
+            });
+            let tid_en = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::Dictionary(_, _)));
+            let tids = vec![tid_ms, tid_en, tid_en, tid_ms];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                    Some(Arc::new(Time32MillisecondArray::from(vec![time_ms_a, 
0])) as ArrayRef)
+                }
+                DataType::Dictionary(_, _) => {
+                    let keys = Int32Array::from(vec![0i32, 1]); // "ON", "OFF"
+                    let values = Arc::new(StringArray::from(vec!["ON", 
"OFF"])) as ArrayRef;
+                    Some(
+                        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, 
values).unwrap())
+                            as ArrayRef,
+                    )
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 9) union_time_micros_or_string: [time-micros, string]
+        {
+            let (uf, _) = get_union("union_time_micros_or_string");
+            let tid_us = tid_by_dt(&uf, |dt| {
+                matches!(dt, 
DataType::Time64(arrow_schema::TimeUnit::Microsecond))
+            });
+            let tid_s = tid_by_name(&uf, "string");
+            let tids = vec![tid_s, tid_us, tid_s, tid_s];
+            let offs = vec![0, 0, 1, 2];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                    
Some(Arc::new(Time64MicrosecondArray::from(vec![time_us_b])) as ArrayRef)
+                }
+                DataType::Utf8 => {
+                    Some(Arc::new(StringArray::from(vec!["evening", "night", 
""])) as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 10) union_ts_millis_utc_or_array: [timestamp-millis(TZ), array<int>]
+        {
+            let (uf, _) = get_union("union_ts_millis_utc_or_array");
+            let tid_ts = tid_by_dt(&uf, |dt| {
+                matches!(
+                    dt,
+                    DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _)
+                )
+            });
+            let tid_arr = tid_by_dt(&uf, |dt| matches!(dt, DataType::List(_)));
+            let tids = vec![tid_ts, tid_arr, tid_arr, tid_ts];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) 
=> {
+                    let a = TimestampMillisecondArray::from(vec![
+                        ts_ms_2024_01_01,
+                        ts_ms_2024_01_01 + 86_400_000,
+                    ]);
+                    Some(Arc::new(if let Some(tz) = tz {
+                        a.with_timezone(tz.clone())
+                    } else {
+                        a
+                    }) as ArrayRef)
+                }
+                DataType::List(field) => {
+                    let values = Int32Array::from(vec![0, 1, 2, -1, 0, 1]);
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 6]));
+                    Some(Arc::new(
+                        ListArray::try_new(field.clone(), offsets, 
Arc::new(values), None).unwrap(),
+                    ) as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 11) union_ts_micros_local_or_bytes: [local-timestamp-micros, bytes]
+        {
+            let (uf, _) = get_union("union_ts_micros_local_or_bytes");
+            let tid_lts = tid_by_dt(&uf, |dt| {
+                matches!(
+                    dt,
+                    DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, 
None)
+                )
+            });
+            let tid_b = tid_by_name(&uf, "bytes");
+            let tids = vec![tid_b, tid_lts, tid_b, tid_b];
+            let offs = vec![0, 0, 1, 2];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None) 
=> Some(Arc::new(
+                    TimestampMicrosecondArray::from(vec![ts_us_2024_01_01]),
+                )
+                    as ArrayRef),
+                DataType::Binary => Some(Arc::new(BinaryArray::from(vec![
+                    &b"\x11\x22\x33"[..],
+                    &b"\x00"[..],
+                    &b"\x10\x20\x30\x40"[..],
+                ])) as ArrayRef),
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 12) union_uuid_or_fixed10: [uuid(string)->fixed(16), fixed(10)]
+        {
+            let (uf, _) = get_union("union_uuid_or_fixed10");
+            let tid_fx16 = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(16)));
+            let tid_fx10 = tid_by_dt(&uf, |dt| matches!(dt, 
DataType::FixedSizeBinary(10)));
+            let tids = vec![tid_fx16, tid_fx10, tid_fx16, tid_fx10];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::FixedSizeBinary(16) => {
+                    let it = [Some(uuid1), Some(uuid2)].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap(),
+                    ) as ArrayRef)
+                }
+                DataType::FixedSizeBinary(10) => {
+                    let it = [Some(fx10_ascii), Some(fx10_aa)].into_iter();
+                    Some(Arc::new(
+                        
FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 10).unwrap(),
+                    ) as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 13) union_dec_bytes_or_dec_fixed: [bytes dec(10,2), fixed(20) 
dec(20,4)]
+        {
+            let (uf, _) = get_union("union_dec_bytes_or_dec_fixed");
+            let tid_b10s2 = tid_by_dt(&uf, |dt| match dt {
+                #[cfg(feature = "small_decimals")]
+                DataType::Decimal64(10, 2) => true,
+                DataType::Decimal128(10, 2) | DataType::Decimal256(10, 2) => 
true,
+                _ => false,
+            });
+            let tid_f20s4 = tid_by_dt(&uf, |dt| {
+                matches!(
+                    dt,
+                    DataType::Decimal128(20, 4) | DataType::Decimal256(20, 4)
+                )
+            });
+            let tids = vec![tid_b10s2, tid_f20s4, tid_b10s2, tid_f20s4];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                #[cfg(feature = "small_decimals")]
+                DataType::Decimal64(10, 2) => {
+                    let a = Decimal64Array::from_iter_values([dec_b_scale2_pos 
as i64, 0i64]);
+                    Some(Arc::new(a.with_precision_and_scale(10, 2).unwrap()) 
as ArrayRef)
+                }
+                DataType::Decimal128(10, 2) => {
+                    let a = 
Decimal128Array::from_iter_values([dec_b_scale2_pos, 0]);
+                    Some(Arc::new(a.with_precision_and_scale(10, 2).unwrap()) 
as ArrayRef)
+                }
+                DataType::Decimal256(10, 2) => {
+                    let a = Decimal256Array::from_iter_values([
+                        i256::from_i128(dec_b_scale2_pos),
+                        i256::from(0),
+                    ]);
+                    Some(Arc::new(a.with_precision_and_scale(10, 2).unwrap()) 
as ArrayRef)
+                }
+                DataType::Decimal128(20, 4) => {
+                    let a = 
Decimal128Array::from_iter_values([dec_fix20_s4_neg, dec_fix20_s4]);
+                    Some(Arc::new(a.with_precision_and_scale(20, 4).unwrap()) 
as ArrayRef)
+                }
+                DataType::Decimal256(20, 4) => {
+                    let a = Decimal256Array::from_iter_values([
+                        i256::from_i128(dec_fix20_s4_neg),
+                        i256::from_i128(dec_fix20_s4),
+                    ]);
+                    Some(Arc::new(a.with_precision_and_scale(20, 4).unwrap()) 
as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 14) union_null_bytes_string: ["null","bytes","string"]
+        {
+            let (uf, _) = get_union("union_null_bytes_string");
+            let tid_n = tid_by_name(&uf, "null");
+            let tid_b = tid_by_name(&uf, "bytes");
+            let tid_s = tid_by_name(&uf, "string");
+            let tids = vec![tid_n, tid_b, tid_s, tid_s];
+            let offs = vec![0, 0, 0, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match 
f.name().as_str() {
+                "null" => Some(Arc::new(arrow_array::NullArray::new(1)) as 
ArrayRef),
+                "bytes" => 
Some(Arc::new(BinaryArray::from(vec![&b"\x01\x02"[..]])) as ArrayRef),
+                "string" => Some(Arc::new(StringArray::from(vec!["text", 
"u"])) as ArrayRef),
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 15) array_of_union: array<[long,string]>
+        {
+            let idx = schema.index_of("array_of_union").unwrap();
+            let dt = schema.field(idx).data_type().clone();
+            let (item_field, _) = match &dt {
+                DataType::List(f) => (f.clone(), ()),
+                other => panic!("array_of_union must be List, got {other:?}"),
+            };
+            let (uf, _) = match item_field.data_type() {
+                DataType::Union(f, m) => (f.clone(), m),
+                other => panic!("array_of_union items must be Union, got 
{other:?}"),
+            };
+            let tid_l = tid_by_name(&uf, "long");
+            let tid_s = tid_by_name(&uf, "string");
+            let type_ids = vec![tid_l, tid_s, tid_l, tid_s, tid_l, tid_l, 
tid_s, tid_l];
+            let offsets = vec![0, 0, 1, 1, 2, 3, 2, 4];
+            let values_union =
+                mk_dense_union(&uf, type_ids, offsets, |f| match 
f.name().as_str() {
+                    "long" => {
+                        Some(Arc::new(Int64Array::from(vec![1i64, -5, 42, -1, 
0])) as ArrayRef)
+                    }
+                    "string" => Some(Arc::new(StringArray::from(vec!["a", "", 
"z"])) as ArrayRef),
+                    _ => None,
+                });
+            let list_offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 8]));
+            expected_cols.push(Arc::new(
+                ListArray::try_new(item_field.clone(), list_offsets, 
values_union, None).unwrap(),
+            ));
+        }
+        // 16) map_of_union: map<[null,double]>
+        {
+            let idx = schema.index_of("map_of_union").unwrap();
+            let dt = schema.field(idx).data_type().clone();
+            let (entry_field, ordered) = match &dt {
+                DataType::Map(f, ordered) => (f.clone(), *ordered),
+                other => panic!("map_of_union must be Map, got {other:?}"),
+            };
+            let DataType::Struct(entry_fields) = entry_field.data_type() else {
+                panic!("map entries must be struct")
+            };
+            let key_field = entry_fields[0].clone();
+            let val_field = entry_fields[1].clone();
+            let keys = StringArray::from(vec!["a", "b", "x", "pi"]);
+            let rounded_pi = (std::f64::consts::PI * 100_000.0).round() / 
100_000.0;
+            let values: ArrayRef = match val_field.data_type() {
+                DataType::Union(uf, _) => {
+                    let tid_n = tid_by_name(uf, "null");
+                    let tid_d = tid_by_name(uf, "double");
+                    let tids = vec![tid_n, tid_d, tid_d, tid_d];
+                    let offs = vec![0, 0, 1, 2];
+                    mk_dense_union(uf, tids, offs, |f| match f.name().as_str() 
{
+                        "null" => Some(Arc::new(NullArray::new(1)) as 
ArrayRef),
+                        "double" => 
Some(Arc::new(arrow_array::Float64Array::from(vec![
+                            2.5f64, -0.5f64, rounded_pi,
+                        ])) as ArrayRef),
+                        _ => None,
+                    })
+                }
+                DataType::Float64 => 
Arc::new(arrow_array::Float64Array::from(vec![
+                    None,
+                    Some(2.5),
+                    Some(-0.5),
+                    Some(rounded_pi),
+                ])),
+                other => panic!("unexpected map value type {other:?}"),
+            };
+            let entries = StructArray::new(
+                Fields::from(vec![key_field.as_ref().clone(), 
val_field.as_ref().clone()]),
+                vec![Arc::new(keys) as ArrayRef, values],
+                None,
+            );
+            let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 
2, 3, 3, 4]));
+            expected_cols.push(Arc::new(MapArray::new(
+                entry_field,
+                offsets,
+                entries,
+                None,
+                ordered,
+            )));
+        }
+        // 17) record_with_union_field: struct { id:int, u:[int,string] }
+        {
+            let idx = schema.index_of("record_with_union_field").unwrap();
+            let DataType::Struct(rec_fields) = schema.field(idx).data_type() 
else {
+                panic!("record_with_union_field should be Struct")
+            };
+            let id = Int32Array::from(vec![1, 2, 3, 4]);
+            let u_field = rec_fields.iter().find(|f| f.name() == "u").unwrap();
+            let DataType::Union(uf, _) = u_field.data_type() else {
+                panic!("u must be Union")
+            };
+            let tid_i = tid_by_name(uf, "int");
+            let tid_s = tid_by_name(uf, "string");
+            let tids = vec![tid_s, tid_i, tid_i, tid_s];
+            let offs = vec![0, 0, 1, 1];
+            let u = mk_dense_union(uf, tids, offs, |f| match f.name().as_str() 
{
+                "int" => Some(Arc::new(Int32Array::from(vec![99, 0])) as 
ArrayRef),
+                "string" => Some(Arc::new(StringArray::from(vec!["one", 
"four"])) as ArrayRef),
+                _ => None,
+            });
+            let rec = StructArray::new(rec_fields.clone(), vec![Arc::new(id) 
as ArrayRef, u], None);
+            expected_cols.push(Arc::new(rec));
+        }
+        // 18) union_ts_micros_utc_or_map: [timestamp-micros(TZ), map<long>]
+        {
+            let (uf, _) = get_union("union_ts_micros_utc_or_map");
+            let tid_ts = tid_by_dt(&uf, |dt| {
+                matches!(
+                    dt,
+                    DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, 
Some(_))
+                )
+            });
+            let tid_map = tid_by_dt(&uf, |dt| matches!(dt, DataType::Map(_, 
_)));
+            let tids = vec![tid_ts, tid_map, tid_ts, tid_map];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) 
=> {
+                    let a = 
TimestampMicrosecondArray::from(vec![ts_us_2024_01_01, 0i64]);
+                    Some(Arc::new(if let Some(tz) = tz {
+                        a.with_timezone(tz.clone())
+                    } else {
+                        a
+                    }) as ArrayRef)
+                }
+                DataType::Map(entry_field, ordered) => {
+                    let DataType::Struct(fs) = entry_field.data_type() else {
+                        panic!("map entries must be struct")
+                    };
+                    let key_field = fs[0].clone();
+                    let val_field = fs[1].clone();
+                    assert_eq!(key_field.data_type(), &DataType::Utf8);
+                    assert_eq!(val_field.data_type(), &DataType::Int64);
+                    let keys = StringArray::from(vec!["k1", "k2", "n"]);
+                    let vals = Int64Array::from(vec![1i64, 2, 0]);
+                    let entries = StructArray::new(
+                        Fields::from(vec![key_field.as_ref().clone(), 
val_field.as_ref().clone()]),
+                        vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as 
ArrayRef],
+                        None,
+                    );
+                    let offsets = 
OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3]));
+                    Some(Arc::new(MapArray::new(
+                        entry_field.clone(),
+                        offsets,
+                        entries,
+                        None,
+                        *ordered,
+                    )) as ArrayRef)
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 19) union_ts_millis_local_or_string: [local-timestamp-millis, 
string]
+        {
+            let (uf, _) = get_union("union_ts_millis_local_or_string");
+            let tid_ts = tid_by_dt(&uf, |dt| {
+                matches!(
+                    dt,
+                    DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, 
None)
+                )
+            });
+            let tid_s = tid_by_name(&uf, "string");
+            let tids = vec![tid_s, tid_ts, tid_s, tid_s];
+            let offs = vec![0, 0, 1, 2];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None) 
=> Some(Arc::new(
+                    TimestampMillisecondArray::from(vec![ts_ms_2024_01_01]),
+                )
+                    as ArrayRef),
+                DataType::Utf8 => {
+                    Some(
+                        Arc::new(StringArray::from(vec!["local midnight", 
"done", ""])) as ArrayRef,
+                    )
+                }
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        // 20) union_bool_or_string: ["boolean","string"]
+        {
+            let (uf, _) = get_union("union_bool_or_string");
+            let tid_b = tid_by_name(&uf, "boolean");
+            let tid_s = tid_by_name(&uf, "string");
+            let tids = vec![tid_b, tid_s, tid_b, tid_s];
+            let offs = vec![0, 0, 1, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match 
f.name().as_str() {
+                "boolean" => Some(Arc::new(BooleanArray::from(vec![true, 
false])) as ArrayRef),
+                "string" => Some(Arc::new(StringArray::from(vec!["no", 
"yes"])) as ArrayRef),
+                _ => None,
+            });
+            expected_cols.push(arr);
+        }
+        let expected = RecordBatch::try_new(schema.clone(), 
expected_cols).unwrap();
+        assert_eq!(
+            actual, expected,
+            "full end-to-end equality for union_fields.avro"
+        );
+    }
+
     #[test]
     fn test_read_zero_byte_avro_file() {
         let batch = read_file("test/data/zero_byte.avro", 3, false);
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 3295e330a1..950333174b 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -17,13 +17,11 @@
 
 use crate::codec::{
     AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, 
ResolvedRecord,
+    ResolvedUnion,
 };
-use crate::reader::block::{Block, BlockDecoder};
 use crate::reader::cursor::AvroCursor;
 use crate::schema::Nullability;
-use arrow_array::builder::{
-    Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder, 
StringViewBuilder,
-};
+use arrow_array::builder::{Decimal128Builder, Decimal256Builder, 
IntervalMonthDayNanoBuilder};
 #[cfg(feature = "small_decimals")]
 use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
 use arrow_array::types::*;
@@ -31,12 +29,13 @@ use arrow_array::*;
 use arrow_buffer::*;
 use arrow_schema::{
     ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as 
ArrowSchema, SchemaRef,
-    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
+    UnionFields, UnionMode, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
 };
 #[cfg(feature = "small_decimals")]
 use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
 use std::cmp::Ordering;
 use std::sync::Arc;
+use strum_macros::AsRefStr;
 use uuid::Uuid;
 
 const DEFAULT_CAPACITY: usize = 1024;
@@ -214,7 +213,7 @@ struct EnumResolution {
     default_index: i32,
 }
 
-#[derive(Debug)]
+#[derive(Debug, AsRefStr)]
 enum Decoder {
     Null(usize),
     Boolean(BooleanBufferBuilder),
@@ -259,11 +258,30 @@ enum Decoder {
     Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+    Union(UnionDecoder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
 impl Decoder {
     fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
+        if let Some(ResolutionInfo::Union(info)) = 
data_type.resolution.as_ref() {
+            if info.writer_is_union && !info.reader_is_union {
+                let mut clone = data_type.clone();
+                clone.resolution = None; // Build target base decoder without 
Union resolution
+                let target = Box::new(Self::try_new_internal(&clone)?);
+                let decoder = Self::Union(
+                    UnionDecoderBuilder::new()
+                        .with_resolved_union(info.clone())
+                        .with_target(target)
+                        .build()?,
+                );
+                return Ok(decoder);
+            }
+        }
+        Self::try_new_internal(data_type)
+    }
+
+    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
         // Extract just the Promotion (if any) to simplify pattern matching
         let promotion = match data_type.resolution.as_ref() {
             Some(ResolutionInfo::Promotion(p)) => Some(p),
@@ -426,10 +444,43 @@ impl Decoder {
                 )
             }
             (Codec::Uuid, _) => 
Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
-            (&Codec::Union(_, _, _), _) => {
+            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
+                let decoders = encodings
+                    .iter()
+                    .map(Self::try_new_internal)
+                    .collect::<Result<Vec<_>, _>>()?;
+                if fields.len() != decoders.len() {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Union has {} fields but {} decoders",
+                        fields.len(),
+                        decoders.len()
+                    )));
+                }
+                // Proactive guard: if a user provides a union with more 
branches than
+                // a 32-bit Avro index can address, fail fast with a clear 
message.
+                let branch_count = decoders.len();
+                let max_addr = (i32::MAX as usize) + 1;
+                if branch_count > max_addr {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Union has {branch_count} branches, which exceeds the 
maximum addressable \
+                         branches by an Avro int tag ({} + 1).",
+                        i32::MAX
+                    )));
+                }
+                let mut builder = UnionDecoderBuilder::new()
+                    .with_fields(fields.clone())
+                    .with_branches(decoders);
+                if let Some(ResolutionInfo::Union(info)) = 
data_type.resolution.as_ref() {
+                    if info.reader_is_union {
+                        builder = builder.with_resolved_union(info.clone());
+                    }
+                }
+                Self::Union(builder.build()?)
+            }
+            (Codec::Union(_, _, _), _) => {
                 return Err(ArrowError::NotYetImplemented(
-                    "Union type decoding is not yet supported".to_string(),
-                ))
+                    "Sparse Arrow unions are not yet supported".to_string(),
+                ));
             }
         };
         Ok(match data_type.nullability() {
@@ -443,7 +494,7 @@ impl Decoder {
     }
 
     /// Append a null record
-    fn append_null(&mut self) {
+    fn append_null(&mut self) -> Result<(), ArrowError> {
         match self {
             Self::Null(count) => *count += 1,
             Self::Boolean(b) => b.append(false),
@@ -468,10 +519,14 @@ impl Decoder {
             Self::Uuid(v) => {
                 v.extend([0; 16]);
             }
-            Self::Array(_, offsets, e) => {
+            Self::Array(_, offsets, _) => {
                 offsets.push_length(0);
             }
-            Self::Record(_, e, _) => e.iter_mut().for_each(|e| 
e.append_null()),
+            Self::Record(_, e, _) => {
+                for encoding in e.iter_mut() {
+                    encoding.append_null();
+                }
+            }
             Self::Map(_, _koff, moff, _, _) => {
                 moff.push_length(0);
             }
@@ -486,11 +541,13 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
+            Self::Union(u) => u.append_null()?,
             Self::Nullable(_, null_buffer, inner) => {
                 null_buffer.append(false);
                 inner.append_null();
             }
         }
+        Ok(())
     }
 
     /// Append a single default literal into the decoder's buffers
@@ -499,8 +556,7 @@ impl Decoder {
             Self::Nullable(_, nb, inner) => {
                 if matches!(lit, AvroLiteral::Null) {
                     nb.append(false);
-                    inner.append_null();
-                    Ok(())
+                    inner.append_null()
                 } else {
                     nb.append(true);
                     inner.append_default(lit)
@@ -700,6 +756,7 @@ impl Decoder {
                     "Default for enum must be a symbol".to_string(),
                 )),
             },
+            Self::Union(u) => u.append_default(lit),
             Self::Record(field_meta, decoders, projector) => match lit {
                 AvroLiteral::Map(entries) => {
                     for (i, dec) in decoders.iter_mut().enumerate() {
@@ -834,6 +891,7 @@ impl Decoder {
                 let nanos = (millis as i64) * 1_000_000;
                 builder.append_value(IntervalMonthDayNano::new(months as i32, 
days as i32, nanos));
             }
+            Self::Union(u) => u.decode(buf)?,
             Self::Nullable(order, nb, encoding) => {
                 let branch = buf.read_vlq()?;
                 let is_not_null = match *order {
@@ -852,6 +910,64 @@ impl Decoder {
         Ok(())
     }
 
+    fn decode_with_promotion(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        promotion: Promotion,
+    ) -> Result<(), ArrowError> {
+        macro_rules! promote_numeric_to {
+            ($variant:ident, $getter:ident, $to:ty) => {{
+                match self {
+                    Self::$variant(v) => {
+                        let x = buf.$getter()?;
+                        v.push(x as $to);
+                        Ok(())
+                    }
+                    other => Err(ArrowError::ParseError(format!(
+                        "Promotion {promotion} target mismatch: expected {}, 
got {}",
+                        stringify!($variant),
+                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
+                    ))),
+                }
+            }};
+        }
+        match promotion {
+            Promotion::Direct => self.decode(buf),
+            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
+            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, 
f32),
+            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, 
f64),
+            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, 
f32),
+            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, 
f64),
+            Promotion::FloatToDouble => promote_numeric_to!(Float64, 
get_float, f64),
+            Promotion::StringToBytes => match self {
+                Self::Binary(offsets, values) | Self::StringToBytes(offsets, 
values) => {
+                    let data = buf.get_bytes()?;
+                    offsets.push_length(data.len());
+                    values.extend_from_slice(data);
+                    Ok(())
+                }
+                other => Err(ArrowError::ParseError(format!(
+                    "Promotion {promotion} target mismatch: expected bytes 
(Binary/StringToBytes), got {}",
+                    <Self as AsRef<str>>::as_ref(other)
+                ))),
+            },
+            Promotion::BytesToString => match self {
+                Self::String(offsets, values)
+                | Self::StringView(offsets, values)
+                | Self::BytesToString(offsets, values) => {
+                    let data = buf.get_bytes()?;
+                    offsets.push_length(data.len());
+                    values.extend_from_slice(data);
+                    Ok(())
+                }
+                other => Err(ArrowError::ParseError(format!(
+                    "Promotion {promotion} target mismatch: expected string 
(String/StringView/BytesToString), got {}",
+                    <Self as AsRef<str>>::as_ref(other)
+                ))),
+            },
+        }
+    }
+
     /// Flush decoded records to an [`ArrayRef`]
     fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
         Ok(match self {
@@ -950,7 +1066,7 @@ impl Decoder {
                     other => {
                         return Err(ArrowError::InvalidArgumentError(format!(
                             "Map entries field must be a Struct, got {other:?}"
-                        )))
+                        )));
                     }
                 };
                 let entries_struct =
@@ -991,8 +1107,377 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+// A lookup table for resolving fields between writer and reader schemas 
during record projection.
+#[derive(Debug)]
+struct DispatchLookupTable {
+    // Maps each reader field index `r` to the corresponding writer field 
index.
+    //
+    // Semantics:
+    // - `to_reader[r] >= 0`: The value is an index into the writer's fields. 
The value from
+    //   the writer field is decoded, and `promotion[r]` is applied.
+    // - `to_reader[r] == NO_SOURCE` (-1): No matching writer field exists. 
The reader field's
+    //   default value is used.
+    //
+    // Representation (`i8`):
+    // `i8` is used for a dense, cache-friendly dispatch table, consistent 
with Arrow's use of
+    // `i8` for union type IDs. This requires that writer field indices do not 
exceed `i8::MAX`.
+    //
+    // Invariants:
+    // - `to_reader.len() == promotion.len()` and matches the reader field 
count.
+    // - If `to_reader[r] == NO_SOURCE`, `promotion[r]` is ignored.
+    to_reader: Box<[i8]>,
+    // For each reader field `r`, specifies the `Promotion` to apply to the 
writer's value.
+    //
+    // This is used when a writer field's type can be promoted to a reader 
field's type
+    // (e.g., `Int` to `Long`). It is ignored if `to_reader[r] == NO_SOURCE`.
+    promotion: Box<[Promotion]>,
+}
+
+// Sentinel used in `DispatchLookupTable::to_reader` to mark
+// "no matching writer field".
+const NO_SOURCE: i8 = -1;
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(
+        promotion_map: &[Option<(usize, Promotion)>],
+    ) -> Result<Self, ArrowError> {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    let idx_i8 = i8::try_from(idx).map_err(|_| {
+                        ArrowError::SchemaError(format!(
+                            "Reader branch index {idx} exceeds i8 range (max 
{})",
+                            i8::MAX
+                        ))
+                    })?;
+                    to_reader.push(idx_i8);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(NO_SOURCE);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Ok(Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        })
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    reader_type_codes: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            reader_type_codes: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        // Guard against impractically large unions that cannot be indexed by 
an Avro int
+        let max_addr = (i32::MAX as usize) + 1;
+        if branches.len() > max_addr {
+            return Err(ArrowError::SchemaError(format!(
+                "Reader union has {} branches, which exceeds the maximum 
addressable \
+                 branches by an Avro int tag ({} + 1).",
+                branches.len(),
+                i32::MAX
+            )));
+        }
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan: Self::plan_from_resolved(resolved)?,
+        })
+    }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        let Some(info) = resolved else {
+            return Ok(UnionReadPlan::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table =
+                    
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
+                Ok(UnionReadPlan::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some(&(reader_idx, promotion)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(ArrowError::SchemaError(
+                        "Writer type does not match any reader union 
branch".to_string(),
+                    ));
+                };
+                Ok(UnionReadPlan::FromSingle {
+                    reader_idx,
+                    promotion,
+                })
+            }
+            (true, false) => Err(ArrowError::InvalidArgumentError(
+                "UnionDecoder::try_new cannot build writer-union to single; 
use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            // (false, false) is invalid and should never be constructed by 
the resolver.
+            _ => Err(ArrowError::SchemaError(
+                "ResolvedUnion constructed for non-union sides; resolver 
should return None"
+                    .to_string(),
+            )),
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        // Avro unions are encoded by first writing the zero-based branch 
index.
+        // In Avro 1.11.1 this is specified as an *int*; older specs said 
*long*,
+        // but both use zig-zag varint encoding, so decoding as long is 
compatible
+        // with either form and widely used in practice.
+        let raw = buf.get_long()?;
+        if raw < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {raw}"
+            )));
+        }
+        usize::try_from(raw).map_err(|_| {
+            ArrowError::ParseError(format!(
+                "Union branch index {raw} does not fit into usize on this 
platform ({}-bit)",
+                (usize::BITS as usize)
+            ))
         })
     }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        let branches_len = self.branches.len();
+        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({branches_len} 
branches)"
+            )));
+        };
+        self.type_ids.push(self.reader_type_codes[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(reader_branch)
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+        };
+        let decoder = self.emit_to(reader_idx)?;
+        decoder.decode_with_promotion(buf, promotion)
+    }
+
+    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return target.flush(nulls);
+        }
+        debug_assert!(
+            nulls.is_none(),
+            "UnionArray does not accept a validity bitmap; \
+                     nulls should have been materialized as a Null child 
during decode"
+        );
+        let children = self
+            .branches
+            .iter_mut()
+            .map(|d| d.flush(None))
+            .collect::<Result<Vec<_>, _>>()?;
+        let arr = UnionArray::try_new(
+            self.fields.clone(),
+            flush_values(&mut self.type_ids).into_iter().collect(),
+            Some(flush_values(&mut self.offsets).into_iter().collect()),
+            children,
+        )
+        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+        Ok(Arc::new(arr))
+    }
+}
+
+#[derive(Debug, Default)]
+struct UnionDecoderBuilder {
+    fields: Option<UnionFields>,
+    branches: Option<Vec<Decoder>>,
+    resolved: Option<ResolvedUnion>,
+    target: Option<Box<Decoder>>,
+}
+
+impl UnionDecoderBuilder {
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
+        self.branches = Some(branches);
+        self
+    }
+
+    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union);
+        self
+    }
+
+    fn with_target(mut self, target: Box<Decoder>) -> Self {
+        self.target = Some(target);
+        self
+    }
+
+    fn build(self) -> Result<UnionDecoder, ArrowError> {
+        match (self.resolved, self.fields, self.branches, self.target) {
+            (resolved, Some(fields), Some(branches), None) => {
+                UnionDecoder::try_new(fields, branches, resolved)
+            }
+            (Some(info), None, None, Some(target))
+                if info.writer_is_union && !info.reader_is_union =>
+            {
+                UnionDecoder::try_new_from_writer_union(info, target)
+            }
+            _ => Err(ArrowError::InvalidArgumentError(
+                "Invalid UnionDecoderBuilder configuration: expected either \
+                 (fields + branches + resolved) with no target for 
reader-unions, or \
+                 (resolved + target) with no fields/branches for writer-union 
to single."
+                    .to_string(),
+            )),
+        }
+    }
 }
 
 #[derive(Debug, Copy, Clone)]
@@ -1247,8 +1732,7 @@ impl Projector {
         if let Some(default_literal) = self.field_defaults[index].as_ref() {
             decoder.append_default(default_literal)
         } else {
-            decoder.append_null();
-            Ok(())
+            decoder.append_null()
         }
     }
 
@@ -1314,6 +1798,7 @@ enum Skipper {
     List(Box<Skipper>),
     Map(Box<Skipper>),
     Struct(Vec<Skipper>),
+    Union(Vec<Skipper>),
     Nullable(Nullability, Box<Skipper>),
 }
 
@@ -1344,6 +1829,23 @@ impl Skipper {
             ),
             Codec::Map(values) => 
Self::Map(Box::new(Skipper::from_avro(values)?)),
             Codec::Interval => Self::DurationFixed12,
+            Codec::Union(encodings, _, _) => {
+                let max_addr = (i32::MAX as usize) + 1;
+                if encodings.len() > max_addr {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Writer union has {} branches, which exceeds the 
maximum addressable \
+                         branches by an Avro int tag ({} + 1).",
+                        encodings.len(),
+                        i32::MAX
+                    )));
+                }
+                Self::Union(
+                    encodings
+                        .iter()
+                        .map(Skipper::from_avro)
+                        .collect::<Result<_, _>>()?,
+                )
+            }
             _ => {
                 return Err(ArrowError::NotYetImplemented(format!(
                     "Skipper not implemented for codec {:?}",
@@ -1421,6 +1923,28 @@ impl Skipper {
                 }
                 Ok(())
             }
+            Self::Union(encodings) => {
+                // Union tag must be ZigZag-decoded
+                let raw = buf.get_long()?;
+                if raw < 0 {
+                    return Err(ArrowError::ParseError(format!(
+                        "Negative union branch index {raw}"
+                    )));
+                }
+                let idx: usize = usize::try_from(raw).map_err(|_| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {raw} does not fit into usize on 
this platform ({}-bit)",
+                        (usize::BITS as usize)
+                    ))
+                })?;
+                let Some(encoding) = encodings.get_mut(idx) else {
+                    return Err(ArrowError::ParseError(format!(
+                        "Union branch index {idx} out of range for skipper ({} 
branches)",
+                        encodings.len()
+                    )));
+                };
+                encoding.skip(buf)
+            }
             Self::Nullable(order, inner) => {
                 let branch = buf.read_vlq()?;
                 let is_not_null = match *order {
@@ -1488,6 +2012,142 @@ mod tests {
         Decoder::try_new(field.data_type()).unwrap()
     }
 
+    #[test]
+    fn 
test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
+        let ws = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+        ]);
+        let rs = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+        ]);
+        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, 
false).unwrap();
+        let mut dec = Decoder::try_new(field.data_type()).unwrap();
+        let mut rec1 = encode_avro_long(0);
+        rec1.extend(encode_avro_int(7));
+        let mut cur1 = AvroCursor::new(&rec1);
+        dec.decode(&mut cur1).unwrap();
+        let mut rec2 = encode_avro_long(1);
+        rec2.extend(encode_avro_bytes("abc".as_bytes()));
+        let mut cur2 = AvroCursor::new(&rec2);
+        dec.decode(&mut cur2).unwrap();
+        let arr = dec.flush(None).unwrap();
+        let ua = arr
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("dense union output");
+        assert_eq!(
+            ua.type_id(0),
+            1,
+            "first value must select reader 'long' branch"
+        );
+        assert_eq!(ua.value_offset(0), 0);
+        assert_eq!(
+            ua.type_id(1),
+            0,
+            "second value must select reader 'string' branch"
+        );
+        assert_eq!(ua.value_offset(1), 0);
+        let long_child = 
ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(long_child.len(), 1);
+        assert_eq!(long_child.value(0), 7);
+        let str_child = 
ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(str_child.len(), 1);
+        assert_eq!(str_child.value(0), "abc");
+    }
+
+    #[test]
+    fn 
test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
+        let ws = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+        ]);
+        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
+        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, 
false).unwrap();
+        let mut dec = Decoder::try_new(field.data_type()).unwrap();
+        let mut data = encode_avro_long(0);
+        data.extend(encode_avro_int(5));
+        let mut cur = AvroCursor::new(&data);
+        dec.decode(&mut cur).unwrap();
+        let arr = dec.flush(None).unwrap();
+        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(out.len(), 1);
+        assert_eq!(out.value(0), 5);
+    }
+
+    #[test]
+    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
+        let ws = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+        ]);
+        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
+        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, 
false).unwrap();
+        let mut dec = Decoder::try_new(field.data_type()).unwrap();
+        let mut data = encode_avro_long(1);
+        data.extend(encode_avro_bytes("z".as_bytes()));
+        let mut cur = AvroCursor::new(&data);
+        let res = dec.decode(&mut cur);
+        assert!(
+            res.is_err(),
+            "expected error when writer union branch does not resolve to 
reader non-union type"
+        );
+    }
+
+    #[test]
+    fn 
test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
+        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
+        let rs = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+        ]);
+        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, 
false).unwrap();
+        let mut dec = Decoder::try_new(field.data_type()).unwrap();
+        let data = encode_avro_int(6);
+        let mut cur = AvroCursor::new(&data);
+        dec.decode(&mut cur).unwrap();
+        let arr = dec.flush(None).unwrap();
+        let ua = arr
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("dense union output");
+        assert_eq!(ua.len(), 1);
+        assert_eq!(
+            ua.type_id(0),
+            1,
+            "must resolve to reader 'long' branch (type_id 1)"
+        );
+        assert_eq!(ua.value_offset(0), 0);
+        let long_child = 
ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(long_child.len(), 1);
+        assert_eq!(long_child.value(0), 6);
+        let str_child = 
ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(str_child.len(), 0, "string branch must be empty");
+    }
+
+    #[test]
+    fn 
test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
+        let ws = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
+        ]);
+        let rs = Schema::Union(vec![
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+        ]);
+        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, 
false).unwrap();
+        let mut dec = Decoder::try_new(field.data_type()).unwrap();
+        let mut data = encode_avro_long(1);
+        data.push(1);
+        let mut cur = AvroCursor::new(&data);
+        let res = dec.decode(&mut cur);
+        assert!(
+            res.is_err(),
+            "expected error for unmapped writer 'boolean' branch"
+        );
+    }
+
     #[test]
     fn test_schema_resolution_promotion_int_to_long() {
         let mut dec = decoder_for_promotion(PrimitiveType::Int, 
PrimitiveType::Long, false);
@@ -2566,6 +3226,182 @@ mod tests {
         assert_eq!(id.value(1), 7);
     }
 
+    fn make_dense_union_avro(
+        children: Vec<(Codec, &'_ str, DataType)>,
+        type_ids: Vec<i8>,
+    ) -> AvroDataType {
+        let mut avro_children: Vec<AvroDataType> = 
Vec::with_capacity(children.len());
+        let mut fields: Vec<arrow_schema::Field> = 
Vec::with_capacity(children.len());
+        for (codec, name, dt) in children.into_iter() {
+            avro_children.push(AvroDataType::new(codec, Default::default(), 
None));
+            fields.push(arrow_schema::Field::new(name, dt, true));
+        }
+        let union_fields = UnionFields::new(type_ids, fields);
+        let union_codec = Codec::Union(avro_children.into(), union_fields, 
UnionMode::Dense);
+        AvroDataType::new(union_codec, Default::default(), None)
+    }
+
+    #[test]
+    fn test_union_dense_two_children_custom_type_ids() {
+        let union_dt = make_dense_union_avro(
+            vec![
+                (Codec::Int32, "i", DataType::Int32),
+                (Codec::Utf8, "s", DataType::Utf8),
+            ],
+            vec![2, 5],
+        );
+        let mut dec = Decoder::try_new(&union_dt).unwrap();
+        let mut r1 = Vec::new();
+        r1.extend_from_slice(&encode_avro_long(0));
+        r1.extend_from_slice(&encode_avro_int(7));
+        let mut r2 = Vec::new();
+        r2.extend_from_slice(&encode_avro_long(1));
+        r2.extend_from_slice(&encode_avro_bytes(b"x"));
+        let mut r3 = Vec::new();
+        r3.extend_from_slice(&encode_avro_long(0));
+        r3.extend_from_slice(&encode_avro_int(-1));
+        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
+        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
+        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
+        let array = dec.flush(None).unwrap();
+        let ua = array
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("expected UnionArray");
+        assert_eq!(ua.len(), 3);
+        assert_eq!(ua.type_id(0), 2);
+        assert_eq!(ua.type_id(1), 5);
+        assert_eq!(ua.type_id(2), 2);
+        assert_eq!(ua.value_offset(0), 0);
+        assert_eq!(ua.value_offset(1), 0);
+        assert_eq!(ua.value_offset(2), 1);
+        let int_child = ua
+            .child(2)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("int child");
+        assert_eq!(int_child.len(), 2);
+        assert_eq!(int_child.value(0), 7);
+        assert_eq!(int_child.value(1), -1);
+        let str_child = ua
+            .child(5)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("string child");
+        assert_eq!(str_child.len(), 1);
+        assert_eq!(str_child.value(0), "x");
+    }
+
+    #[test]
+    fn test_union_dense_with_null_and_string_children() {
+        let union_dt = make_dense_union_avro(
+            vec![
+                (Codec::Null, "n", DataType::Null),
+                (Codec::Utf8, "s", DataType::Utf8),
+            ],
+            vec![42, 7],
+        );
+        let mut dec = Decoder::try_new(&union_dt).unwrap();
+        let r1 = encode_avro_long(0);
+        let mut r2 = Vec::new();
+        r2.extend_from_slice(&encode_avro_long(1));
+        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
+        let r3 = encode_avro_long(0);
+        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
+        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
+        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
+        let array = dec.flush(None).unwrap();
+        let ua = array
+            .as_any()
+            .downcast_ref::<UnionArray>()
+            .expect("expected UnionArray");
+        assert_eq!(ua.len(), 3);
+        assert_eq!(ua.type_id(0), 42);
+        assert_eq!(ua.type_id(1), 7);
+        assert_eq!(ua.type_id(2), 42);
+        assert_eq!(ua.value_offset(0), 0);
+        assert_eq!(ua.value_offset(1), 0);
+        assert_eq!(ua.value_offset(2), 1);
+        let null_child = ua
+            .child(42)
+            .as_any()
+            .downcast_ref::<NullArray>()
+            .expect("null child");
+        assert_eq!(null_child.len(), 2);
+        let str_child = ua
+            .child(7)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("string child");
+        assert_eq!(str_child.len(), 1);
+        assert_eq!(str_child.value(0), "abc");
+    }
+
+    #[test]
+    fn test_union_decode_negative_branch_index_errors() {
+        let union_dt = make_dense_union_avro(
+            vec![
+                (Codec::Int32, "i", DataType::Int32),
+                (Codec::Utf8, "s", DataType::Utf8),
+            ],
+            vec![0, 1],
+        );
+        let mut dec = Decoder::try_new(&union_dt).unwrap();
+        let row = encode_avro_long(-1); // decodes back to -1
+        let err = dec
+            .decode(&mut AvroCursor::new(&row))
+            .expect_err("expected error for negative branch index");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Negative union branch index"),
+            "unexpected error message: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_union_decode_out_of_range_branch_index_errors() {
+        let union_dt = make_dense_union_avro(
+            vec![
+                (Codec::Int32, "i", DataType::Int32),
+                (Codec::Utf8, "s", DataType::Utf8),
+            ],
+            vec![10, 11],
+        );
+        let mut dec = Decoder::try_new(&union_dt).unwrap();
+        let row = encode_avro_long(2);
+        let err = dec
+            .decode(&mut AvroCursor::new(&row))
+            .expect_err("expected error for out-of-range branch index");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of range"),
+            "unexpected error message: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_union_sparse_mode_not_supported() {
+        let children: Vec<AvroDataType> = vec![
+            AvroDataType::new(Codec::Int32, Default::default(), None),
+            AvroDataType::new(Codec::Utf8, Default::default(), None),
+        ];
+        let uf = UnionFields::new(
+            vec![1, 3],
+            vec![
+                arrow_schema::Field::new("i", DataType::Int32, true),
+                arrow_schema::Field::new("s", DataType::Utf8, true),
+            ],
+        );
+        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
+        let dt = AvroDataType::new(codec, Default::default(), None);
+        let err = Decoder::try_new(&dt).expect_err("sparse union should not be 
supported");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Sparse Arrow unions are not yet supported"),
+            "unexpected error message: {msg}"
+        );
+    }
+
     fn make_record_decoder_with_projector_defaults(
         reader_fields: &[(&str, DataType, bool)],
         field_defaults: Vec<Option<AvroLiteral>>,
@@ -3006,4 +3842,43 @@ mod tests {
         assert_eq!(id.value(0), 99);
         assert_eq!(name.value(0), "alice");
     }
+
+    #[test]
+    fn union_type_ids_are_not_child_indexes() {
+        let encodings: Vec<AvroDataType> =
+            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
+        let fields: UnionFields = [
+            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
+            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
+        ]
+        .into_iter()
+        .collect();
+        let dt = avro_from_codec(Codec::Union(
+            encodings.into(),
+            fields.clone(),
+            UnionMode::Dense,
+        ));
+        let mut dec = Decoder::try_new(&dt).expect("decoder");
+        let mut b1 = encode_avro_long(1);
+        b1.extend(encode_avro_bytes("hi".as_bytes()));
+        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
+        let mut b0 = encode_avro_long(0);
+        b0.extend(encode_avro_int(5));
+        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
+        let arr = dec.flush(None).expect("flush");
+        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
+        assert_eq!(ua.len(), 2);
+        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
+        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
+        assert_eq!(ua.value_offset(0), 0);
+        assert_eq!(ua.value_offset(1), 0);
+        let utf8_child = 
ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(utf8_child.len(), 1);
+        assert_eq!(utf8_child.value(0), "hi");
+        let int_child = 
ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(int_child.len(), 1);
+        assert_eq!(int_child.value(0), 5);
+        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
+        assert_eq!(type_ids, vec![42_i8, 7_i8]);
+    }
 }
diff --git a/arrow-avro/test/data/README.md b/arrow-avro/test/data/README.md
index 51416c8416..1d7d8482f9 100644
--- a/arrow-avro/test/data/README.md
+++ b/arrow-avro/test/data/README.md
@@ -141,7 +141,62 @@ Options:
 * --scale (default 10) — the decimal scale used for the 256 files
 * --no-verify — skip reading the files back for printed verification
 
+## Union File
+
+**Purpose:** Exercise a wide variety of Avro **union** shapes (including 
nullable unions, unions of ambiguous scalar types, unions of named types, and 
unions inside arrays, maps, and nested records) to validate `arrow-avro` union 
decoding and schema‑resolution paths.
+
+**Format:** Avro Object Container File (OCF) written by `fastavro.writer` with 
embedded writer schema.
+
+**Record count:** four rows. Each row selects different branches across the 
unions to ensure coverage (i.e., toggling between bytes vs. string, fixed vs. 
duration vs. decimal, enum vs. record alternatives, etc.).
+
+**How this file was created:**
+
+1. Script: 
[`create_avro_union_file.py`](https://gist.github.com/jecsand838/f4bf85ad597ab34575219df515156444)
  
+   Runs with Python 3 and uses **fastavro** to emit `union_fields.avro` in the 
working directory.
+2. Quick reproduce:
+   ```bash
+   pip install fastavro
+   python3 create_avro_union_file.py
+   # Outputs: ./union_fields.avro
+   ```
+
+> Note: Avro OCF files include a *sync marker*; `fastavro.writer` generates a 
random one if not provided, so byte‑for‑byte output may vary between runs even 
with the same data. This does not affect the embedded schema or logical content.
+
+**Writer schema (overview):** The record is named `UnionTypesRecord` and 
defines the following fields:
+
+| Field                             | Union branches / details                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                          |
+|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `nullable_int_nullfirst`          | `["null","int"]` (tests null‑first 
ordering)                                                                       
                                                                                
                                                                                
                                                                                
                                                                                
|
+| `nullable_string_nullsecond`      | `["string","null"]` (tests null‑second 
ordering; in Avro, a union field’s default must match the *first* branch)       
                                                                                
                                                                                
                                                                                
                                                                            |
+| `union_prim`                      | 
`["boolean","int","long","float","double","bytes","string"]`                    
                                                                                
                                                                                
                                                                                
                                                                                
                                   |
+| `union_bytes_vs_string`           | `["bytes","string"]` (ambiguous scalar 
union; script uses fastavro’s tuple notation to disambiguate)                   
                                                                                
                                                                                
                                                                                
                                                                            |
+| `union_fixed_dur_decfix`          | `["Fx8","Dur12","DecFix16"]` where:<br>• 
`Fx8` = `fixed`(size=8)<br>• `Dur12` = `fixed`(size=12, 
`logicalType`=`duration`)<br>• `DecFix16` = `fixed`(size=16, 
`logicalType`=`decimal`, precision=10, scale=2)<br>**Notes:** Avro `duration` 
is a `fixed[12]` storing **months, days, millis** as three **little‑endian** 
32‑bit integers; Avro `decimal` on `bytes`/`fixed` uses **two’s‑complement 
big‑endian** encoding of the unscaled integer. |
+| `union_enum_records_array_map`    | `[ColorU, RecA, RecB, array<long>, 
map<string>]` where:<br>• `ColorU` = `enum` {`RED`,`GREEN`,`BLUE`}<br>• `RecA` 
= `record` {`a:int`, `b:string`}<br>• `RecB` = `record` {`x:long`, `y:bytes`}   
                                                                                
                                                                                
                                                                                
 |
+| `union_date_or_fixed4`            | `[int (logicalType=`date`), Fx4]` where 
`Fx4` = `fixed`(size=4)                                                         
                                                                                
                                                                                
                                                                                
                                                                           |
+| `union_time_millis_or_enum`       | `[int (logicalType=`time-millis`), 
OnOff]` where `OnOff` = `enum` {`ON`,`OFF`}                                     
                                                                                
                                                                                
                                                                                
                                                                                
|
+| `union_time_micros_or_string`     | `[long (logicalType=`time-micros`), 
string]`                                                                        
                                                                                
                                                                                
                                                                                
                                                                               |
+| `union_ts_millis_utc_or_array`    | `[long (logicalType=`timestamp-millis`), 
array<int>]`                                                                    
                                                                                
                                                                                
                                                                                
                                                                          |
+| `union_ts_micros_local_or_bytes`  | `[long 
(logicalType=`local-timestamp-micros`), bytes]`                                 
                                                                                
                                                                                
                                                                                
                                                                                
                            |
+| `union_uuid_or_fixed10`           | `[string (logicalType=`uuid`), Fx10]` 
where `Fx10` = `fixed`(size=10)                                                 
                                                                                
                                                                                
                                                                                
                                                                             |
+| `union_dec_bytes_or_dec_fixed`    | `[bytes (decimal p=10 s=2), DecFix20]` 
where `DecFix20` = `fixed`(size=20, decimal p=20 s=4) — decimal encoding is 
big‑endian two’s‑complement.                                                    
                                                                                
                                                                                
                                                                                
|
+| `union_null_bytes_string`         | `["null","bytes","string"]`              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                          |
+| `array_of_union`                  | `array<["long","string"]>`               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                          |
+| `map_of_union`                    | `map<["null","double"]>`                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                          |
+| `record_with_union_field`         | `HasUnion` = `record` {`id:int`, 
`u:["int","string"]`}                                                           
                                                                                
                                                                                
                                                                                
                                                                                
  |
+| `union_ts_micros_utc_or_map`      | `[long (logicalType=`timestamp-micros`), 
map<long>]`                                                                     
                                                                                
                                                                                
                                                                                
                                                                          |
+| `union_ts_millis_local_or_string` | `[long 
(logicalType=`local-timestamp-millis`), string]`                                
                                                                                
                                                                                
                                                                                
                                                                                
                            |
+| `union_bool_or_string`            | `["boolean","string"]`                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                          |
+
+**Implementation notes (generation):**
+
+* The script uses **fastavro’s tuple notation** `(branch_name, value)` to 
select branches in ambiguous unions (e.g., bytes vs. string, multiple named 
records). See *“Using the tuple notation to specify which branch of a union to 
take”* in the fastavro docs.
+* Decimal values are pre‑encoded to the required **big‑endian 
two’s‑complement** byte sequence before writing (for both `bytes` and `fixed` 
decimal logical types).
+* The `duration` logical type payloads are 12‑byte triples: **months / days / 
milliseconds**, little‑endian each.
+
+**Source / Repro script:**
+`create_avro_union_file.py` (Gist): contains the full writer schema, record 
builders covering four rows, and the `fastavro.writer` call which emits 
`union_fields.avro`.
+
 ## Other Files
 
-This directory contains other small OCF files used by `arrow-avro` tests. 
Details on these will be added in 
+This directory contains other small OCF files used by `arrow-avro` tests. 
Details on these will be added in
 follow-up PRs.
\ No newline at end of file
diff --git a/arrow-avro/test/data/union_fields.avro 
b/arrow-avro/test/data/union_fields.avro
new file mode 100644
index 0000000000..e0ffb82bd4
Binary files /dev/null and b/arrow-avro/test/data/union_fields.avro differ

Reply via email to