This is an automated email from the ASF dual-hosted git repository.

mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 973e6fca9b Add `ArrowError::AvroError`, remaining types and roundtrip tests to `arrow-avro` (#8595)
973e6fca9b is described below

commit 973e6fca9bb3b416623a488eac16c7da954fbc84
Author: Connor Sanders <[email protected]>
AuthorDate: Wed Oct 15 03:24:17 2025 -0500

    Add `ArrowError::AvroError`, remaining types and roundtrip tests to `arrow-avro` (#8595)
    
    # Which issue does this PR close?
    
    - Closes #4886
    - Stacked on #8584
    
    # Rationale for this change
    
    This PR brings Arrow-Avro round‑trip coverage up to date with modern
    Arrow types and the latest Avro logical types. In particular, Avro 1.12
    adds `timestamp-nanos` and `local-timestamp-nanos`. Enabling these
    logical types and filling in missing Avro writer encoders for Arrow’s
    newer *view* and list families allows lossless read/write and simpler
    pipelines.
    
    It also hardens timestamp/time scaling in the writer to avoid silent
    overflow when converting seconds to milliseconds, surfacing a clear
    error instead.
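    
    A minimal standalone sketch of that guard (the new seconds-to-millis
    encoders wrap the same `checked_mul` pattern around array values; the
    free function below is illustrative, not the crate API, and only
    assumes the `arrow-schema` crate):
    
    ```rust
    use arrow_schema::ArrowError;
    
    // Widening seconds to milliseconds can overflow `i64` near the extremes;
    // `checked_mul` turns silent wraparound into an explicit error.
    fn secs_to_millis(secs: i64) -> Result<i64, ArrowError> {
        secs.checked_mul(1_000).ok_or_else(|| {
            ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
        })
    }
    ```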
    
    # What changes are included in this PR?
    
    * **Nanosecond timestamps**: Introduces a `TimestampNanos(bool)` codec
    in `arrow-avro` that maps Avro `timestamp-nanos` /
    `local-timestamp-nanos` to Arrow `Timestamp(Nanosecond, tz)`. The
    reader/decoder, union field kinds, and Arrow `DataType` mapping are all
    extended accordingly. Logical type detection is wired through both
    `logicalType` and the `arrowTimeUnit="nanosecond"` attribute. (See the
    first sketch after this list.)
    * **UUID logical type round‑trip fix**: When reading Avro
    `logicalType="uuid"` fields, preserve that logical type in Arrow field
    metadata so writers can round‑trip it back to Avro. (See the second
    sketch after this list.)
    * **Avro writer encoders**: Add the missing array encoders and coverage
    for Arrow’s `ListView`, `LargeListView`, and `FixedSizeList`, and extend
    array encoder support to `BinaryView` and `Utf8View`. (See large
    additions in `writer/encoder.rs`.)
    * **Safer time/timestamp scaling**: Guard second-to-millisecond
    conversions in `Time32`/`Timestamp` encoders to prevent overflow;
    encoding now returns a clear `InvalidArgument` error in those cases.
    * **Schema utilities**: Add `AvroSchemaOptions` with `null_order` and
    `strip_metadata` flags so Avro JSON can be built while optionally
    omitting internal Arrow keys during round‑trip schema generation.
    * **Tests & round‑trip coverage**: Add unit tests for nanosecond
    timestamp decoding (UTC, local, and with nulls) and additional
    end‑to‑end/round‑trip tests for the updated writer paths.
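    
    The Arrow-side mapping for the new codec is small enough to sketch on
    its own. This mirrors the `Codec::TimestampNanos` arm added in
    `codec.rs`; the free function is illustrative, not the crate API:
    
    ```rust
    use arrow_schema::{DataType, TimeUnit};
    
    // Avro `timestamp-nanos` is UTC-adjusted, so it carries a "+00:00" zone;
    // `local-timestamp-nanos` maps to a zoneless Arrow timestamp.
    fn nanos_data_type(is_utc: bool) -> DataType {
        DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
    }
    
    fn main() {
        assert_eq!(
            nanos_data_type(true),
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into()))
        );
        assert_eq!(nanos_data_type(false), DataType::Timestamp(TimeUnit::Nanosecond, None));
    }
    ```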
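    
    The UUID fix likewise reduces to tagging the Arrow field's metadata.
    A hypothetical standalone helper (the real change happens inside the
    reader's schema builder):
    
    ```rust
    use std::collections::HashMap;
    use arrow_schema::Field;
    
    // Keep Avro's logicalType in the Arrow field metadata so the writer can
    // emit `logicalType = "uuid"` again on the way back out.
    fn tag_uuid_field(field: Field) -> Field {
        let mut md: HashMap<String, String> = field.metadata().clone();
        md.insert("logicalType".to_string(), "uuid".to_string());
        field.with_metadata(md)
    }
    ```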
    
    # Are these changes tested?
    
    Yes.
    
    * New decoder tests validate `Timestamp(Nanosecond, tz)` behavior for
    UTC and local timestamps and for nullable unions.
    * Writer tests validate the nanosecond encoder and exercise an overflow
    path for second→millisecond conversion that now returns an error.
    * Additional round‑trip tests were added alongside the new encoders.
    
    # Are there any user-facing changes?
    
    N/A since `arrow-avro` is not public yet.
---
 arrow-avro/src/codec.rs          |  39 ++-
 arrow-avro/src/reader/mod.rs     |  16 +-
 arrow-avro/src/reader/record.rs  | 114 ++++++++-
 arrow-avro/src/schema.rs         | 289 ++++++++++++++++++----
 arrow-avro/src/writer/encoder.rs | 521 +++++++++++++++++++++++++++++++++++++--
 arrow-avro/src/writer/format.rs  |  12 +-
 arrow-avro/src/writer/mod.rs     | 326 +++++++++++++++++++++++-
 arrow-schema/src/error.rs        |   3 +
 8 files changed, 1234 insertions(+), 86 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index a6a495bdf3..4f4669270d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -349,7 +349,8 @@ impl AvroDataType {
             Codec::Int64
             | Codec::TimeMicros
             | Codec::TimestampMillis(_)
-            | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
+            | Codec::TimestampMicros(_)
+            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
             #[cfg(feature = "avro_custom_types")]
             Codec::DurationNanos
             | Codec::DurationMicros
@@ -652,6 +653,11 @@ pub(crate) enum Codec {
     /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
     /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
     TimestampMicros(bool),
+    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
+    ///
+    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
+    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
+    TimestampNanos(bool),
     /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
     /// The i32 parameter indicates the fixed binary size
     Fixed(i32),
@@ -715,6 +721,9 @@ impl Codec {
             Self::TimestampMicros(is_utc) => {
                 DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
             }
+            Self::TimestampNanos(is_utc) => {
+                DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
+            }
             Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
             Self::Fixed(size) => DataType::FixedSizeBinary(*size),
             Self::Decimal(precision, scale, _size) => {
@@ -917,6 +926,8 @@ enum UnionFieldKind {
     TimestampMillisLocal,
     TimestampMicrosUtc,
     TimestampMicrosLocal,
+    TimestampNanosUtc,
+    TimestampNanosLocal,
     Duration,
     Fixed,
     Decimal,
@@ -946,6 +957,8 @@ impl From<&Codec> for UnionFieldKind {
             Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
             Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
             Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
+            Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
+            Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
             Codec::Interval => Self::Duration,
             Codec::Fixed(_) => Self::Fixed,
             Codec::Decimal(..) => Self::Decimal,
@@ -1399,7 +1412,17 @@ impl<'a> Maker<'a> {
                     (Some("local-timestamp-micros"), c @ Codec::Int64) => {
                         *c = Codec::TimestampMicros(false)
                     }
-                    (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
+                    (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
+                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
+                        *c = Codec::TimestampNanos(false)
+                    }
+                    (Some("uuid"), c @ Codec::Utf8) => {
+                        // Map Avro string+logicalType=uuid into the UUID Codec,
+                        // and preserve the logicalType in Arrow field metadata
+                        // so writers can round-trip it correctly.
+                        *c = Codec::Uuid;
+                        field.metadata.insert("logicalType".into(), "uuid".into());
+                    }
                     #[cfg(feature = "avro_custom_types")]
                     (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = 
Codec::DurationNanos,
                     #[cfg(feature = "avro_custom_types")]
@@ -1437,6 +1460,18 @@ impl<'a> Maker<'a> {
                     }
                     (None, _) => {}
                 }
+                if matches!(field.codec, Codec::Int64) {
+                    if let Some(unit) = t
+                        .attributes
+                        .additional
+                        .get("arrowTimeUnit")
+                        .and_then(|v| v.as_str())
+                    {
+                        if unit == "nanosecond" {
+                            field.codec = Codec::TimestampNanos(false);
+                        }
+                    }
+                }
                 if !t.attributes.additional.is_empty() {
                     for (k, v) in &t.attributes.additional {
                         field.metadata.insert(k.to_string(), v.to_string());
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index cd1833d35c..7e0bfad0ce 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -7437,7 +7437,6 @@ mod test {
             "entire RecordBatch mismatch (schema, all columns, all rows)"
         );
     }
-
     #[test]
     fn comprehensive_e2e_resolution_test() {
         use serde_json::Value;
@@ -7593,6 +7592,7 @@ mod test {
         let batch = read_alltypes_with_reader_schema(path, reader_schema.clone());
 
         const UUID_EXT_KEY: &str = "ARROW:extension:name";
+        const UUID_LOGICAL_KEY: &str = "logicalType";
 
         let uuid_md_top: Option<HashMap<String, String>> = batch
             .schema()
@@ -7600,7 +7600,12 @@ mod test {
             .ok()
             .and_then(|f| {
                 let md = f.metadata();
-                if md.get(UUID_EXT_KEY).is_some() {
+                let has_ext = md.get(UUID_EXT_KEY).is_some();
+                let is_uuid_logical = md
+                    .get(UUID_LOGICAL_KEY)
+                    .map(|v| v.trim_matches('"') == "uuid")
+                    .unwrap_or(false);
+                if has_ext || is_uuid_logical {
                     Some(md.clone())
                 } else {
                     None
@@ -7617,7 +7622,12 @@ mod test {
                     .find(|(_, child)| child.name() == "uuid")
                     .and_then(|(_, child)| {
                         let md = child.metadata();
-                        if md.get(UUID_EXT_KEY).is_some() {
+                        let has_ext = md.get(UUID_EXT_KEY).is_some();
+                        let is_uuid_logical = md
+                            .get(UUID_LOGICAL_KEY)
+                            .map(|v| v.trim_matches('"') == "uuid")
+                            .unwrap_or(false);
+                        if has_ext || is_uuid_logical {
                             Some(md.clone())
                         } else {
                             None
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 7eac382d9f..0412a3e754 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -206,6 +206,7 @@ enum Decoder {
     TimeMicros(Vec<i64>),
     TimestampMillis(bool, Vec<i64>),
     TimestampMicros(bool, Vec<i64>),
+    TimestampNanos(bool, Vec<i64>),
     Int32ToInt64(Vec<i64>),
     Int32ToFloat32(Vec<f32>),
     Int32ToFloat64(Vec<f64>),
@@ -324,6 +325,9 @@ impl Decoder {
             (Codec::TimestampMicros(is_utc), _) => {
                 Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
             }
+            (Codec::TimestampNanos(is_utc), _) => {
+                Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
+            }
             #[cfg(feature = "avro_custom_types")]
             (Codec::DurationNanos, _) => {
                 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
@@ -530,7 +534,8 @@ impl Decoder {
             | Self::Int32ToInt64(v)
             | Self::TimeMicros(v)
             | Self::TimestampMillis(_, v)
-            | Self::TimestampMicros(_, v) => v.push(0),
+            | Self::TimestampMicros(_, v)
+            | Self::TimestampNanos(_, v) => v.push(0),
             #[cfg(feature = "avro_custom_types")]
             Self::DurationSecond(v)
             | Self::DurationMillisecond(v)
@@ -643,7 +648,8 @@ impl Decoder {
             | Self::Int32ToInt64(v)
             | Self::TimeMicros(v)
             | Self::TimestampMillis(_, v)
-            | Self::TimestampMicros(_, v) => match lit {
+            | Self::TimestampMicros(_, v)
+            | Self::TimestampNanos(_, v) => match lit {
                 AvroLiteral::Long(i) => {
                     v.push(*i);
                     Ok(())
@@ -854,7 +860,8 @@ impl Decoder {
             Self::Int64(values)
             | Self::TimeMicros(values)
             | Self::TimestampMillis(_, values)
-            | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
+            | Self::TimestampMicros(_, values)
+            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
             #[cfg(feature = "avro_custom_types")]
             Self::DurationSecond(values)
             | Self::DurationMillisecond(values)
@@ -1070,6 +1077,10 @@ impl Decoder {
                 flush_primitive::<TimestampMicrosecondType>(values, nulls)
                     .with_timezone_opt(is_utc.then(|| "+00:00")),
             ),
+            Self::TimestampNanos(is_utc, values) => Arc::new(
+                flush_primitive::<TimestampNanosecondType>(values, nulls)
+                    .with_timezone_opt(is_utc.then(|| "+00:00")),
+            ),
             #[cfg(feature = "avro_custom_types")]
             Self::DurationSecond(values) => {
                 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
@@ -1959,6 +1970,7 @@ enum Skipper {
     TimeMicros,
     TimestampMillis,
     TimestampMicros,
+    TimestampNanos,
     Fixed(usize),
     Decimal(Option<usize>),
     UuidString,
@@ -1983,6 +1995,7 @@ impl Skipper {
             Codec::TimeMicros => Self::TimeMicros,
             Codec::TimestampMillis(_) => Self::TimestampMillis,
             Codec::TimestampMicros(_) => Self::TimestampMicros,
+            Codec::TimestampNanos(_) => Self::TimestampNanos,
             #[cfg(feature = "avro_custom_types")]
             Codec::DurationNanos
             | Codec::DurationMicros
@@ -2044,7 +2057,11 @@ impl Skipper {
                 buf.get_int()?;
                 Ok(())
             }
-            Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
+            Self::Int64
+            | Self::TimeMicros
+            | Self::TimestampMillis
+            | Self::TimestampMicros
+            | Self::TimestampNanos => {
                 buf.get_long()?;
                 Ok(())
             }
@@ -4647,4 +4664,93 @@ mod tests {
             .expect("Int32Array");
         assert_eq!(a.values(), &[1, 2, 3]);
     }
+
+    #[test]
+    fn test_timestamp_nanos_decoding_utc() {
+        let avro_type = avro_from_codec(Codec::TimestampNanos(true));
+        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
+        let mut data = Vec::new();
+        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
+            data.extend_from_slice(&encode_avro_long(v));
+        }
+        let mut cur = AvroCursor::new(&data);
+        for _ in 0..4 {
+            decoder.decode(&mut cur).expect("decode nanos ts");
+        }
+        let array = decoder.flush(None).expect("flush nanos ts");
+        let ts = array
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .expect("TimestampNanosecondArray");
+        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
+        match ts.data_type() {
+            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+                assert_eq!(tz.as_deref(), Some("+00:00"));
+            }
+            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_timestamp_nanos_decoding_local() {
+        let avro_type = avro_from_codec(Codec::TimestampNanos(false));
+        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
+        let mut data = Vec::new();
+        for v in [10_i64, 20_i64, -30_i64] {
+            data.extend_from_slice(&encode_avro_long(v));
+        }
+        let mut cur = AvroCursor::new(&data);
+        for _ in 0..3 {
+            decoder.decode(&mut cur).expect("decode nanos ts");
+        }
+        let array = decoder.flush(None).expect("flush nanos ts");
+        let ts = array
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .expect("TimestampNanosecondArray");
+        assert_eq!(ts.values(), &[10, 20, -30]);
+        match ts.data_type() {
+            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+                assert_eq!(tz.as_deref(), None);
+            }
+            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_timestamp_nanos_decoding_with_nulls() {
+        let avro_type = AvroDataType::new(
+            Codec::TimestampNanos(false),
+            Default::default(),
+            Some(Nullability::NullFirst),
+        );
+        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_long(1));
+        data.extend_from_slice(&encode_avro_long(42));
+        data.extend_from_slice(&encode_avro_long(0));
+        data.extend_from_slice(&encode_avro_long(1));
+        data.extend_from_slice(&encode_avro_long(-7));
+        let mut cur = AvroCursor::new(&data);
+        for _ in 0..3 {
+            decoder.decode(&mut cur).expect("decode nullable nanos ts");
+        }
+        let array = decoder.flush(None).expect("flush nullable nanos ts");
+        let ts = array
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .expect("TimestampNanosecondArray");
+        assert_eq!(ts.len(), 3);
+        assert!(ts.is_valid(0));
+        assert!(ts.is_null(1));
+        assert!(ts.is_valid(2));
+        assert_eq!(ts.value(0), 42);
+        assert_eq!(ts.value(2), -7);
+        match ts.data_type() {
+            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+                assert_eq!(tz.as_deref(), None);
+            }
+            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
+        }
+    }
 }
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index cff8ee1937..819ea1f16e 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -329,6 +329,12 @@ pub(crate) struct Fixed<'a> {
     pub(crate) attributes: Attributes<'a>,
 }
 
+#[derive(Debug, Copy, Clone, PartialEq, Default)]
+pub(crate) struct AvroSchemaOptions {
+    pub(crate) null_order: Option<Nullability>,
+    pub(crate) strip_metadata: bool,
+}
+
 /// A wrapper for an Avro schema in its JSON string representation.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct AvroSchema {
@@ -428,7 +434,7 @@ impl AvroSchema {
         build_canonical(schema, None)
     }
 
-    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order.
+    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order and optionally stripping internal Arrow metadata.
     ///
     /// If the input Arrow schema already contains Avro JSON in
     /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
@@ -436,17 +442,21 @@ impl AvroSchema {
     /// honoring `null_union_order` at **all nullable sites**.
     pub(crate) fn from_arrow_with_options(
         schema: &ArrowSchema,
-        null_order: Option<Nullability>,
+        options: Option<AvroSchemaOptions>,
     ) -> Result<AvroSchema, ArrowError> {
-        if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
-            return Ok(AvroSchema::new(json.clone()));
+        let opts = options.unwrap_or_default();
+        let order = opts.null_order.unwrap_or_default();
+        let strip = opts.strip_metadata;
+        if !strip {
+            if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+                return Ok(AvroSchema::new(json.clone()));
+            }
         }
-        let order = null_order.unwrap_or_default();
         let mut name_gen = NameGenerator::default();
         let fields_json = schema
             .fields()
             .iter()
-            .map(|f| arrow_field_to_avro(f, &mut name_gen, order))
+            .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
             .collect::<Result<Vec<_>, _>>()?;
         let record_name = schema
             .metadata
@@ -1170,7 +1180,14 @@ fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
     let null = Value::String("null".into());
     match inner {
         Value::Array(mut union) => {
-            union.retain(|v| !is_avro_json_null(v));
+            // If this site is already a union and already contains "null",
+            // preserve the branch order exactly. Reordering "null" breaks
+            // the correspondence between Arrow union child order (type_ids)
+            // and the Avro branch index written on the wire.
+            if union.iter().any(is_avro_json_null) {
+                return Value::Array(union);
+            }
+            // Otherwise, inject "null" without reordering existing branches.
             match null_order {
                 Nullability::NullFirst => union.insert(0, null),
                 Nullability::NullSecond => union.push(null),
@@ -1234,6 +1251,7 @@ fn datatype_to_avro(
     metadata: &HashMap<String, String>,
     name_gen: &mut NameGenerator,
     null_order: Nullability,
+    strip: bool,
 ) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
     let mut extras = JsonMap::new();
     let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
@@ -1288,22 +1306,24 @@ fn datatype_to_avro(
         DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
         DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
         DataType::BinaryView => {
-            extras.insert("arrowBinaryView".into(), Value::Bool(true));
+            if !strip {
+                extras.insert("arrowBinaryView".into(), Value::Bool(true));
+            }
             Value::String("bytes".into())
         }
         DataType::FixedSizeBinary(len) => {
-            // UUID handling:
-            // - When the canonical extension feature is ON *and* this field is the Arrow canonical UUID
-            //   (extension name "arrow.uuid" or legacy "uuid"), emit Avro string with logicalType "uuid".
-            // - Otherwise, fall back to a named fixed of size = len.
-            #[cfg(not(feature = "canonical_extension_types"))]
-            let is_uuid = false;
+            let md_is_uuid = metadata
+                .get("logicalType")
+                .map(|s| s.trim_matches('"') == "uuid")
+                .unwrap_or(false);
             #[cfg(feature = "canonical_extension_types")]
-            let is_uuid = (*len == 16)
-                && metadata
-                    .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
-                    .map(|value| value == arrow_schema::extension::Uuid::NAME || value == "uuid")
-                    .unwrap_or(false);
+            let ext_is_uuid = metadata
+                .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
+                .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
+                .unwrap_or(false);
+            #[cfg(not(feature = "canonical_extension_types"))]
+            let ext_is_uuid = false;
+            let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
             if is_uuid {
                 json!({ "type": "string", "logicalType": "uuid" })
             } else {
@@ -1334,7 +1354,9 @@ fn datatype_to_avro(
         DataType::Time32(unit) => match unit {
             TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
             TimeUnit::Second => {
-                extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+                if !strip {
+                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+                }
                 Value::String("int".into())
             }
             _ => Value::String("int".into()),
@@ -1342,7 +1364,9 @@ fn datatype_to_avro(
         DataType::Time64(unit) => match unit {
             TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
             TimeUnit::Nanosecond => {
-                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+                if !strip {
+                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+                }
                 Value::String("long".into())
             }
             _ => Value::String("long".into()),
@@ -1353,15 +1377,18 @@ fn datatype_to_avro(
                 (TimeUnit::Millisecond, false) => "local-timestamp-millis",
                 (TimeUnit::Microsecond, true) => "timestamp-micros",
                 (TimeUnit::Microsecond, false) => "local-timestamp-micros",
+                (TimeUnit::Nanosecond, true) => "timestamp-nanos",
+                (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
                 (TimeUnit::Second, _) => {
-                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
-                    return Ok((Value::String("long".into()), extras));
-                }
-                (TimeUnit::Nanosecond, _) => {
-                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+                    if !strip {
+                        extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+                    }
                     return Ok((Value::String("long".into()), extras));
                 }
             };
+            if !strip && matches!(unit, TimeUnit::Nanosecond) {
+                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+            }
             json!({ "type": "long", "logicalType": logical_type })
         }
         #[cfg(not(feature = "avro_custom_types"))]
@@ -1396,18 +1423,22 @@ fn datatype_to_avro(
             json!(obj)
         }
         DataType::Interval(IntervalUnit::YearMonth) => {
-            extras.insert(
-                "arrowIntervalUnit".into(),
-                Value::String("yearmonth".into()),
-            );
+            if !strip {
+                extras.insert(
+                    "arrowIntervalUnit".into(),
+                    Value::String("yearmonth".into()),
+                );
+            }
             Value::String("long".into())
         }
         DataType::Interval(IntervalUnit::DayTime) => {
-            extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
+            if !strip {
+                extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
+            }
             Value::String("long".into())
         }
         DataType::List(child) | DataType::LargeList(child) => {
-            if matches!(dt, DataType::LargeList(_)) {
+            if matches!(dt, DataType::LargeList(_)) && !strip {
                 extras.insert("arrowLargeList".into(), Value::Bool(true));
             }
             let items_schema = process_datatype(
@@ -1417,6 +1448,7 @@ fn datatype_to_avro(
                 name_gen,
                 null_order,
                 child.is_nullable(),
+                strip,
             )?;
             json!({
                 "type": "array",
@@ -1424,10 +1456,12 @@ fn datatype_to_avro(
             })
         }
         DataType::ListView(child) | DataType::LargeListView(child) => {
-            if matches!(dt, DataType::LargeListView(_)) {
+            if matches!(dt, DataType::LargeListView(_)) && !strip {
                 extras.insert("arrowLargeList".into(), Value::Bool(true));
             }
-            extras.insert("arrowListView".into(), Value::Bool(true));
+            if !strip {
+                extras.insert("arrowListView".into(), Value::Bool(true));
+            }
             let items_schema = process_datatype(
                 child.data_type(),
                 child.name(),
@@ -1435,6 +1469,7 @@ fn datatype_to_avro(
                 name_gen,
                 null_order,
                 child.is_nullable(),
+                strip,
             )?;
             json!({
                 "type": "array",
@@ -1442,7 +1477,9 @@ fn datatype_to_avro(
             })
         }
         DataType::FixedSizeList(child, len) => {
-            extras.insert("arrowFixedSize".into(), json!(len));
+            if !strip {
+                extras.insert("arrowFixedSize".into(), json!(len));
+            }
             let items_schema = process_datatype(
                 child.data_type(),
                 child.name(),
@@ -1450,6 +1487,7 @@ fn datatype_to_avro(
                 name_gen,
                 null_order,
                 child.is_nullable(),
+                strip,
             )?;
             json!({
                 "type": "array",
@@ -1472,6 +1510,7 @@ fn datatype_to_avro(
                 name_gen,
                 null_order,
                 value_field.is_nullable(),
+                strip,
             )?;
             json!({
                 "type": "map",
@@ -1481,7 +1520,7 @@ fn datatype_to_avro(
         DataType::Struct(fields) => {
             let avro_fields = fields
                 .iter()
-                .map(|field| arrow_field_to_avro(field, name_gen, null_order))
+                .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
                 .collect::<Result<Vec<_>, _>>()?;
             // Prefer avro.name/avro.namespace when provided on the struct field metadata
             let chosen_name = metadata
@@ -1524,6 +1563,7 @@ fn datatype_to_avro(
                     name_gen,
                     null_order,
                     false,
+                    strip,
                 )?
             }
         }
@@ -1546,6 +1586,7 @@ fn datatype_to_avro(
                 values.metadata(),
                 name_gen,
                 null_order,
+                strip,
             )?;
             let mut merged = merge_extras(value_schema, value_extras);
             if values.is_nullable() {
@@ -1564,6 +1605,7 @@ fn datatype_to_avro(
                 values.metadata(),
                 name_gen,
                 null_order,
+                strip,
             )?;
             return Ok((value_schema, JsonMap::new()));
         }
@@ -1578,6 +1620,7 @@ fn datatype_to_avro(
                     field_ref.metadata(),
                     name_gen,
                     null_order,
+                    strip,
                 )?;
                 // Avro unions cannot immediately contain another union
                 if matches!(branch_schema, Value::Array(_)) {
@@ -1597,21 +1640,22 @@ fn datatype_to_avro(
                     ));
                 }
             }
-            extras.insert(
-                "arrowUnionMode".into(),
-                Value::String(
-                    match mode {
-                        UnionMode::Sparse => "sparse",
-                        UnionMode::Dense => "dense",
-                    }
-                    .to_string(),
-                ),
-            );
-            extras.insert(
-                "arrowUnionTypeIds".into(),
-                Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
-            );
-
+            if !strip {
+                extras.insert(
+                    "arrowUnionMode".into(),
+                    Value::String(
+                        match mode {
+                            UnionMode::Sparse => "sparse",
+                            UnionMode::Dense => "dense",
+                        }
+                        .to_string(),
+                    ),
+                );
+                extras.insert(
+                    "arrowUnionTypeIds".into(),
+                    Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
+                );
+            }
             Value::Array(branches)
         }
         #[cfg(not(feature = "small_decimals"))]
@@ -1631,8 +1675,9 @@ fn process_datatype(
     name_gen: &mut NameGenerator,
     null_order: Nullability,
     is_nullable: bool,
+    strip: bool,
 ) -> Result<Value, ArrowError> {
-    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order)?;
+    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
     let mut merged = merge_extras(schema, extras);
     if is_nullable {
         merged = wrap_nullable(merged, null_order)
@@ -1644,6 +1689,7 @@ fn arrow_field_to_avro(
     field: &ArrowField,
     name_gen: &mut NameGenerator,
     null_order: Nullability,
+    strip: bool,
 ) -> Result<Value, ArrowError> {
     let avro_name = sanitise_avro_name(field.name());
     let schema_value = process_datatype(
@@ -1653,6 +1699,7 @@ fn arrow_field_to_avro(
         name_gen,
         null_order,
         field.is_nullable(),
+        strip,
     )?;
     // Build the field map
     let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
@@ -2952,4 +2999,142 @@ mod tests {
             "expected missing-default error, got: {err}"
         );
     }
+
+    #[test]
+    fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
+        let field = ArrowField::new("x", DataType::Int32, true);
+        let injected_json =
+            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
+                .to_string();
+        let mut md = HashMap::new();
+        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json.clone());
+        md.insert("custom".to_string(), "123".to_string());
+        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
+        let opts = AvroSchemaOptions {
+            null_order: Some(Nullability::NullSecond),
+            strip_metadata: false,
+        };
+        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+        assert_eq!(
+            out.json_string, injected_json,
+            "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
+        );
+        let v: Value = serde_json::from_str(&out.json_string).unwrap();
+        assert_eq!(v.get("type").and_then(|t| t.as_str()), Some("record"));
+        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("Injected"));
+    }
+
+    #[test]
+    fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
+        let field = ArrowField::new("x", DataType::Int32, true);
+        let injected_json =
+            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
+                .to_string();
+        let mut md = HashMap::new();
+        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json);
+        md.insert("custom_meta".to_string(), "7".to_string());
+        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
+        let opts = AvroSchemaOptions {
+            null_order: Some(Nullability::NullFirst),
+            strip_metadata: true,
+        };
+        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+        assert_json_contains(&out.json_string, "\"type\":\"record\"");
+        assert_json_contains(&out.json_string, "\"name\":\"topLevelRecord\"");
+        assert_json_contains(&out.json_string, "\"custom_meta\":7");
+    }
+
+    #[test]
+    fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
+        let field = ArrowField::new("s", DataType::Utf8, true);
+        let arrow_schema = single_field_schema(field);
+        let opts = AvroSchemaOptions {
+            null_order: Some(Nullability::NullFirst),
+            strip_metadata: true,
+        };
+        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+        let v: Value = serde_json::from_str(&out.json_string).unwrap();
+        let arr = v["fields"][0]["type"]
+            .as_array()
+            .expect("nullable primitive should be Avro union array");
+        assert_eq!(arr[0], Value::String("null".into()));
+        assert_eq!(arr[1], Value::String("string".into()));
+    }
+
+    #[test]
+    fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
+        let field = ArrowField::new("s", DataType::Utf8, true);
+        let arrow_schema = single_field_schema(field);
+        let opts = AvroSchemaOptions {
+            null_order: Some(Nullability::NullSecond),
+            strip_metadata: true,
+        };
+        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+        let v: Value = serde_json::from_str(&out.json_string).unwrap();
+        let arr = v["fields"][0]["type"]
+            .as_array()
+            .expect("nullable primitive should be Avro union array");
+        assert_eq!(arr[0], Value::String("string".into()));
+        assert_eq!(arr[1], Value::String("null".into()));
+    }
+
+    #[test]
+    fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
+        let uf: UnionFields = vec![
+            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
+            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
+        ]
+        .into_iter()
+        .collect();
+        let union_dt = DataType::Union(uf, UnionMode::Dense);
+        let arrow_schema = single_field_schema(ArrowField::new("u", union_dt, true));
+        let with_extras = AvroSchema::from_arrow_with_options(
+            &arrow_schema,
+            Some(AvroSchemaOptions {
+                null_order: Some(Nullability::NullFirst),
+                strip_metadata: false,
+            }),
+        )
+        .unwrap();
+        let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
+        let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
+        let first_obj = union_arr
+            .iter()
+            .expect("expected an object branch with extras");
+        let obj = first_obj.as_object().unwrap();
+        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
+        assert_eq!(
+            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
+            Some("dense")
+        );
+        let type_ids: Vec<i64> = obj["arrowUnionTypeIds"]
+            .as_array()
+            .expect("arrowUnionTypeIds array")
+            .iter()
+            .map(|n| n.as_i64().expect("i64"))
+            .collect();
+        assert_eq!(type_ids, vec![2, 7]);
+        let stripped = AvroSchema::from_arrow_with_options(
+            &arrow_schema,
+            Some(AvroSchemaOptions {
+                null_order: Some(Nullability::NullFirst),
+                strip_metadata: true,
+            }),
+        )
+        .unwrap();
+        let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
+        let union_arr2 = v_stripped["fields"][0]["type"]
+            .as_array()
+            .expect("union array");
+        assert!(
+            !union_arr2.iter().any(|b| b
+                .as_object()
+                .is_some_and(|m| m.contains_key("arrowUnionMode"))),
+            "extras must be removed when strip_metadata=true"
+        );
+        assert_eq!(union_arr2[0], Value::String("null".into()));
+        assert_eq!(union_arr2[1], Value::String("int".into()));
+        assert_eq!(union_arr2[2], Value::String("string".into()));
+    }
 }
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 5ddb798846..79aee4fae0 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -20,7 +20,6 @@
 use crate::codec::{AvroDataType, AvroField, Codec};
 use crate::schema::{Fingerprint, Nullability, Prefix};
 use arrow_array::cast::AsArray;
-use arrow_array::types::RunEndIndexType;
 use arrow_array::types::{
     ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
     DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
@@ -28,10 +27,15 @@ use arrow_array::types::{
     Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
     TimestampMillisecondType,
 };
+use arrow_array::types::{
+    RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
+};
 use arrow_array::{
-    Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray,
-    GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray,
-    OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray, UnionArray,
+    Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
+    FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
+    GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
+    ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
+    StringViewArray, StructArray, UnionArray,
 };
 #[cfg(feature = "small_decimals")]
 use arrow_array::{Decimal32Array, Decimal64Array};
@@ -237,15 +241,72 @@ impl<'a> FieldEncoder<'a> {
                 DataType::LargeUtf8 => {
                     Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
                 }
+                DataType::Utf8View => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<StringViewArray>()
+                        .ok_or_else(|| {
+                            ArrowError::SchemaError("Expected StringViewArray".into())
+                        })?;
+                    Encoder::Utf8View(Utf8ViewEncoder(arr))
+                }
+                DataType::BinaryView => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<BinaryViewArray>()
+                        .ok_or_else(|| {
+                            ArrowError::SchemaError("Expected BinaryViewArray".into())
+                        })?;
+                    Encoder::BinaryView(BinaryViewEncoder(arr))
+                }
                 DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
                 DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
                 DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
+                DataType::Date64 => {
+                    return Err(ArrowError::NotYetImplemented(
+                        "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
+                            .into(),
+                    ));
+                }
+                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
+                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
+                ),
                 DataType::Time32(TimeUnit::Millisecond) => {
                     Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
                 }
+                DataType::Time32(TimeUnit::Microsecond) => {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
+                            .into(),
+                    ));
+                }
+                DataType::Time32(TimeUnit::Nanosecond) => {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
+                            .into(),
+                    ));
+                }
                 DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
                     array.as_primitive::<Time64MicrosecondType>(),
                 )),
+                DataType::Time64(TimeUnit::Nanosecond) => {
+                    return Err(ArrowError::NotYetImplemented(
+                        "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
+                            .into(),
+                    ));
+                }
+                DataType::Time64(TimeUnit::Millisecond) => {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
+                            .into(),
+                    ));
+                }
+                DataType::Time64(TimeUnit::Second) => {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
+                            .into(),
+                    ));
+                }
                 DataType::Float32 => {
                     Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
                 }
@@ -266,17 +327,20 @@ impl<'a> FieldEncoder<'a> {
                     Encoder::Fixed(FixedEncoder(arr))
                 }
                 DataType::Timestamp(unit, _) => match unit {
+                    TimeUnit::Second => {
+                        Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
+                            array.as_primitive::<TimestampSecondType>(),
+                        ))
+                    }
                     TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
                         array.as_primitive::<TimestampMillisecondType>(),
                     )),
-                    TimeUnit::Microsecond => Encoder::Timestamp(LongEncoder(
+                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
                         array.as_primitive::<TimestampMicrosecondType>(),
                     )),
-                    other => {
-                        return Err(ArrowError::NotYetImplemented(format!(
-                            "Avro writer does not support Timestamp with unit {other:?}"
-                        )));
-                    }
+                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
+                        array.as_primitive::<TimestampNanosecondType>(),
+                    )),
                 },
                 DataType::Interval(unit) => match unit {
                     IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
@@ -342,9 +406,46 @@ impl<'a> FieldEncoder<'a> {
                         item_plan.as_ref(),
                     )?))
                 }
+                DataType::ListView(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListViewArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?;
+                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeListView(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListViewArray>()
+                        .ok_or_else(|| {
+                            ArrowError::SchemaError("Expected LargeListViewArray".into())
+                        })?;
+                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::FixedSizeList(_, _) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<FixedSizeListArray>()
+                        .ok_or_else(|| {
+                            ArrowError::SchemaError("Expected FixedSizeListArray".into())
+                        })?;
+                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
                 other => {
                     return Err(ArrowError::SchemaError(format!(
-                        "Avro array site requires Arrow List/LargeList, found: {other:?}"
+                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                     )));
                 }
             },
@@ -845,16 +946,19 @@ impl FieldPlan {
                 Ok(FieldPlan::Struct { bindings })
             }
             Codec::List(items_dt) => match arrow_field.data_type() {
-                DataType::List(field_ref) => Ok(FieldPlan::List {
+                DataType::List(field_ref)
+                | DataType::LargeList(field_ref)
+                | DataType::ListView(field_ref)
+                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
                     items_nullability: items_dt.nullability(),
                     item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
                 }),
-                DataType::LargeList(field_ref) => Ok(FieldPlan::List {
+                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
                     items_nullability: items_dt.nullability(),
                     item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
                 }),
                 other => Err(ArrowError::SchemaError(format!(
-                    "Avro array maps to Arrow List/LargeList, found: {other:?}"
+                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                 ))),
             },
             Codec::Map(values_dt) => {
@@ -1003,9 +1107,12 @@ enum Encoder<'a> {
     Boolean(BooleanEncoder<'a>),
     Int(IntEncoder<'a, Int32Type>),
     Long(LongEncoder<'a, Int64Type>),
-    Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
+    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
     TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
+    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
+    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
     Date32(IntEncoder<'a, Date32Type>),
+    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
     Time32Millis(IntEncoder<'a, Time32MillisecondType>),
     Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
     DurationSeconds(LongEncoder<'a, DurationSecondType>),
@@ -1018,8 +1125,13 @@ enum Encoder<'a> {
     LargeBinary(BinaryEncoder<'a, i64>),
     Utf8(Utf8Encoder<'a>),
     Utf8Large(Utf8LargeEncoder<'a>),
+    Utf8View(Utf8ViewEncoder<'a>),
+    BinaryView(BinaryViewEncoder<'a>),
     List(Box<ListEncoder32<'a>>),
     LargeList(Box<ListEncoder64<'a>>),
+    ListView(Box<ListViewEncoder32<'a>>),
+    LargeListView(Box<ListViewEncoder64<'a>>),
+    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
     Struct(Box<StructEncoder<'a>>),
     /// Avro `fixed` encoder (raw bytes, no length)
     Fixed(FixedEncoder<'a>),
@@ -1055,9 +1167,12 @@ impl<'a> Encoder<'a> {
             Encoder::Boolean(e) => e.encode(out, idx),
             Encoder::Int(e) => e.encode(out, idx),
             Encoder::Long(e) => e.encode(out, idx),
-            Encoder::Timestamp(e) => e.encode(out, idx),
+            Encoder::TimestampMicros(e) => e.encode(out, idx),
             Encoder::TimestampMillis(e) => e.encode(out, idx),
+            Encoder::TimestampNanos(e) => e.encode(out, idx),
+            Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
             Encoder::Date32(e) => e.encode(out, idx),
+            Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
             Encoder::Time32Millis(e) => e.encode(out, idx),
             Encoder::Time64Micros(e) => e.encode(out, idx),
             Encoder::DurationSeconds(e) => e.encode(out, idx),
@@ -1070,8 +1185,13 @@ impl<'a> Encoder<'a> {
             Encoder::LargeBinary(e) => e.encode(out, idx),
             Encoder::Utf8(e) => e.encode(out, idx),
             Encoder::Utf8Large(e) => e.encode(out, idx),
+            Encoder::Utf8View(e) => e.encode(out, idx),
+            Encoder::BinaryView(e) => e.encode(out, idx),
             Encoder::List(e) => e.encode(out, idx),
             Encoder::LargeList(e) => e.encode(out, idx),
+            Encoder::ListView(e) => e.encode(out, idx),
+            Encoder::LargeListView(e) => e.encode(out, idx),
+            Encoder::FixedSizeList(e) => e.encode(out, idx),
             Encoder::Struct(e) => e.encode(out, idx),
             Encoder::Fixed(e) => (e).encode(out, idx),
             Encoder::Uuid(e) => (e).encode(out, idx),
@@ -1118,6 +1238,32 @@ impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
     }
 }
 
+/// Time32(Second) to Avro time-millis (int), via safe scaling by 1000
+struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
+impl<'a> Time32SecondsToMillisEncoder<'a> {
+    #[inline]
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        let secs = self.0.value(idx);
+        let millis = secs.checked_mul(1000).ok_or_else(|| {
+            ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
+        })?;
+        write_int(out, millis)
+    }
+}
+
+/// Timestamp(Second) to Avro timestamp-millis (long), via safe scaling by 1000
+struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
+impl<'a> TimestampSecondsToMillisEncoder<'a> {
+    #[inline]
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        let secs = self.0.value(idx);
+        let millis = secs.checked_mul(1000).ok_or_else(|| {
+            ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
+        })?;
+        write_long(out, millis)
+    }
+}
+
 /// Unified binary encoder generic over offset size (i32/i64).
 struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
 impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
@@ -1126,6 +1272,22 @@ impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
     }
 }
 
+/// BinaryView (byte view) encoder.
+struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
+impl BinaryViewEncoder<'_> {
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        write_len_prefixed(out, self.0.value(idx))
+    }
+}
+
+/// StringView encoder.
+struct Utf8ViewEncoder<'a>(&'a StringViewArray);
+impl Utf8ViewEncoder<'_> {
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        write_len_prefixed(out, self.0.value(idx).as_bytes())
+    }
+}
+
 struct F32Encoder<'a>(&'a arrow_array::Float32Array);
 impl F32Encoder<'_> {
     fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
@@ -1470,6 +1632,109 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
     }
 }
 
+/// ListView encoder using `(offset, size)` buffers.
+struct ListViewEncoder<'a, O: OffsetSizeTrait> {
+    list: &'a GenericListViewArray<O>,
+    values: FieldEncoder<'a>,
+    values_offset: usize,
+}
+type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
+type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
+
+impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
+    fn try_new(
+        list: &'a GenericListViewArray<O>,
+        items_nullability: Option<Nullability>,
+        item_plan: &FieldPlan,
+    ) -> Result<Self, ArrowError> {
+        let child_field = match list.data_type() {
+            DataType::ListView(field) => field.as_ref(),
+            DataType::LargeListView(field) => field.as_ref(),
+            _ => {
+                return Err(ArrowError::SchemaError(
+                    "Expected ListView or LargeListView for ListViewEncoder".into(),
+                ));
+            }
+        };
+        let values_enc = prepare_value_site_encoder(
+            list.values().as_ref(),
+            child_field,
+            items_nullability,
+            item_plan,
+        )?;
+        Ok(Self {
+            list,
+            values: values_enc,
+            values_offset: list.values().offset(),
+        })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
+            ArrowError::InvalidArgumentError(format!(
+                "Error converting value_offset[{idx}] to usize"
+            ))
+        })?;
+        let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
+            ArrowError::InvalidArgumentError(format!("Error converting 
value_size[{idx}] to usize"))
+        })?;
+        let start = start + self.values_offset;
+        let end = start + len;
+        encode_blocked_range(out, start, end, |out, row| {
+            self.values
+                .encode(out, row.saturating_sub(self.values_offset))
+        })
+    }
+}
+
+/// FixedSizeList encoder.
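+/// Avro has no fixed-size array type, so each row's `value_length()`-wide
+/// window into the child array is written as an ordinary Avro array block.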
+struct FixedSizeListEncoder<'a> {
+    list: &'a FixedSizeListArray,
+    values: FieldEncoder<'a>,
+    values_offset: usize,
+    elem_len: usize,
+}
+
+impl<'a> FixedSizeListEncoder<'a> {
+    fn try_new(
+        list: &'a FixedSizeListArray,
+        items_nullability: Option<Nullability>,
+        item_plan: &FieldPlan,
+    ) -> Result<Self, ArrowError> {
+        let child_field = match list.data_type() {
+            DataType::FixedSizeList(field, _len) => field.as_ref(),
+            _ => {
+                return Err(ArrowError::SchemaError(
+                    "Expected FixedSizeList for FixedSizeListEncoder".into(),
+                ));
+            }
+        };
+        let values_enc = prepare_value_site_encoder(
+            list.values().as_ref(),
+            child_field,
+            items_nullability,
+            item_plan,
+        )?;
+        Ok(Self {
+            list,
+            values: values_enc,
+            values_offset: list.values().offset(),
+            elem_len: list.value_length() as usize,
+        })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        // Starting index is relative to values() start
+        let rel = self.list.value_offset(idx) as usize;
+        let start = self.values_offset + rel;
+        let end = start + self.elem_len;
+        encode_blocked_range(out, start, end, |out, row| {
+            self.values
+                .encode(out, row.saturating_sub(self.values_offset))
+        })
+    }
+}
+
 fn prepare_value_site_encoder<'a>(
     values_array: &'a dyn Array,
     value_field: &Field,
@@ -1491,7 +1756,7 @@ impl FixedEncoder<'_> {
     }
 }
 
-/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string (UUID).
+/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) to Avro string (UUID).
 /// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
 struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
 impl UuidEncoder<'_> {
@@ -2614,4 +2879,226 @@ mod tests {
             _ => panic!("expected SchemaError"),
         }
     }
+
+    #[test]
+    fn timestamp_micros_encoder() {
+        // Mirrors the style used by `timestamp_millis_encoder`
+        test_scalar_primitive_encoding::<TimestampMicrosecondType>(
+            &[
+                1_704_067_200_000_000, // 2024-01-01T00:00:00Z in micros
+                0,                     // epoch
+                -123_456_789,          // pre-epoch
+            ],
+            &[None, Some(1_704_067_200_000_000)],
+        );
+    }
+
+    #[test]
+    fn list_encoder_nullable_items_null_first() {
+        // One List row with three elements: [Some(1), None, Some(2)]
+        let values = Int32Array::from(vec![Some(1), None, Some(2)]);
+        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
+        let list = ListArray::new(
+            Field::new("item", DataType::Int32, true).into(),
+            offsets,
+            Arc::new(values) as ArrayRef,
+            None,
+        );
+
+        let plan = FieldPlan::List {
+            items_nullability: Some(Nullability::NullFirst),
+            item_plan: Box::new(FieldPlan::Scalar),
+        };
+
+        // Avro array encoding per row: one positive block, then 0 terminator.
+        // For NullFirst: Some(v) => branch 1 (0x02) then the value; None => branch 0 (0x00)
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(3)); // block of 3
+        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
+        expected.extend(avro_long_bytes(1)); // value 1
+        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
+        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
+        expected.extend(avro_long_bytes(2)); // value 2
+        expected.extend(avro_long_bytes(0)); // block terminator
+
+        let got = encode_all(&list, &plan, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn large_list_encoder_nullable_items_null_first() {
+        // LargeList single row: [Some(10), None]
+        let values = Int32Array::from(vec![Some(10), None]);
+        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
+        let list = LargeListArray::new(
+            Field::new("item", DataType::Int32, true).into(),
+            offsets,
+            Arc::new(values) as ArrayRef,
+            None,
+        );
+
+        let plan = FieldPlan::List {
+            items_nullability: Some(Nullability::NullFirst),
+            item_plan: Box::new(FieldPlan::Scalar),
+        };
+
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(2)); // block of 2
+        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
+        expected.extend(avro_long_bytes(10)); // value 10
+        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
+        expected.extend(avro_long_bytes(0)); // block terminator
+
+        let got = encode_all(&list, &plan, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn map_encoder_string_keys_nullable_int_values_null_first() {
+        // One map row: {"k1": Some(7), "k2": None}
+        let keys = StringArray::from(vec!["k1", "k2"]);
+        let values = Int32Array::from(vec![Some(7), None]);
+
+        let entries_fields = Fields::from(vec![
+            Field::new("key", DataType::Utf8, false),
+            Field::new("value", DataType::Int32, true),
+        ]);
+        let entries = StructArray::new(
+            entries_fields,
+            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+            None,
+        );
+
+        // Single row -> offsets [0, 2]
+        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
+        let map = MapArray::new(
+            Field::new("entries", entries.data_type().clone(), false).into(),
+            offsets,
+            entries,
+            None,
+            false,
+        );
+
+        let plan = FieldPlan::Map {
+            values_nullability: Some(Nullability::NullFirst),
+            value_plan: Box::new(FieldPlan::Scalar),
+        };
+
+        // Expected:
+        // - one positive block (len=2)
+        // - "k1", branch=1 + value=7
+        // - "k2", branch=0 (null)
+        // - end-of-block marker 0
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(2)); // block length 2
+        expected.extend(avro_len_prefixed_bytes(b"k1")); // key "k1"
+        expected.extend(avro_long_bytes(1)); // union branch 1 (value)
+        expected.extend(avro_long_bytes(7)); // value 7
+        expected.extend(avro_len_prefixed_bytes(b"k2")); // key "k2"
+        expected.extend(avro_long_bytes(0)); // union branch 0 (null)
+        expected.extend(avro_long_bytes(0)); // block terminator
+
+        let got = encode_all(&map, &plan, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn time32_seconds_to_millis_encoder() {
+        // Time32(Second) must encode as Avro time-millis (ms since midnight).
+        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
+            vec![0i32, 1, -2, 12_345].into();
+
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+
+        let mut expected = Vec::new();
+        for secs in [0i32, 1, -2, 12_345] {
+            let millis = (secs as i64) * 1000;
+            expected.extend_from_slice(&avro_long_bytes(millis));
+        }
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn time32_seconds_to_millis_overflow() {
+        // Choose a value that will overflow i32 when multiplied by 1000.
+        let overflow_secs: i32 = i32::MAX / 1000 + 1;
+        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
+            vec![overflow_secs].into();
+
+        let field = arrow_schema::Field::new(
+            "f",
+            arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second),
+            true,
+        );
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            arrow_schema::ArrowError::InvalidArgumentError(msg) => {
+                assert!(
+                    msg.contains("overflowed") || msg.contains("overflow"),
+                    "unexpected message: {msg}"
+                )
+            }
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn timestamp_seconds_to_millis_encoder() {
+        // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
+        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
+            vec![0i64, 1, -1, 1_234_567_890].into();
+
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+
+        let mut expected = Vec::new();
+        for secs in [0i64, 1, -1, 1_234_567_890] {
+            let millis = secs * 1000;
+            expected.extend_from_slice(&avro_long_bytes(millis));
+        }
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn timestamp_seconds_to_millis_overflow() {
+        // Overflow i64 when multiplied by 1000.
+        let overflow_secs: i64 = i64::MAX / 1000 + 1;
+        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
+            vec![overflow_secs].into();
+
+        let field = arrow_schema::Field::new(
+            "f",
+            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, None),
+            true,
+        );
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            arrow_schema::ArrowError::InvalidArgumentError(msg) => {
+                assert!(
+                    msg.contains("overflowed") || msg.contains("overflow"),
+                    "unexpected message: {msg}"
+                )
+            }
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn timestamp_nanos_encoder() {
+        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampNanosecondType> =
+            vec![0i64, 1, -1, 123].into();
+
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+
+        let mut expected = Vec::new();
+        for ns in [0i64, 1, -1, 123] {
+            expected.extend_from_slice(&avro_long_bytes(ns));
+        }
+        assert_bytes_eq(&got, &expected);
+    }
 }
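For readers decoding the expected-byte assertions above: Avro encodes `int`/`long`
values with a zigzag mapping followed by a base-128 varint. A minimal sketch of
what the `avro_long_bytes` test helper (defined elsewhere in the module, so this
is an assumption about its behavior) produces:

    // Zigzag + varint encoding of an Avro long (sketch; assumes the
    // `avro_long_bytes` helper used by the tests above behaves like this).
    fn avro_long_bytes(n: i64) -> Vec<u8> {
        // Zigzag: interleave negatives so small magnitudes encode short.
        let mut z = ((n << 1) ^ (n >> 63)) as u64;
        let mut out = Vec::new();
        // Base-128 varint: low 7 bits per byte, high bit marks continuation.
        while z & !0x7F != 0 {
            out.push((z & 0x7F) as u8 | 0x80);
            z >>= 7;
        }
        out.push(z as u8);
        out
    }

Under this encoding, 1 becomes 0x02 and 0 becomes 0x00, matching the union
branch and terminator bytes asserted in the list and map tests above.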
diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs
index f885fb86a6..07534c960a 100644
--- a/arrow-avro/src/writer/format.rs
+++ b/arrow-avro/src/writer/format.rs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Avro Writer Formats for Arrow.
+
 use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
-use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
+use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
 use crate::writer::encoder::write_long;
 use arrow_schema::{ArrowError, Schema};
 use rand::RngCore;
@@ -63,7 +65,13 @@ impl AvroFormat for AvroOcfFormat {
         // Choose the Avro schema JSON that the file will advertise.
         // If `schema.metadata[SCHEMA_METADATA_KEY]` exists, AvroSchema::try_from
         // uses it verbatim; otherwise it is generated from the Arrow schema.
-        let avro_schema = AvroSchema::try_from(schema)?;
+        let avro_schema = AvroSchema::from_arrow_with_options(
+            schema,
+            Some(AvroSchemaOptions {
+                null_order: None,
+                strip_metadata: true,
+            }),
+        )?;
         // Magic
         writer
             .write_all(b"Obj\x01")
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index c1ec61c7fa..1d4699865b 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -395,17 +395,23 @@ mod tests {
     use crate::reader::ReaderBuilder;
     use crate::schema::{AvroSchema, SchemaStore};
     use crate::test_util::arrow_test_data;
+    use arrow::datatypes::TimeUnit;
     #[cfg(feature = "avro_custom_types")]
     use arrow_array::types::{Int16Type, Int32Type, Int64Type};
+    use arrow_array::types::{
+        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
+        TimestampMillisecondType, TimestampNanosecondType,
+    };
     use arrow_array::{
-        Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StructArray, UnionArray,
+        Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
+        StructArray, UnionArray,
     };
     #[cfg(feature = "avro_custom_types")]
     use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
     #[cfg(not(feature = "avro_custom_types"))]
     use arrow_schema::{DataType, Field, Schema};
     #[cfg(feature = "avro_custom_types")]
-    use arrow_schema::{DataType, Field, Schema, TimeUnit};
+    use arrow_schema::{DataType, Field, Schema};
     use std::collections::HashMap;
     use std::collections::HashSet;
     use std::fs::File;
@@ -758,8 +764,8 @@ mod tests {
     }
 
     // Strict equality (schema + values) only when canonical extension types are enabled
-    #[cfg(feature = "canonical_extension_types")]
     #[test]
+    #[cfg(feature = "canonical_extension_types")]
     fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
         use arrow_schema::{DataType, IntervalUnit};
         let in_file =
@@ -806,8 +812,8 @@ mod tests {
     }
 
     // Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
-    #[cfg(not(feature = "canonical_extension_types"))]
     #[test]
+    #[cfg(not(feature = "canonical_extension_types"))]
     fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError>
     {
         use arrow::datatypes::{DataType, IntervalUnit};
@@ -869,8 +875,9 @@ mod tests {
         let uuid_rt = rt_schema.field_with_name("uuid_field")?;
         assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
         assert_eq!(
-            uuid_rt.metadata().get("avro.name").map(|s| s.as_str()),
-            Some("uuid_field")
+            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
+            Some("uuid"),
+            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
         );
 
         // 3) Duration remains Interval(MonthDayNano)
@@ -1859,4 +1866,311 @@ mod tests {
         assert_eq!(got, &expected);
         Ok(())
     }
+
+    #[test]
+    // TODO: avoid requiring snappy for this file
+    #[cfg(feature = "snappy")]
+    fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/nullable.impala.avro");
+        let rdr_file = File::open(&path).expect("open 
avro/nullable.impala.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for nullable.impala.avro");
+        let in_schema = reader.schema();
+        assert!(
+            in_schema.fields().iter().any(|f| f.is_nullable()),
+            "expected at least one nullable field in avro/nullable.impala.avro"
+        );
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let buffer: Vec<u8> = Vec::new();
+        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let out_bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(out_bytes))
+            .expect("build reader for round-tripped in-memory OCF");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        assert_eq!(
+            roundtrip, original,
+            "Round-trip Avro data mismatch for nullable.impala.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
+        let rdr_file = File::open(&path).expect("open 
avro/datapage_v2.snappy.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for datapage_v2.snappy.avro");
+        let in_schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("build round-trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(
+            round_trip, original,
+            "Round-trip batch mismatch for datapage_v2.snappy.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/single_nan.avro");
+        let in_file = File::open(&path).expect("open avro/single_nan.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(in_file))
+            .expect("build reader for single_nan.avro");
+        let in_schema = reader.schema();
+        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(
+            round_trip, original,
+            "Round-trip batch mismatch for avro/single_nan.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    // TODO: avoid requiring snappy for this file
+    #[cfg(feature = "snappy")]
+    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
+        let rdr_file = File::open(&path).expect("open 
avro/dict-page-offset-zero.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for dict-page-offset-zero.avro");
+        let in_schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let buffer: Vec<u8> = Vec::new();
+        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("build reader for round-trip");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        assert_eq!(
+            roundtrip, original,
+            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/repeated_no_annotation.avro");
+        let in_file = File::open(&path).expect("open 
avro/repeated_no_annotation.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(in_file))
+            .expect("build reader for repeated_no_annotation.avro");
+        let in_schema = reader.schema();
+        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("build reader for round-trip buffer");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
+        assert_eq!(
+            round_trip, original,
+            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
+        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+            .join("test/data/nested_record_reuse.avro")
+            .to_string_lossy()
+            .into_owned();
+        let in_file = File::open(&path).expect("open 
avro/nested_record_reuse.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(in_file))
+            .expect("build reader for nested_record_reuse.avro");
+        let in_schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let input =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
+        writer.write(&input)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(
+            round_trip, input,
+            "Round-trip batch mismatch for nested_record_reuse.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
+        let path =
+            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
+        let rdr_file = std::fs::File::open(&path).expect("open 
test/data/enum_reuse.avro");
+        let reader = ReaderBuilder::new()
+            .build(std::io::BufReader::new(rdr_file))
+            .expect("build reader for enum_reuse.avro");
+        let in_schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(std::io::Cursor::new(bytes))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(
+            round_trip, original,
+            "Avro enum type reuse round-trip mismatch"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
+        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+            .join("test/data/comprehensive_e2e.avro");
+        let rdr_file = File::open(&path).expect("open 
test/data/comprehensive_e2e.avro");
+        let reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for comprehensive_e2e.avro");
+        let in_schema = reader.schema();
+        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+        let sink: Vec<u8> = Vec::new();
+        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("build round-trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        assert_eq!(
+            roundtrip, original,
+            "Round-trip batch mismatch for comprehensive_e2e.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
+        let schema = Schema::new(vec![
+            Field::new("d32", DataType::Date32, false),
+            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), 
false),
+            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), 
false),
+            Field::new(
+                "ts_ms",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(
+                "ts_us",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                false,
+            ),
+            Field::new(
+                "ts_ns",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+        ]);
+        let d32 = Date32Array::from(vec![0, 1, -1]);
+        let t32_ms: PrimitiveArray<Time32MillisecondType> =
+            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
+        let t64_us: PrimitiveArray<Time64MicrosecondType> =
+            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
+        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
+            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
+        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
+        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(d32) as ArrayRef,
+                Arc::new(t32_ms) as ArrayRef,
+                Arc::new(t64_us) as ArrayRef,
+                Arc::new(ts_ms) as ArrayRef,
+                Arc::new(ts_us) as ArrayRef,
+                Arc::new(ts_ns) as ArrayRef,
+            ],
+        )?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        let rt_reader = ReaderBuilder::new()
+            .build(std::io::Cursor::new(bytes))
+            .expect("build reader for round-trip of new time encoders");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        assert_eq!(roundtrip, batch);
+        Ok(())
+    }
 }
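The OCF round-trip tests above all share one shape: read a file into a single
batch, write it back through `AvroWriter`, re-read, and compare. A hypothetical
helper (not part of this diff) capturing that pattern, using only the APIs the
tests themselves exercise:

    fn assert_ocf_roundtrip(path: &str) -> Result<(), ArrowError> {
        // Read the input OCF and concatenate into one batch.
        let reader = ReaderBuilder::new()
            .build(BufReader::new(File::open(path).expect("open input")))?;
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat input");
        // Write it back out to an in-memory OCF.
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        // Re-read and compare.
        let rt_reader = ReaderBuilder::new().build(Cursor::new(writer.into_inner()))?;
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "round-trip mismatch for {path}");
        Ok(())
    }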
diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs
index e8f367143d..8c113cba86 100644
--- a/arrow-schema/src/error.rs
+++ b/arrow-schema/src/error.rs
@@ -46,6 +46,8 @@ pub enum ArrowError {
     CsvError(String),
     /// Error during JSON-related operations.
     JsonError(String),
+    /// Error during Avro-related operations.
+    AvroError(String),
     /// Error during IO operations.
     IoError(String, std::io::Error),
     /// Error during IPC operations in `arrow-ipc` or `arrow-flight`.
@@ -109,6 +111,7 @@ impl Display for ArrowError {
             ArrowError::ComputeError(desc) => write!(f, "Compute error: {desc}"),
             ArrowError::ArithmeticOverflow(desc) => write!(f, "Arithmetic overflow: {desc}"),
             ArrowError::DivideByZero => write!(f, "Divide by zero error"),
+            ArrowError::AvroError(desc) => write!(f, "Avro error: {desc}"),
             ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
             ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
             ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
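For completeness, a small illustration (not from the diff) of how the new
variant behaves, relying only on what the hunk above defines:

    // `AvroError` carries a String like `CsvError`/`JsonError` and uses the
    // "Avro error: {desc}" Display arm added above.
    let err = ArrowError::AvroError("invalid sync marker".to_string());
    assert_eq!(err.to_string(), "Avro error: invalid sync marker");
    match err {
        ArrowError::AvroError(msg) => eprintln!("avro failure: {msg}"),
        other => eprintln!("other failure: {other}"),
    }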

