jecsand838 commented on code in PR #9291:
URL: https://github.com/apache/arrow-rs/pull/9291#discussion_r2806581002


##########
arrow-avro/src/writer/mod.rs:
##########
@@ -3252,4 +3260,1631 @@ mod tests {
         assert_eq!(expected_str, actual_str);
         Ok(())
     }
+
+    /// Test helper: write `batch` to an in-memory OCF buffer with `AvroWriter`, read it back with `ReaderBuilder`, and return every decoded row as one `RecordBatch`.
+    fn roundtrip_ocf(batch: &RecordBatch) -> Result<RecordBatch, AvroError> {
+        let schema = batch.schema();
+        let mut buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(&mut buffer, 
schema.as_ref().clone())?;
+        writer.write(batch)?;
+        writer.finish()?;
+        drop(writer); // ends the `&mut buffer` borrow so `buffer` can be moved into the Cursor
+        let reader = ReaderBuilder::new()
+            .build(Cursor::new(buffer))
+            .expect("build reader for roundtrip OCF"); // a build failure panics instead of returning Err
+        // Look up the Avro schema JSON stored in the OCF header under SCHEMA_METADATA_KEY (None if absent)
+        let avro_schema_json = reader
+            .avro_header()
+            .get(SCHEMA_METADATA_KEY)
+            .map(|raw| std::str::from_utf8(raw).expect("valid 
UTF-8").to_string());
+        // Rebuild the reader's Arrow schema with that JSON added to its metadata; this schema is what concat uses below
+        let arrow_schema = reader.schema();
+        let rt_schema = if let Some(json) = avro_schema_json {
+            let mut metadata = arrow_schema.metadata().clone();
+            metadata.insert(SCHEMA_METADATA_KEY.to_string(), json);
+            Arc::new(Schema::new_with_metadata(
+                arrow_schema.fields().clone(),
+                metadata,
+            ))
+        } else {
+            arrow_schema // no schema entry in the header: use the reader's schema as-is
+        };
+        let rt_batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, 
_>>()?; // the first decode error is returned to the caller
+        Ok(arrow::compute::concat_batches(&rt_schema, 
&rt_batches).expect("concat roundtrip")) // a concat failure panics
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Int8 (i8::MIN/MAX, small values, one null) must read back as Int8 unchanged.
+    fn test_roundtrip_int8_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Int8Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::Int8, 
true)]);
+        let values: Vec<Option<i8>> = vec![
+            Some(i8::MIN),
+            Some(-1),
+            Some(0),
+            None,
+            Some(1),
+            Some(i8::MAX),
+        ];
+        let array = Int8Array::from(values.clone()); // clone: `values` is reused to build the expected array
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        // With avro_custom_types: the field type read back must be exactly Int8
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int8,
+            "Expected Int8 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int8Array>()
+            .expect("Int8Array");
+        assert_eq!(got, &Int8Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Int8 input is expected back as an Int32 column holding the same numbers.
+    fn test_roundtrip_int8_no_custom_widens_to_int32() -> Result<(), 
AvroError> {
+        use arrow_array::Int8Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::Int8, 
true)]);
+        let values: Vec<Option<i8>> = vec![
+            Some(i8::MIN),
+            Some(-1),
+            Some(0),
+            None,
+            Some(1),
+            Some(i8::MAX),
+        ];
+        let array = Int8Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        // Without avro_custom_types: the field type read back must be Int32 (widened)
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from Int8)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>() // NOTE(review): Int32Array is not imported in this fn; presumably in module scope, confirm
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = values.iter().map(|v| v.map(|x| x as 
i32)).collect(); // i8 -> i32 cannot lose information; the null stays None
+        assert_eq!(got, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Int16 (i16::MIN/MAX, small values, one null) must read back as Int16 unchanged.
+    fn test_roundtrip_int16_custom_types() -> Result<(), AvroError> {
+        let schema = Schema::new(vec![Field::new("val", DataType::Int16, 
true)]);
+        let values: Vec<Option<i16>> = vec![
+            Some(i16::MIN),
+            Some(-1),
+            Some(0),
+            None,
+            Some(1),
+            Some(i16::MAX),
+        ];
+        let array = Int16Array::from(values.clone()); // NOTE(review): no local `use` for Int16Array here; presumably in module scope, confirm
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int16,
+            "Expected Int16 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int16Array>()
+            .expect("Int16Array");
+        assert_eq!(got, &Int16Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Int16 input is expected back as an Int32 column holding the same numbers.
+    fn test_roundtrip_int16_no_custom_widens_to_int32() -> Result<(), 
AvroError> {
+        use arrow_array::Int16Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::Int16, 
true)]);
+        let values: Vec<Option<i16>> = vec![
+            Some(i16::MIN),
+            Some(-1),
+            Some(0),
+            None,
+            Some(1),
+            Some(i16::MAX),
+        ];
+        let array = Int16Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from 
Int16)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = values.iter().map(|v| v.map(|x| x as 
i32)).collect(); // i16 -> i32 cannot lose information; the null stays None
+        assert_eq!(got, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, UInt8 (0, 1, 127, u8::MAX, one null) must read back as UInt8 unchanged.
+    fn test_roundtrip_uint8_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt8Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt8, 
true)]);
+        let values: Vec<Option<u8>> = vec![Some(0), Some(1), None, Some(127), 
Some(u8::MAX)];
+        let array = UInt8Array::from(values.clone()); // clone: `values` is reused to build the expected array
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::UInt8,
+            "Expected UInt8 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .expect("UInt8Array");
+        assert_eq!(got, &UInt8Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, UInt8 input is expected back as an Int32 column holding the same numbers.
+    fn test_roundtrip_uint8_no_custom_widens_to_int32() -> Result<(), 
AvroError> {
+        use arrow_array::UInt8Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt8, 
true)]);
+        let values: Vec<Option<u8>> = vec![Some(0), Some(1), None, Some(127), 
Some(u8::MAX)];
+        let array = UInt8Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from 
UInt8)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = values.iter().map(|v| v.map(|x| x as 
i32)).collect(); // every u8 fits in i32, so the cast keeps the value
+        assert_eq!(got, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, UInt16 (0, 1, 32767, u16::MAX, one null) must read back as UInt16 unchanged.
+    fn test_roundtrip_uint16_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt16Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt16, 
true)]);
+        let values: Vec<Option<u16>> = vec![Some(0), Some(1), None, 
Some(32767), Some(u16::MAX)];
+        let array = UInt16Array::from(values.clone()); // clone: `values` is reused to build the expected array
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::UInt16,
+            "Expected UInt16 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt16Array>()
+            .expect("UInt16Array");
+        assert_eq!(got, &UInt16Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, UInt16 input is expected back as an Int32 column holding the same numbers.
+    fn test_roundtrip_uint16_no_custom_widens_to_int32() -> Result<(), 
AvroError> {
+        use arrow_array::UInt16Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt16, 
true)]);
+        let values: Vec<Option<u16>> = vec![Some(0), Some(1), None, 
Some(32767), Some(u16::MAX)];
+        let array = UInt16Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from 
UInt16)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = values.iter().map(|v| v.map(|x| x as 
i32)).collect(); // every u16 fits in i32, so the cast keeps the value
+        assert_eq!(got, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, UInt32 (0, 1, i32::MAX, u32::MAX, one null) must read back as UInt32 unchanged.
+    fn test_roundtrip_uint32_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt32Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt32, 
true)]);
+        let values: Vec<Option<u32>> = vec![
+            Some(0),
+            Some(1),
+            None,
+            Some(i32::MAX as u32), // largest sample that still fits in i32
+            Some(u32::MAX), // sample above i32::MAX
+        ];
+        let array = UInt32Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::UInt32,
+            "Expected UInt32 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .expect("UInt32Array");
+        assert_eq!(got, &UInt32Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, UInt32 input is expected back as an Int64 column holding the same numbers.
+    fn test_roundtrip_uint32_no_custom_widens_to_int64() -> Result<(), 
AvroError> {
+        use arrow_array::UInt32Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt32, 
true)]);
+        let values: Vec<Option<u32>> = vec![
+            Some(0),
+            Some(1),
+            None,
+            Some(i32::MAX as u32),
+            Some(u32::MAX), // sample above i32::MAX; the test expects it to survive in the Int64 column
+        ];
+        let array = UInt32Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int64,
+            "Expected Int64 type without avro_custom_types (widened from 
UInt32)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Int64Array");
+        let expected: Vec<Option<i64>> = values.iter().map(|v| v.map(|x| x as 
i64)).collect(); // every u32 fits in i64, so the cast keeps the value
+        assert_eq!(got, &Int64Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, UInt64 (0, 1, i64::MAX, u64::MAX, one null) must read back as UInt64 unchanged.
+    fn test_roundtrip_uint64_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt64Array;
+
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt64, 
true)]);
+        // u64::MAX is above i64::MAX, so the samples reach the top of the u64 range
+        let values: Vec<Option<u64>> = vec![
+            Some(0),
+            Some(1),
+            None,
+            Some(i64::MAX as u64),
+            Some(u64::MAX),
+        ];
+        let array = UInt64Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+
+        let roundtrip = roundtrip_ocf(&batch)?;
+
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::UInt64,
+            "Expected UInt64 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .expect("UInt64Array");
+        assert_eq!(got, &UInt64Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, UInt64 samples no larger than i64::MAX are expected back as an Int64 column.
+    fn test_roundtrip_uint64_no_custom_widens_to_int64() -> Result<(), 
AvroError> {
+        use arrow_array::UInt64Array;
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt64, 
true)]);
+        let values: Vec<Option<u64>> = vec![Some(0), Some(1), None, 
Some(i64::MAX as u64)]; // deliberately nothing above i64::MAX in this test
+        let array = UInt64Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Int64,
+            "Expected Int64 type without avro_custom_types (widened from 
UInt64)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Int64Array");
+        let expected: Vec<Option<i64>> = values.iter().map(|v| v.map(|x| x as 
i64)).collect(); // all samples are <= i64::MAX, so this cast cannot wrap
+        assert_eq!(got, &Int64Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, a UInt64 column holding u64::MAX must make the round trip return Err.
+    fn test_roundtrip_uint64_overflow_errors_without_custom() {
+        use arrow_array::UInt64Array;
+        let schema = Schema::new(vec![Field::new("val", DataType::UInt64, 
false)]);
+        let values: Vec<u64> = vec![u64::MAX]; // single non-null value above i64::MAX
+        let array = UInt64Array::from(values);
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])
+            .expect("create batch");
+        let result = roundtrip_ocf(&batch); // kept as a Result: only Err-vs-Ok is checked, not which step failed
+        assert!(
+            result.is_err(),
+            "Expected error when encoding UInt64 > i64::MAX without 
avro_custom_types"
+        );
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Float16 (0, 1, -1, f16::MAX, f16::MIN, one null) must read back as Float16 unchanged.
+    fn test_roundtrip_float16_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Float16Array;
+        use half::f16;
+        let schema = Schema::new(vec![Field::new("val", DataType::Float16, 
true)]);
+        let values: Vec<Option<f16>> = vec![
+            Some(f16::ZERO),
+            Some(f16::ONE),
+            None,
+            Some(f16::NEG_ONE),
+            Some(f16::MAX),
+            Some(f16::MIN),
+        ];
+        let array = Float16Array::from(values.clone()); // clone: `values` is reused to build the expected array
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Float16,
+            "Expected Float16 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float16Array>()
+            .expect("Float16Array");
+        assert_eq!(got, &Float16Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Float16 input is expected back as a Float32 column with the converted values.
+    fn test_roundtrip_float16_no_custom_widens_to_float32() -> Result<(), 
AvroError> {
+        use arrow_array::{Float16Array, Float32Array};
+        use half::f16;
+        let schema = Schema::new(vec![Field::new("val", DataType::Float16, 
true)]);
+        let values: Vec<Option<f16>> =
+            vec![Some(f16::ZERO), Some(f16::ONE), None, Some(f16::NEG_ONE)];
+        let array = Float16Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Float32,
+            "Expected Float32 type without avro_custom_types (widened from 
Float16)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .expect("Float32Array");
+        let expected: Vec<Option<f32>> = values.iter().map(|v| v.map(|x| 
x.to_f32())).collect(); // expected column = each f16 sample converted with to_f32; the null stays None
+        assert_eq!(got, &Float32Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, a Date64 column (three dates and one null) must read back as Date64 unchanged.
+    fn test_roundtrip_date64_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Date64Array;
+        let schema = Schema::new(vec![Field::new("val", DataType::Date64, 
true)]);
+        // Date64 values are milliseconds since the Unix epoch
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(86_400_000), // 1 day
+            None,
+            Some(1_609_459_200_000), // 2021-01-01
+        ];
+        let array = Date64Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Date64,
+            "Expected Date64 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Date64Array>()
+            .expect("Date64Array");
+        assert_eq!(got, &Date64Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Date64 input is expected back as Timestamp(Millisecond, None) with the same i64 values.
+    fn test_roundtrip_date64_no_custom_as_timestamp_millis() -> Result<(), 
AvroError> {
+        use arrow_array::{Date64Array, TimestampMillisecondArray};
+        let schema = Schema::new(vec![Field::new("val", DataType::Date64, 
true)]);
+        let values: Vec<Option<i64>> =
+            vec![Some(0), Some(86_400_000), None, Some(1_609_459_200_000)];
+        let array = Date64Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        // Date64 without custom types maps to local-timestamp-millis which 
reads as Timestamp(Millisecond, None)
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, None),
+            "Expected Timestamp(Millisecond, None) without avro_custom_types"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("TimestampMillisecondArray");
+        // No conversion of the numbers is expected: both sides hold i64 milliseconds
+        assert_eq!(got, &TimestampMillisecondArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Time64(Nanosecond) must read back with the same type and full nanosecond values.
+    fn test_roundtrip_time64_nanosecond_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::Time64NanosecondArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time64(TimeUnit::Nanosecond),
+            true,
+        )]);
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1_000_000_000), // 1 second in nanos
+            None,
+            Some(86_399_999_999_999), // 23:59:59.999999999 (not a multiple of 1000 ns)
+        ];
+        let array = Time64NanosecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time64(TimeUnit::Nanosecond),
+            "Expected Time64(Nanosecond) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time64NanosecondArray>()
+            .expect("Time64NanosecondArray");
+        assert_eq!(got, &Time64NanosecondArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Time64(Nanosecond) input is expected back as Time64(Microsecond), values divided by 1000.
+    fn test_roundtrip_time64_nanos_no_custom_truncates_to_micros() -> 
Result<(), AvroError> {
+        use arrow_array::{Time64MicrosecondArray, Time64NanosecondArray};
+
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time64(TimeUnit::Nanosecond),
+            true,
+        )]);
+        // Every sample is a whole number of microseconds, so dividing by 1000 below is exact
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1_000_000_000), // 1 second
+            None,
+            Some(86_399_999_000_000), // 23:59:59.999000
+        ];
+        let array = Time64NanosecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time64(TimeUnit::Microsecond),
+            "Expected Time64(Microsecond) without avro_custom_types (truncated 
from nanoseconds)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time64MicrosecondArray>()
+            .expect("Time64MicrosecondArray");
+        let expected: Vec<Option<i64>> = values.iter().map(|v| v.map(|x| x / 
1000)).collect(); // nanoseconds -> microseconds
+        assert_eq!(got, &Time64MicrosecondArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Time32(Second) must read back with the same type and the same second counts.
+    fn test_roundtrip_time32_second_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Time32SecondArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time32(TimeUnit::Second),
+            true,
+        )]);
+        let values: Vec<Option<i32>> = vec![
+            Some(0),
+            Some(3600), // 1 hour
+            None,
+            Some(86399), // 23:59:59
+        ];
+        let array = Time32SecondArray::from(values.clone()); // clone: `values` is reused to build the expected array
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time32(TimeUnit::Second),
+            "Expected Time32(Second) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time32SecondArray>()
+            .expect("Time32SecondArray");
+        assert_eq!(got, &Time32SecondArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Time32(Second) input is expected back as Time32(Millisecond), values multiplied by 1000.
+    fn test_roundtrip_time32_second_no_custom_scales_to_millis() -> Result<(), 
AvroError> {
+        use arrow_array::{Time32MillisecondArray, Time32SecondArray};
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time32(TimeUnit::Second),
+            true,
+        )]);
+        let values: Vec<Option<i32>> = vec![Some(0), Some(3600), None, 
Some(86399)];
+        let array = Time32SecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time32(TimeUnit::Millisecond),
+            "Expected Time32(Millisecond) without avro_custom_types (scaled 
from seconds)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time32MillisecondArray>()
+            .expect("Time32MillisecondArray");
+        let expected: Vec<Option<i32>> = values.iter().map(|v| v.map(|x| x * 
1000)).collect(); // seconds -> milliseconds; the largest sample (86399) stays well inside i32
+        assert_eq!(got, &Time32MillisecondArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Timestamp(Second, "+00:00") must read back with the same unit, timezone and values.
+    fn test_roundtrip_timestamp_second_custom_types() -> Result<(), AvroError> 
{
+        use arrow_array::TimestampSecondArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+            true,
+        )]);
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1609459200), // 2021-01-01 00:00:00 UTC
+            None,
+            Some(1735689600), // 2025-01-01 00:00:00 UTC
+        ];
+        let array = 
TimestampSecondArray::from(values.clone()).with_timezone("+00:00"); // same timezone string as the field declared above
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+            "Expected Timestamp(Second, UTC) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .expect("TimestampSecondArray");
+        let expected = 
TimestampSecondArray::from(values).with_timezone("+00:00"); // expected array carries the timezone too
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Timestamp(Second, "+00:00") is expected back as Timestamp(Millisecond, "+00:00"), values * 1000.
+    fn test_roundtrip_timestamp_second_no_custom_scales_to_millis() -> 
Result<(), AvroError> {
+        use arrow_array::{TimestampMillisecondArray, TimestampSecondArray};
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+            true,
+        )]);
+        let values: Vec<Option<i64>> = vec![Some(0), Some(1609459200), None, 
Some(1735689600)];
+        let array = 
TimestampSecondArray::from(values.clone()).with_timezone("+00:00");
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
+            "Expected Timestamp(Millisecond, UTC) without avro_custom_types 
(scaled from seconds)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("TimestampMillisecondArray");
+        let expected: Vec<Option<i64>> = values.iter().map(|v| v.map(|x| x * 
1000)).collect(); // seconds -> milliseconds
+        let expected_array = 
TimestampMillisecondArray::from(expected).with_timezone("+00:00"); // the timezone is expected to be preserved
+        assert_eq!(got, &expected_array);
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Interval(YearMonth) (including a negative value and a null) must read back unchanged.
+    fn test_roundtrip_interval_year_month_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::IntervalYearMonthArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::YearMonth),
+            true,
+        )]);
+        let values: Vec<Option<i32>> = vec![
+            Some(0),
+            Some(12), // 1 year
+            None,
+            Some(-6), // -6 months
+            Some(25), // 2 years 1 month
+        ];
+        let array = IntervalYearMonthArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::YearMonth),
+            "Expected Interval(YearMonth) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalYearMonthArray>()
+            .expect("IntervalYearMonthArray");
+        assert_eq!(got, &IntervalYearMonthArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test] // With custom types off, Interval(YearMonth) input is expected back as Interval(MonthDayNano) with only months set.
+    fn test_roundtrip_interval_year_month_no_custom() -> Result<(), AvroError> 
{
+        use arrow_array::{IntervalMonthDayNanoArray, IntervalYearMonthArray};
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::YearMonth),
+            true,
+        )]);
+        // Only non-negative values for standard Avro duration
+        let values: Vec<Option<i32>> = vec![Some(0), Some(12), None, Some(25)];
+        let array = IntervalYearMonthArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        // Without custom types, reads as Interval(MonthDayNano) via Avro 
duration
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) without avro_custom_types"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        // Build the expected column: each month count becomes a MonthDayNano value
+        let expected: Vec<Option<IntervalMonthDayNano>> = values
+            .iter()
+            .map(|v| v.map(|months| IntervalMonthDayNano::new(months, 0, 0))) // the two remaining components are zero
+            .collect();
+        assert_eq!(got, &IntervalMonthDayNanoArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test] // With custom types on, Interval(DayTime) (three values and one null) must read back unchanged.
+    fn test_roundtrip_interval_day_time_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::IntervalDayTimeArray;
+        use arrow_buffer::IntervalDayTime;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::DayTime),
+            true,
+        )]);
+        let values: Vec<Option<IntervalDayTime>> = vec![
+            Some(IntervalDayTime::new(0, 0)),
+            Some(IntervalDayTime::new(1, 1000)), // 1 day, 1 second
+            None,
+            Some(IntervalDayTime::new(30, 3600000)), // 30 days, 1 hour
+        ];
+        let array = IntervalDayTimeArray::from(values.clone()); // clone: `values` is reused to build the expected array
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::DayTime),
+            "Expected Interval(DayTime) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalDayTimeArray>()
+            .expect("IntervalDayTimeArray");
+        assert_eq!(got, &IntervalDayTimeArray::from(values));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Interval(DayTime)` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as
+    /// `Interval(MonthDayNano)`, with each input mapped to zero months, the same
+    /// day count, and its milliseconds scaled to nanoseconds.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_interval_day_time_no_custom() -> Result<(), AvroError> {
+        use arrow_array::{IntervalDayTimeArray, IntervalMonthDayNanoArray};
+        use arrow_buffer::IntervalDayTime;
+        let input: Vec<Option<IntervalDayTime>> = vec![
+            Some(IntervalDayTime::new(0, 0)),
+            Some(IntervalDayTime::new(1, 1000)),
+            None,
+            Some(IntervalDayTime::new(30, 3600000)),
+        ];
+        let field = Field::new("val", DataType::Interval(IntervalUnit::DayTime), true);
+        let column: ArrayRef = Arc::new(IntervalDayTimeArray::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+        let decoded = roundtrip_ocf(&batch)?;
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) without avro_custom_types"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        // Build the expected MonthDayNano values: months = 0, days kept, ms -> ns.
+        let expected: Vec<Option<IntervalMonthDayNano>> = input
+            .iter()
+            .map(|entry| {
+                entry.map(|day_time| {
+                    let nanos = i64::from(day_time.milliseconds) * 1_000_000;
+                    IntervalMonthDayNano::new(0, day_time.days, nanos)
+                })
+            })
+            .collect();
+        assert_eq!(actual, &IntervalMonthDayNanoArray::from(expected));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Interval(MonthDayNano)` column through OCF and
+    /// asserts that, with `avro_custom_types` enabled, the data type and every
+    /// value (including a sub-millisecond and an all-negative interval) come
+    /// back unchanged.
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_interval_month_day_nano_custom_types() -> Result<(), AvroError> {
+        use arrow_array::IntervalMonthDayNanoArray;
+        use arrow_buffer::IntervalMonthDayNano;
+        let input: Vec<Option<IntervalMonthDayNano>> = vec![
+            Some(IntervalMonthDayNano::new(0, 0, 0)),
+            Some(IntervalMonthDayNano::new(1, 2, 3)), // sub-millisecond nanos ok
+            None,
+            Some(IntervalMonthDayNano::new(-4, -5, -6)),
+        ];
+        let field = Field::new("val", DataType::Interval(IntervalUnit::MonthDayNano), true);
+        let column: ArrayRef = Arc::new(IntervalMonthDayNanoArray::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+        let decoded = roundtrip_ocf(&batch)?;
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        assert_eq!(actual, &IntervalMonthDayNanoArray::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Interval(MonthDayNano)` column through OCF without
+    /// `avro_custom_types` and asserts that the data type and values come back
+    /// unchanged for inputs that are non-negative and whole milliseconds.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_interval_month_day_nano_no_custom() -> Result<(), AvroError> {
+        use arrow_array::IntervalMonthDayNanoArray;
+        use arrow_buffer::IntervalMonthDayNano;
+        // Only representable values for Avro duration: non-negative and whole
+        // milliseconds
+        let input: Vec<Option<IntervalMonthDayNano>> = vec![
+            Some(IntervalMonthDayNano::new(0, 0, 0)),
+            Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
+            None,
+            Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
+        ];
+        let field = Field::new("val", DataType::Interval(IntervalUnit::MonthDayNano), true);
+        let column: ArrayRef = Arc::new(IntervalMonthDayNanoArray::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+        let decoded = roundtrip_ocf(&batch)?;
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) without avro_custom_types"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        assert_eq!(actual, &IntervalMonthDayNanoArray::from(input));
+        Ok(())
+    }
+
+    /// Returns `true` when both schemas have the same number of fields and each
+    /// pair of fields at the same position agrees on name, data type, and
+    /// nullability. Metadata is not compared.
+    fn schemas_equal_ignoring_metadata(left: &Schema, right: &Schema) -> bool {
+        let (lhs, rhs) = (left.fields(), right.fields());
+        lhs.len() == rhs.len()
+            && lhs.iter().zip(rhs.iter()).all(|(a, b)| {
+                a.name() == b.name()
+                    && a.data_type() == b.data_type()
+                    && a.is_nullable() == b.is_nullable()
+            })
+    }
+
+    /// Looks up the entry named `name` in the `fields` array of `avro_schema`
+    /// and returns its `type` value.
+    ///
+    /// Panics if `fields` is missing or not an array, if no entry has that
+    /// name, or if the matching entry has no `type` key.
+    fn avro_field_type<'a>(avro_schema: &'a Value, name: &str) -> &'a Value {
+        let entries = avro_schema
+            .get("fields")
+            .and_then(Value::as_array)
+            .expect("avro schema has 'fields' array");
+        for entry in entries {
+            if entry.get("name").and_then(Value::as_str) == Some(name) {
+                return entry.get("type").expect("field has 'type'");
+            }
+        }
+        panic!("avro schema missing field '{name}'")
+    }
+
+    #[test]
+    fn e2e_types_and_schema_alignment() -> Result<(), AvroError> {
+        // Values are chosen to:
+        // - exercise full UInt64 range when `avro_custom_types` is enabled
+        // - exercise negative / sub-millisecond intervals when 
`avro_custom_types` is enabled
+        // - remain representable under standard Avro logical types when 
`avro_custom_types` is disabled
+        let i8_values: Vec<Option<i8>> = vec![Some(i8::MIN), Some(-1), 
Some(i8::MAX)];
+        let i16_values: Vec<Option<i16>> = vec![Some(i16::MIN), Some(-1), 
Some(i16::MAX)];
+        let u8_values: Vec<Option<u8>> = vec![Some(0), Some(1), Some(u8::MAX)];
+        let u16_values: Vec<Option<u16>> = vec![Some(0), Some(1), 
Some(u16::MAX)];
+        let u32_values: Vec<Option<u32>> = vec![Some(0), Some(1), 
Some(u32::MAX)];
+        let u64_values: Vec<Option<u64>> = if cfg!(feature = 
"avro_custom_types") {
+            vec![Some(0), Some(i64::MAX as u64), Some((i64::MAX as u64) + 1)]
+        } else {
+            // Must remain <= i64::MAX when `avro_custom_types` is disabled
+            vec![Some(0), Some((i64::MAX as u64) - 1), Some(i64::MAX as u64)]
+        };
+        let f16_values: Vec<Option<f16>> = vec![
+            Some(f16::from_f32(1.5)),
+            Some(f16::from_f32(-2.0)),
+            Some(f16::from_f32(0.0)),
+        ];
+        let date64_values: Vec<Option<i64>> = vec![Some(-86_400_000), Some(0), 
Some(86_400_000)];
+        let time32s_values: Vec<Option<i32>> = vec![Some(0), Some(1), 
Some(86_399)];
+        let time64ns_values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1_234_567_890), // truncation case for no-custom (nanos -> 
micros)
+            Some(86_399_000_000_123_i64), // near end-of-day, also truncation
+        ];
+        let ts_s_local_values: Vec<Option<i64>> = vec![Some(-1), Some(0), 
Some(1)];
+        let ts_s_utc_values: Vec<Option<i64>> = vec![Some(1), Some(2), 
Some(3)];
+        let iv_ym_values: Vec<Option<i32>> = if cfg!(feature = 
"avro_custom_types") {
+            vec![Some(0), Some(-6), Some(25)]
+        } else {
+            // Avro duration cannot represent negative months without custom 
types
+            vec![Some(0), Some(12), Some(25)]
+        };
+        let iv_dt_values: Vec<Option<IntervalDayTime>> = if cfg!(feature = 
"avro_custom_types") {
+            vec![
+                Some(IntervalDayTime::new(0, 0)),
+                Some(IntervalDayTime::new(1, 1000)),
+                Some(IntervalDayTime::new(-1, -1000)),
+            ]
+        } else {
+            // Avro duration cannot represent negative day-time without custom 
types
+            vec![
+                Some(IntervalDayTime::new(0, 0)),
+                Some(IntervalDayTime::new(1, 1000)),
+                Some(IntervalDayTime::new(30, 3_600_000)),
+            ]
+        };
+        let iv_mdn_values: Vec<Option<IntervalMonthDayNano>> =
+            if cfg!(feature = "avro_custom_types") {
+                vec![
+                    Some(IntervalMonthDayNano::new(0, 0, 0)),
+                    Some(IntervalMonthDayNano::new(1, 2, 3)), // 
sub-millisecond
+                    Some(IntervalMonthDayNano::new(-1, -2, -3)), // negative
+                ]
+            } else {
+                // Avro duration requires non-negative and whole milliseconds
+                vec![
+                    Some(IntervalMonthDayNano::new(0, 0, 0)),
+                    Some(IntervalMonthDayNano::new(1, 2, 3_000_000)), // 3ms
+                    Some(IntervalMonthDayNano::new(10, 20, 30_000_000_000)), 
// 30s
+                ]
+            };
+        // Build a batch containing all impacted types from issue #9290
+        let schema = Schema::new(vec![
+            Field::new("i8", DataType::Int8, false),
+            Field::new("i16", DataType::Int16, false),
+            Field::new("u8", DataType::UInt8, false),
+            Field::new("u16", DataType::UInt16, false),
+            Field::new("u32", DataType::UInt32, false),
+            Field::new("u64", DataType::UInt64, false),
+            Field::new("f16", DataType::Float16, false),
+            Field::new("date64", DataType::Date64, false),
+            Field::new("time32s", DataType::Time32(TimeUnit::Second), false),
+            Field::new("time64ns", DataType::Time64(TimeUnit::Nanosecond), 
false),
+            Field::new(
+                "ts_s_local",
+                DataType::Timestamp(TimeUnit::Second, None),
+                false,
+            ),
+            Field::new(
+                "ts_s_utc",
+                DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+                false,
+            ),
+            Field::new("iv_ym", DataType::Interval(IntervalUnit::YearMonth), 
false),
+            Field::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), 
false),
+            Field::new(
+                "iv_mdn",
+                DataType::Interval(IntervalUnit::MonthDayNano),
+                false,
+            ),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int8Array::from(i8_values.clone())) as ArrayRef,
+                Arc::new(Int16Array::from(i16_values.clone())) as ArrayRef,
+                Arc::new(UInt8Array::from(u8_values.clone())) as ArrayRef,
+                Arc::new(UInt16Array::from(u16_values.clone())) as ArrayRef,
+                Arc::new(UInt32Array::from(u32_values.clone())) as ArrayRef,
+                Arc::new(UInt64Array::from(u64_values.clone())) as ArrayRef,
+                Arc::new(Float16Array::from(f16_values.clone())) as ArrayRef,
+                Arc::new(Date64Array::from(date64_values.clone())) as ArrayRef,
+                Arc::new(Time32SecondArray::from(time32s_values.clone())) as 
ArrayRef,
+                Arc::new(Time64NanosecondArray::from(time64ns_values.clone())) 
as ArrayRef,
+                
Arc::new(TimestampSecondArray::from(ts_s_local_values.clone())) as ArrayRef,
+                Arc::new(
+                    
TimestampSecondArray::from(ts_s_utc_values.clone()).with_timezone("+00:00"),
+                ) as ArrayRef,
+                Arc::new(IntervalYearMonthArray::from(iv_ym_values.clone())) 
as ArrayRef,
+                Arc::new(IntervalDayTimeArray::from(iv_dt_values.clone())) as 
ArrayRef,
+                
Arc::new(IntervalMonthDayNanoArray::from(iv_mdn_values.clone())) as ArrayRef,
+            ],
+        )?;
+        let rt = roundtrip_ocf(&batch)?;
+        let rt_schema = rt.schema();
+        let avro_schema_json = rt_schema
+            .metadata()
+            .get(SCHEMA_METADATA_KEY)
+            .expect("avro.schema missing in round-tripped batch metadata");
+        let avro_schema: Value =
+            serde_json::from_str(avro_schema_json).expect("valid avro schema 
json");
+        let rt_arrow_schema = rt.schema();
+        if cfg!(feature = "avro_custom_types") {
+            assert!(
+                schemas_equal_ignoring_metadata(rt_arrow_schema.as_ref(), 
&schema),
+                "Schema fields mismatch.\nExpected: {:?}\nGot: {:?}",
+                schema,
+                rt_arrow_schema
+            );
+            for field_name in ["u64", "f16", "iv_ym", "iv_dt", "iv_mdn"] {
+                let field = rt_arrow_schema
+                    .field_with_name(field_name)
+                    .expect("field exists");
+                assert!(
+                    field.metadata().get(AVRO_NAME_METADATA_KEY).is_some(),
+                    "Field '{}' should have avro.name metadata",
+                    field_name
+                );
+            }
+        } else {
+            let exp_schema = Schema::new(vec![

Review Comment:
   It's not the same: one is the expected schema without `avro_custom_types`
set, and the other is the expected schema with it set. I left a comment to help explain this.



##########
arrow-avro/src/writer/mod.rs:
##########
@@ -3252,4 +3260,1631 @@ mod tests {
         assert_eq!(expected_str, actual_str);
         Ok(())
     }
+
+    /// Helper to roundtrip a RecordBatch through OCF writer/reader.
+    ///
+    /// The batch is written to an in-memory buffer with `AvroWriter`, read back
+    /// with `ReaderBuilder`, and all decoded batches are concatenated. If the OCF
+    /// header has an entry under `SCHEMA_METADATA_KEY`, that Avro schema JSON is
+    /// inserted into the returned batch's schema metadata under the same key.
+    ///
+    /// Returns an error if writing or decoding fails; panics if the reader
+    /// cannot be built, the header schema is not valid UTF-8, or concatenation
+    /// fails.
+    fn roundtrip_ocf(batch: &RecordBatch) -> Result<RecordBatch, AvroError> {
+        let mut buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(&mut buffer, batch.schema().as_ref().clone())?;
+        writer.write(batch)?;
+        writer.finish()?;
+        // Release the writer's mutable borrow of `buffer` before reading it back.
+        drop(writer);
+        let reader = ReaderBuilder::new()
+            .build(Cursor::new(buffer))
+            .expect("build reader for roundtrip OCF");
+        // Get the Avro schema JSON from the OCF header
+        let avro_schema_json = reader
+            .avro_header()
+            .get(SCHEMA_METADATA_KEY)
+            .map(|raw| std::str::from_utf8(raw).expect("valid UTF-8").to_string());
+        // Get the Arrow schema and add the Avro schema metadata when present
+        let arrow_schema = reader.schema();
+        let rt_schema = match avro_schema_json {
+            Some(json) => {
+                let mut metadata = arrow_schema.metadata().clone();
+                metadata.insert(SCHEMA_METADATA_KEY.to_string(), json);
+                Arc::new(Schema::new_with_metadata(arrow_schema.fields().clone(), metadata))
+            }
+            None => arrow_schema,
+        };
+        let rt_batches = reader.collect::<Result<Vec<RecordBatch>, _>>()?;
+        Ok(arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip"))
+    }
+
+    /// Round-trips a nullable `Int8` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `Int8` with identical
+    /// values (including the null and both extremes).
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_int8_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Int8Array;
+
+        let input: Vec<Option<i8>> =
+            vec![Some(i8::MIN), Some(-1), Some(0), None, Some(1), Some(i8::MAX)];
+        let field = Field::new("val", DataType::Int8, true);
+        let column: ArrayRef = Arc::new(Int8Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        // With avro_custom_types: expect exact Int8 type
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int8,
+            "Expected Int8 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int8Array>()
+            .expect("Int8Array");
+        assert_eq!(actual, &Int8Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Int8` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Int32` holding
+    /// the same numeric values.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_int8_no_custom_widens_to_int32() -> Result<(), AvroError> {
+        use arrow_array::Int8Array;
+
+        let input: Vec<Option<i8>> =
+            vec![Some(i8::MIN), Some(-1), Some(0), None, Some(1), Some(i8::MAX)];
+        let field = Field::new("val", DataType::Int8, true);
+        let column: ArrayRef = Arc::new(Int8Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        // Without avro_custom_types: expect Int32 (widened)
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from Int8)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = input.iter().map(|v| v.map(i32::from)).collect();
+        assert_eq!(actual, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Int16` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `Int16` with identical
+    /// values (including the null and both extremes).
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_int16_custom_types() -> Result<(), AvroError> {
+        let input: Vec<Option<i16>> =
+            vec![Some(i16::MIN), Some(-1), Some(0), None, Some(1), Some(i16::MAX)];
+        let field = Field::new("val", DataType::Int16, true);
+        let column: ArrayRef = Arc::new(Int16Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int16,
+            "Expected Int16 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int16Array>()
+            .expect("Int16Array");
+        assert_eq!(actual, &Int16Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Int16` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Int32` holding
+    /// the same numeric values.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_int16_no_custom_widens_to_int32() -> Result<(), AvroError> {
+        use arrow_array::Int16Array;
+
+        let input: Vec<Option<i16>> =
+            vec![Some(i16::MIN), Some(-1), Some(0), None, Some(1), Some(i16::MAX)];
+        let field = Field::new("val", DataType::Int16, true);
+        let column: ArrayRef = Arc::new(Int16Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from Int16)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = input.iter().map(|v| v.map(i32::from)).collect();
+        assert_eq!(actual, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt8` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `UInt8` with identical
+    /// values (including the null and `u8::MAX`).
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_uint8_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt8Array;
+
+        let input: Vec<Option<u8>> = vec![Some(0), Some(1), None, Some(127), Some(u8::MAX)];
+        let field = Field::new("val", DataType::UInt8, true);
+        let column: ArrayRef = Arc::new(UInt8Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::UInt8,
+            "Expected UInt8 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .expect("UInt8Array");
+        assert_eq!(actual, &UInt8Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt8` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Int32` holding
+    /// the same numeric values.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_uint8_no_custom_widens_to_int32() -> Result<(), AvroError> {
+        use arrow_array::UInt8Array;
+
+        let input: Vec<Option<u8>> = vec![Some(0), Some(1), None, Some(127), Some(u8::MAX)];
+        let field = Field::new("val", DataType::UInt8, true);
+        let column: ArrayRef = Arc::new(UInt8Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from UInt8)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = input.iter().map(|v| v.map(i32::from)).collect();
+        assert_eq!(actual, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt16` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `UInt16` with identical
+    /// values (including the null and `u16::MAX`).
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_uint16_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt16Array;
+
+        let input: Vec<Option<u16>> = vec![Some(0), Some(1), None, Some(32767), Some(u16::MAX)];
+        let field = Field::new("val", DataType::UInt16, true);
+        let column: ArrayRef = Arc::new(UInt16Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::UInt16,
+            "Expected UInt16 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt16Array>()
+            .expect("UInt16Array");
+        assert_eq!(actual, &UInt16Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt16` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Int32` holding
+    /// the same numeric values.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_uint16_no_custom_widens_to_int32() -> Result<(), AvroError> {
+        use arrow_array::UInt16Array;
+
+        let input: Vec<Option<u16>> = vec![Some(0), Some(1), None, Some(32767), Some(u16::MAX)];
+        let field = Field::new("val", DataType::UInt16, true);
+        let column: ArrayRef = Arc::new(UInt16Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int32,
+            "Expected Int32 type without avro_custom_types (widened from UInt16)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Int32Array");
+        let expected: Vec<Option<i32>> = input.iter().map(|v| v.map(i32::from)).collect();
+        assert_eq!(actual, &Int32Array::from(expected));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt32` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `UInt32` with identical
+    /// values (including the null and `u32::MAX`).
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_uint32_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt32Array;
+
+        let input: Vec<Option<u32>> =
+            vec![Some(0), Some(1), None, Some(i32::MAX as u32), Some(u32::MAX)];
+        let field = Field::new("val", DataType::UInt32, true);
+        let column: ArrayRef = Arc::new(UInt32Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::UInt32,
+            "Expected UInt32 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .expect("UInt32Array");
+        assert_eq!(actual, &UInt32Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt32` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Int64` holding
+    /// the same numeric values (including `u32::MAX`).
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_uint32_no_custom_widens_to_int64() -> Result<(), AvroError> {
+        use arrow_array::UInt32Array;
+
+        let input: Vec<Option<u32>> =
+            vec![Some(0), Some(1), None, Some(i32::MAX as u32), Some(u32::MAX)];
+        let field = Field::new("val", DataType::UInt32, true);
+        let column: ArrayRef = Arc::new(UInt32Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int64,
+            "Expected Int64 type without avro_custom_types (widened from UInt32)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Int64Array");
+        let expected: Vec<Option<i64>> = input.iter().map(|v| v.map(i64::from)).collect();
+        assert_eq!(actual, &Int64Array::from(expected));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt64` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `UInt64` with identical
+    /// values, including `u64::MAX`.
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_uint64_custom_types() -> Result<(), AvroError> {
+        use arrow_array::UInt64Array;
+
+        // Include values > i64::MAX to test full u64 range
+        let input: Vec<Option<u64>> =
+            vec![Some(0), Some(1), None, Some(i64::MAX as u64), Some(u64::MAX)];
+        let field = Field::new("val", DataType::UInt64, true);
+        let column: ArrayRef = Arc::new(UInt64Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+
+        let decoded = roundtrip_ocf(&batch)?;
+
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::UInt64,
+            "Expected UInt64 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .expect("UInt64Array");
+        assert_eq!(actual, &UInt64Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `UInt64` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Int64` holding
+    /// the same numeric values. Every input is at most `i64::MAX`.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_uint64_no_custom_widens_to_int64() -> Result<(), AvroError> {
+        use arrow_array::UInt64Array;
+        let input: Vec<Option<u64>> = vec![Some(0), Some(1), None, Some(i64::MAX as u64)];
+        let field = Field::new("val", DataType::UInt64, true);
+        let column: ArrayRef = Arc::new(UInt64Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+        let decoded = roundtrip_ocf(&batch)?;
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Int64,
+            "Expected Int64 type without avro_custom_types (widened from UInt64)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Int64Array");
+        // The `as` cast is lossless here because no input exceeds i64::MAX.
+        let expected: Vec<Option<i64>> = input.iter().map(|v| v.map(|x| x as i64)).collect();
+        assert_eq!(actual, &Int64Array::from(expected));
+        Ok(())
+    }
+
+    /// Asserts that round-tripping a non-nullable `UInt64` column containing
+    /// `u64::MAX` returns an error when `avro_custom_types` is disabled.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_uint64_overflow_errors_without_custom() {
+        use arrow_array::UInt64Array;
+        let field = Field::new("val", DataType::UInt64, false);
+        let input: Vec<u64> = vec![u64::MAX];
+        let column: ArrayRef = Arc::new(UInt64Array::from(input));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])
+            .expect("create batch");
+        let outcome = roundtrip_ocf(&batch);
+        assert!(
+            outcome.is_err(),
+            "Expected error when encoding UInt64 > i64::MAX without avro_custom_types"
+        );
+    }
+
+    /// Round-trips a nullable `Float16` column through OCF and asserts that, with
+    /// `avro_custom_types` enabled, it comes back as `Float16` with identical
+    /// values (including the null, `f16::MAX`, and `f16::MIN`).
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_float16_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Float16Array;
+        use half::f16;
+        let input: Vec<Option<f16>> = vec![
+            Some(f16::ZERO),
+            Some(f16::ONE),
+            None,
+            Some(f16::NEG_ONE),
+            Some(f16::MAX),
+            Some(f16::MIN),
+        ];
+        let field = Field::new("val", DataType::Float16, true);
+        let column: ArrayRef = Arc::new(Float16Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+        let decoded = roundtrip_ocf(&batch)?;
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Float16,
+            "Expected Float16 type with avro_custom_types enabled"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float16Array>()
+            .expect("Float16Array");
+        assert_eq!(actual, &Float16Array::from(input));
+        Ok(())
+    }
+
+    /// Round-trips a nullable `Float16` column through OCF without
+    /// `avro_custom_types` and asserts that it is read back as `Float32`, with
+    /// each value equal to the input converted via `f16::to_f32`.
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_float16_no_custom_widens_to_float32() -> Result<(), AvroError> {
+        use arrow_array::{Float16Array, Float32Array};
+        use half::f16;
+        let input: Vec<Option<f16>> =
+            vec![Some(f16::ZERO), Some(f16::ONE), None, Some(f16::NEG_ONE)];
+        let field = Field::new("val", DataType::Float16, true);
+        let column: ArrayRef = Arc::new(Float16Array::from(input.clone()));
+        let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
+        let decoded = roundtrip_ocf(&batch)?;
+        let decoded_schema = decoded.schema();
+        assert_eq!(
+            decoded_schema.field(0).data_type(),
+            &DataType::Float32,
+            "Expected Float32 type without avro_custom_types (widened from Float16)"
+        );
+        let actual = decoded
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .expect("Float32Array");
+        let expected: Vec<Option<f32>> = input.iter().map(|v| v.map(|x| x.to_f32())).collect();
+        assert_eq!(actual, &Float32Array::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_date64_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Date64Array;
+        let schema = Schema::new(vec![Field::new("val", DataType::Date64, 
true)]);
+        // Date64 is milliseconds since epoch
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(86_400_000), // 1 day
+            None,
+            Some(1_609_459_200_000), // 2021-01-01
+        ];
+        let array = Date64Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Date64,
+            "Expected Date64 type with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Date64Array>()
+            .expect("Date64Array");
+        assert_eq!(got, &Date64Array::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_date64_no_custom_as_timestamp_millis() -> Result<(), 
AvroError> {
+        use arrow_array::{Date64Array, TimestampMillisecondArray};
+        let schema = Schema::new(vec![Field::new("val", DataType::Date64, 
true)]);
+        let values: Vec<Option<i64>> =
+            vec![Some(0), Some(86_400_000), None, Some(1_609_459_200_000)];
+        let array = Date64Array::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        // Date64 without custom types maps to local-timestamp-millis which reads as Timestamp(Millisecond, None)
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, None),
+            "Expected Timestamp(Millisecond, None) without avro_custom_types"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("TimestampMillisecondArray");
+        // Values should be identical (both are i64 millis)
+        assert_eq!(got, &TimestampMillisecondArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_time64_nanosecond_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::Time64NanosecondArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time64(TimeUnit::Nanosecond),
+            true,
+        )]);
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1_000_000_000), // 1 second in nanos
+            None,
+            Some(86_399_999_999_999), // 23:59:59.999999999
+        ];
+        let array = Time64NanosecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time64(TimeUnit::Nanosecond),
+            "Expected Time64(Nanosecond) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time64NanosecondArray>()
+            .expect("Time64NanosecondArray");
+        assert_eq!(got, &Time64NanosecondArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_time64_nanos_no_custom_truncates_to_micros() -> 
Result<(), AvroError> {
+        use arrow_array::{Time64MicrosecondArray, Time64NanosecondArray};
+
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time64(TimeUnit::Nanosecond),
+            true,
+        )]);
+        // Use values evenly divisible by 1000 to avoid truncation issues
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1_000_000_000), // 1 second
+            None,
+            Some(86_399_999_000_000), // 23:59:59.999000
+        ];
+        let array = Time64NanosecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time64(TimeUnit::Microsecond),
+            "Expected Time64(Microsecond) without avro_custom_types (truncated 
from nanoseconds)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time64MicrosecondArray>()
+            .expect("Time64MicrosecondArray");
+        let expected: Vec<Option<i64>> = values.iter().map(|v| v.map(|x| x / 
1000)).collect();
+        assert_eq!(got, &Time64MicrosecondArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_time32_second_custom_types() -> Result<(), AvroError> {
+        use arrow_array::Time32SecondArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time32(TimeUnit::Second),
+            true,
+        )]);
+        let values: Vec<Option<i32>> = vec![
+            Some(0),
+            Some(3600), // 1 hour
+            None,
+            Some(86399), // 23:59:59
+        ];
+        let array = Time32SecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time32(TimeUnit::Second),
+            "Expected Time32(Second) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time32SecondArray>()
+            .expect("Time32SecondArray");
+        assert_eq!(got, &Time32SecondArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_time32_second_no_custom_scales_to_millis() -> Result<(), 
AvroError> {
+        use arrow_array::{Time32MillisecondArray, Time32SecondArray};
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Time32(TimeUnit::Second),
+            true,
+        )]);
+        let values: Vec<Option<i32>> = vec![Some(0), Some(3600), None, 
Some(86399)];
+        let array = Time32SecondArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Time32(TimeUnit::Millisecond),
+            "Expected Time32(Millisecond) without avro_custom_types (scaled 
from seconds)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<Time32MillisecondArray>()
+            .expect("Time32MillisecondArray");
+        let expected: Vec<Option<i32>> = values.iter().map(|v| v.map(|x| x * 
1000)).collect();
+        assert_eq!(got, &Time32MillisecondArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_timestamp_second_custom_types() -> Result<(), AvroError> 
{
+        use arrow_array::TimestampSecondArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+            true,
+        )]);
+        let values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1609459200), // 2021-01-01 00:00:00 UTC
+            None,
+            Some(1735689600), // 2025-01-01 00:00:00 UTC
+        ];
+        let array = 
TimestampSecondArray::from(values.clone()).with_timezone("+00:00");
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+            "Expected Timestamp(Second, UTC) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .expect("TimestampSecondArray");
+        let expected = 
TimestampSecondArray::from(values).with_timezone("+00:00");
+        assert_eq!(got, &expected);
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_timestamp_second_no_custom_scales_to_millis() -> 
Result<(), AvroError> {
+        use arrow_array::{TimestampMillisecondArray, TimestampSecondArray};
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+            true,
+        )]);
+        let values: Vec<Option<i64>> = vec![Some(0), Some(1609459200), None, 
Some(1735689600)];
+        let array = 
TimestampSecondArray::from(values.clone()).with_timezone("+00:00");
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
+            "Expected Timestamp(Millisecond, UTC) without avro_custom_types 
(scaled from seconds)"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("TimestampMillisecondArray");
+        let expected: Vec<Option<i64>> = values.iter().map(|v| v.map(|x| x * 
1000)).collect();
+        let expected_array = 
TimestampMillisecondArray::from(expected).with_timezone("+00:00");
+        assert_eq!(got, &expected_array);
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_interval_year_month_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::IntervalYearMonthArray;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::YearMonth),
+            true,
+        )]);
+        let values: Vec<Option<i32>> = vec![
+            Some(0),
+            Some(12), // 1 year
+            None,
+            Some(-6), // -6 months
+            Some(25), // 2 years 1 month
+        ];
+        let array = IntervalYearMonthArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::YearMonth),
+            "Expected Interval(YearMonth) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalYearMonthArray>()
+            .expect("IntervalYearMonthArray");
+        assert_eq!(got, &IntervalYearMonthArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_interval_year_month_no_custom() -> Result<(), AvroError> 
{
+        use arrow_array::{IntervalMonthDayNanoArray, IntervalYearMonthArray};
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::YearMonth),
+            true,
+        )]);
+        // Only non-negative values for standard Avro duration
+        let values: Vec<Option<i32>> = vec![Some(0), Some(12), None, Some(25)];
+        let array = IntervalYearMonthArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        // Without custom types, reads as Interval(MonthDayNano) via Avro duration
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) without avro_custom_types"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        // Convert expected values to MonthDayNano format
+        let expected: Vec<Option<IntervalMonthDayNano>> = values
+            .iter()
+            .map(|v| v.map(|months| IntervalMonthDayNano::new(months, 0, 0)))
+            .collect();
+        assert_eq!(got, &IntervalMonthDayNanoArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_interval_day_time_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::IntervalDayTimeArray;
+        use arrow_buffer::IntervalDayTime;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::DayTime),
+            true,
+        )]);
+        let values: Vec<Option<IntervalDayTime>> = vec![
+            Some(IntervalDayTime::new(0, 0)),
+            Some(IntervalDayTime::new(1, 1000)), // 1 day, 1 second
+            None,
+            Some(IntervalDayTime::new(30, 3600000)), // 30 days, 1 hour
+        ];
+        let array = IntervalDayTimeArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::DayTime),
+            "Expected Interval(DayTime) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalDayTimeArray>()
+            .expect("IntervalDayTimeArray");
+        assert_eq!(got, &IntervalDayTimeArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_interval_day_time_no_custom() -> Result<(), AvroError> {
+        use arrow_array::{IntervalDayTimeArray, IntervalMonthDayNanoArray};
+        use arrow_buffer::IntervalDayTime;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::DayTime),
+            true,
+        )]);
+        let values: Vec<Option<IntervalDayTime>> = vec![
+            Some(IntervalDayTime::new(0, 0)),
+            Some(IntervalDayTime::new(1, 1000)),
+            None,
+            Some(IntervalDayTime::new(30, 3600000)),
+        ];
+        let array = IntervalDayTimeArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) without avro_custom_types"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        let expected: Vec<Option<IntervalMonthDayNano>> = values
+            .iter()
+            .map(|v| {
+                v.map(|dt| {
+                    let nanos = (dt.milliseconds as i64) * 1_000_000;
+                    IntervalMonthDayNano::new(0, dt.days, nanos)
+                })
+            })
+            .collect();
+        assert_eq!(got, &IntervalMonthDayNanoArray::from(expected));
+        Ok(())
+    }
+
+    #[cfg(feature = "avro_custom_types")]
+    #[test]
+    fn test_roundtrip_interval_month_day_nano_custom_types() -> Result<(), 
AvroError> {
+        use arrow_array::IntervalMonthDayNanoArray;
+        use arrow_buffer::IntervalMonthDayNano;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::MonthDayNano),
+            true,
+        )]);
+        let values: Vec<Option<IntervalMonthDayNano>> = vec![
+            Some(IntervalMonthDayNano::new(0, 0, 0)),
+            Some(IntervalMonthDayNano::new(1, 2, 3)), // sub-millisecond nanos 
ok
+            None,
+            Some(IntervalMonthDayNano::new(-4, -5, -6)),
+        ];
+        let array = IntervalMonthDayNanoArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) with avro_custom_types enabled"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        assert_eq!(got, &IntervalMonthDayNanoArray::from(values));
+        Ok(())
+    }
+
+    #[cfg(not(feature = "avro_custom_types"))]
+    #[test]
+    fn test_roundtrip_interval_month_day_nano_no_custom() -> Result<(), 
AvroError> {
+        use arrow_array::IntervalMonthDayNanoArray;
+        use arrow_buffer::IntervalMonthDayNano;
+        let schema = Schema::new(vec![Field::new(
+            "val",
+            DataType::Interval(IntervalUnit::MonthDayNano),
+            true,
+        )]);
+        // Only representable values for Avro duration: non-negative and whole milliseconds
+        let values: Vec<Option<IntervalMonthDayNano>> = vec![
+            Some(IntervalMonthDayNano::new(0, 0, 0)),
+            Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
+            None,
+            Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
+        ];
+        let array = IntervalMonthDayNanoArray::from(values.clone());
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array) as ArrayRef])?;
+        let roundtrip = roundtrip_ocf(&batch)?;
+        assert_eq!(
+            roundtrip.schema().field(0).data_type(),
+            &DataType::Interval(IntervalUnit::MonthDayNano),
+            "Expected Interval(MonthDayNano) without avro_custom_types"
+        );
+        let got = roundtrip
+            .column(0)
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .expect("IntervalMonthDayNanoArray");
+        assert_eq!(got, &IntervalMonthDayNanoArray::from(values));
+        Ok(())
+    }
+
+    fn schemas_equal_ignoring_metadata(left: &Schema, right: &Schema) -> bool {
+        if left.fields().len() != right.fields().len() {
+            return false;
+        }
+        for (l, r) in left.fields().iter().zip(right.fields().iter()) {
+            if l.name() != r.name()
+                || l.data_type() != r.data_type()
+                || l.is_nullable() != r.is_nullable()
+            {
+                return false;
+            }
+        }
+        true
+    }
+
+    fn avro_field_type<'a>(avro_schema: &'a Value, name: &str) -> &'a Value {
+        let fields = avro_schema
+            .get("fields")
+            .and_then(|v| v.as_array())
+            .expect("avro schema has 'fields' array");
+        fields
+            .iter()
+            .find(|f| f.get("name").and_then(|n| n.as_str()) == Some(name))
+            .unwrap_or_else(|| panic!("avro schema missing field '{name}'"))
+            .get("type")
+            .expect("field has 'type'")
+    }
+
+    #[test]
+    fn e2e_types_and_schema_alignment() -> Result<(), AvroError> {
+        // Values are chosen to:
+        // - exercise full UInt64 range when `avro_custom_types` is enabled
+        // - exercise negative / sub-millisecond intervals when `avro_custom_types` is enabled
+        // - remain representable under standard Avro logical types when `avro_custom_types` is disabled
+        let i8_values: Vec<Option<i8>> = vec![Some(i8::MIN), Some(-1), 
Some(i8::MAX)];
+        let i16_values: Vec<Option<i16>> = vec![Some(i16::MIN), Some(-1), 
Some(i16::MAX)];
+        let u8_values: Vec<Option<u8>> = vec![Some(0), Some(1), Some(u8::MAX)];
+        let u16_values: Vec<Option<u16>> = vec![Some(0), Some(1), 
Some(u16::MAX)];
+        let u32_values: Vec<Option<u32>> = vec![Some(0), Some(1), 
Some(u32::MAX)];
+        let u64_values: Vec<Option<u64>> = if cfg!(feature = 
"avro_custom_types") {
+            vec![Some(0), Some(i64::MAX as u64), Some((i64::MAX as u64) + 1)]
+        } else {
+            // Must remain <= i64::MAX when `avro_custom_types` is disabled
+            vec![Some(0), Some((i64::MAX as u64) - 1), Some(i64::MAX as u64)]
+        };
+        let f16_values: Vec<Option<f16>> = vec![
+            Some(f16::from_f32(1.5)),
+            Some(f16::from_f32(-2.0)),
+            Some(f16::from_f32(0.0)),
+        ];
+        let date64_values: Vec<Option<i64>> = vec![Some(-86_400_000), Some(0), 
Some(86_400_000)];
+        let time32s_values: Vec<Option<i32>> = vec![Some(0), Some(1), 
Some(86_399)];
+        let time64ns_values: Vec<Option<i64>> = vec![
+            Some(0),
+            Some(1_234_567_890), // truncation case for no-custom (nanos -> 
micros)
+            Some(86_399_000_000_123_i64), // near end-of-day, also truncation
+        ];
+        let ts_s_local_values: Vec<Option<i64>> = vec![Some(-1), Some(0), 
Some(1)];
+        let ts_s_utc_values: Vec<Option<i64>> = vec![Some(1), Some(2), 
Some(3)];
+        let iv_ym_values: Vec<Option<i32>> = if cfg!(feature = 
"avro_custom_types") {
+            vec![Some(0), Some(-6), Some(25)]
+        } else {
+            // Avro duration cannot represent negative months without custom types
+            vec![Some(0), Some(12), Some(25)]
+        };
+        let iv_dt_values: Vec<Option<IntervalDayTime>> = if cfg!(feature = 
"avro_custom_types") {
+            vec![
+                Some(IntervalDayTime::new(0, 0)),
+                Some(IntervalDayTime::new(1, 1000)),
+                Some(IntervalDayTime::new(-1, -1000)),
+            ]
+        } else {
+            // Avro duration cannot represent negative day-time without custom types
+            vec![
+                Some(IntervalDayTime::new(0, 0)),
+                Some(IntervalDayTime::new(1, 1000)),
+                Some(IntervalDayTime::new(30, 3_600_000)),
+            ]
+        };
+        let iv_mdn_values: Vec<Option<IntervalMonthDayNano>> =
+            if cfg!(feature = "avro_custom_types") {
+                vec![
+                    Some(IntervalMonthDayNano::new(0, 0, 0)),
+                    Some(IntervalMonthDayNano::new(1, 2, 3)), // 
sub-millisecond
+                    Some(IntervalMonthDayNano::new(-1, -2, -3)), // negative
+                ]
+            } else {
+                // Avro duration requires non-negative and whole milliseconds
+                vec![
+                    Some(IntervalMonthDayNano::new(0, 0, 0)),
+                    Some(IntervalMonthDayNano::new(1, 2, 3_000_000)), // 3ms
+                    Some(IntervalMonthDayNano::new(10, 20, 30_000_000_000)), 
// 30s
+                ]
+            };
+        // Build a batch containing all impacted types from issue #9290
+        let schema = Schema::new(vec![
+            Field::new("i8", DataType::Int8, false),
+            Field::new("i16", DataType::Int16, false),
+            Field::new("u8", DataType::UInt8, false),
+            Field::new("u16", DataType::UInt16, false),
+            Field::new("u32", DataType::UInt32, false),
+            Field::new("u64", DataType::UInt64, false),
+            Field::new("f16", DataType::Float16, false),
+            Field::new("date64", DataType::Date64, false),
+            Field::new("time32s", DataType::Time32(TimeUnit::Second), false),
+            Field::new("time64ns", DataType::Time64(TimeUnit::Nanosecond), 
false),
+            Field::new(
+                "ts_s_local",
+                DataType::Timestamp(TimeUnit::Second, None),
+                false,
+            ),
+            Field::new(
+                "ts_s_utc",
+                DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
+                false,
+            ),
+            Field::new("iv_ym", DataType::Interval(IntervalUnit::YearMonth), 
false),
+            Field::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), 
false),
+            Field::new(
+                "iv_mdn",
+                DataType::Interval(IntervalUnit::MonthDayNano),
+                false,
+            ),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int8Array::from(i8_values.clone())) as ArrayRef,
+                Arc::new(Int16Array::from(i16_values.clone())) as ArrayRef,
+                Arc::new(UInt8Array::from(u8_values.clone())) as ArrayRef,
+                Arc::new(UInt16Array::from(u16_values.clone())) as ArrayRef,
+                Arc::new(UInt32Array::from(u32_values.clone())) as ArrayRef,
+                Arc::new(UInt64Array::from(u64_values.clone())) as ArrayRef,
+                Arc::new(Float16Array::from(f16_values.clone())) as ArrayRef,
+                Arc::new(Date64Array::from(date64_values.clone())) as ArrayRef,
+                Arc::new(Time32SecondArray::from(time32s_values.clone())) as 
ArrayRef,
+                Arc::new(Time64NanosecondArray::from(time64ns_values.clone())) 
as ArrayRef,
+                
Arc::new(TimestampSecondArray::from(ts_s_local_values.clone())) as ArrayRef,
+                Arc::new(
+                    
TimestampSecondArray::from(ts_s_utc_values.clone()).with_timezone("+00:00"),
+                ) as ArrayRef,
+                Arc::new(IntervalYearMonthArray::from(iv_ym_values.clone())) 
as ArrayRef,
+                Arc::new(IntervalDayTimeArray::from(iv_dt_values.clone())) as 
ArrayRef,
+                
Arc::new(IntervalMonthDayNanoArray::from(iv_mdn_values.clone())) as ArrayRef,
+            ],
+        )?;
+        let rt = roundtrip_ocf(&batch)?;
+        let rt_schema = rt.schema();
+        let avro_schema_json = rt_schema
+            .metadata()
+            .get(SCHEMA_METADATA_KEY)
+            .expect("avro.schema missing in round-tripped batch metadata");
+        let avro_schema: Value =
+            serde_json::from_str(avro_schema_json).expect("valid avro schema 
json");
+        let rt_arrow_schema = rt.schema();
+        if cfg!(feature = "avro_custom_types") {
+            assert!(
+                schemas_equal_ignoring_metadata(rt_arrow_schema.as_ref(), 
&schema),
+                "Schema fields mismatch.\nExpected: {:?}\nGot: {:?}",
+                schema,
+                rt_arrow_schema
+            );
+            for field_name in ["u64", "f16", "iv_ym", "iv_dt", "iv_mdn"] {
+                let field = rt_arrow_schema
+                    .field_with_name(field_name)
+                    .expect("field exists");
+                assert!(
+                    field.metadata().get(AVRO_NAME_METADATA_KEY).is_some(),
+                    "Field '{}' should have avro.name metadata",
+                    field_name
+                );
+            }
+        } else {
+            let exp_schema = Schema::new(vec![

Review Comment:
   These two schemas are not the same: one is the expected schema without
`avro_custom_types` set, and the other is the expected schema with it set. I left a comment to help explain this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to