This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push: new 70f9250387 Adds additional type support to arrow-avro writer (#8298) 70f9250387 is described below commit 70f9250387222def22fc5ba47c64a215d6ffbeb7 Author: nathaniel-d-ef <nathan...@elastiflow.com> AuthorDate: Sat Sep 13 13:32:16 2025 +0200 Adds additional type support to arrow-avro writer (#8298) # Which issue does this PR close? - Part of https://github.com/apache/arrow-rs/issues/4886 - Related to https://github.com/apache/arrow-rs/pull/8274 # Rationale for this change This PR builds on the work introduced in #8274, adding additional complex type support to the Avro writer. This brings us closer to complete round-trip capability and Avro spec support in the arrow-avro crate. # What changes are included in this PR? New encoders: Fixed, UUID, IntervalMonthDayNano, IntervalYearMonth, IntervalDayTime, Decimal32, Decimal64, Decimal128, Decimal256. Corresponding changes to FieldEncoder and FieldPlan in support of these encoders. # Are these changes tested? Yes, additional complex type unit tests have been added. Benchmark tests have also been written but are being omitted here to keep the diff manageable. All tests, new and existing, pass. # Are there any user-facing changes? 
n/a, arrow-avro crate is not yet public --- arrow-avro/src/schema.rs | 52 ++- arrow-avro/src/writer/encoder.rs | 981 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 989 insertions(+), 44 deletions(-) diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 6e343736c1..e73b1050c7 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -984,6 +984,36 @@ fn datatype_to_avro( null_order: Nullability, ) -> Result<(Value, JsonMap<String, Value>), ArrowError> { let mut extras = JsonMap::new(); + let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> { + if *scale < 0 { + return Err(ArrowError::SchemaError(format!( + "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0" + ))); + } + if (*scale as usize) > (*precision as usize) { + return Err(ArrowError::SchemaError(format!( + "Invalid Avro decimal for field '{field_name}': scale ({scale}) \ + must be <= precision ({precision})" + ))); + } + + let mut meta = JsonMap::from_iter([ + ("logicalType".into(), json!("decimal")), + ("precision".into(), json!(*precision)), + ("scale".into(), json!(*scale)), + ]); + if let Some(size) = metadata + .get("size") + .and_then(|val| val.parse::<usize>().ok()) + { + meta.insert("type".into(), json!("fixed")); + meta.insert("size".into(), json!(size)); + meta.insert("name".into(), json!(name_gen.make_unique(field_name))); + } else { + meta.insert("type".into(), json!("bytes")); + } + Ok(Value::Object(meta)) + }; let val = match dt { DataType::Null => Value::String("null".into()), DataType::Boolean => Value::String("boolean".into()), @@ -1013,24 +1043,12 @@ fn datatype_to_avro( }) } } + #[cfg(feature = "small_decimals")] + DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => { + handle_decimal(precision, scale)? 
+ } DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { - // Prefer fixed if original size info present - let mut meta = JsonMap::from_iter([ - ("logicalType".into(), json!("decimal")), - ("precision".into(), json!(*precision)), - ("scale".into(), json!(*scale)), - ]); - if let Some(size) = metadata - .get("size") - .and_then(|val| val.parse::<usize>().ok()) - { - meta.insert("type".into(), json!("fixed")); - meta.insert("size".into(), json!(size)); - meta.insert("name".into(), json!(name_gen.make_unique(field_name))); - } else { - meta.insert("type".into(), json!("bytes")); - } - Value::Object(meta) + handle_decimal(precision, scale)? } DataType::Date32 => json!({ "type": "int", "logicalType": "date" }), DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }), diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index ccf80fd8d1..d80a3e739a 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -21,15 +21,21 @@ use crate::codec::{AvroDataType, AvroField, Codec}; use crate::schema::Nullability; use arrow_array::cast::AsArray; use arrow_array::types::{ - ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType, + ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType, + IntervalMonthDayNanoType, IntervalYearMonthType, TimestampMicrosecondType, }; use arrow_array::{ - Array, GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, - OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray, + Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray, + GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray, + OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, }; +#[cfg(feature = "small_decimals")] +use arrow_array::{Decimal32Array, Decimal64Array}; use 
arrow_buffer::NullBuffer; -use arrow_schema::{ArrowError, DataType, Field, Schema as ArrowSchema, TimeUnit}; +use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit}; use std::io::Write; +use std::sync::Arc; +use uuid::Uuid; /// Encode a single Avro-`long` using ZigZag + variable length, buffered. /// @@ -69,6 +75,110 @@ fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e)) } +/// Minimal two's-complement big-endian representation helper for Avro decimal (bytes). +/// +/// For positive numbers, trim leading 0x00 until an essential byte is reached. +/// For negative numbers, trim leading 0xFF until an essential byte is reached. +/// The resulting slice still encodes the same signed value. +/// +/// See Avro spec: decimal over `bytes` uses two's-complement big-endian +/// representation of the unscaled integer value. 1.11.1 specification. +#[inline] +fn minimal_twos_complement(be: &[u8]) -> &[u8] { + if be.is_empty() { + return be; + } + let sign_byte = if (be[0] & 0x80) != 0 { 0xFF } else { 0x00 }; + let mut k = 0usize; + while k < be.len() && be[k] == sign_byte { + k += 1; + } + if k == 0 { + return be; + } + if k == be.len() { + return &be[be.len() - 1..]; + } + let drop = if ((be[k] ^ sign_byte) & 0x80) == 0 { + k + } else { + k - 1 + }; + &be[drop..] +} + +/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes. +/// +/// +/// - If shorter than `n`, the slice is sign-extended by left-padding with the +/// sign byte (`0x00` for positive, `0xFF` for negative). +/// - If longer than `n`, the slice is truncated from the left. An overflow error +/// is returned if any of the truncated bytes are not redundant sign bytes, +/// or if the resulting value's sign bit would differ from the original. +/// - If the slice is already `n` bytes long, it is copied. 
+/// +/// Used for encoding Avro decimal values into `fixed(N)` fields. +#[inline] +fn write_sign_extended<W: Write + ?Sized>( + out: &mut W, + src_be: &[u8], + n: usize, +) -> Result<(), ArrowError> { + let len = src_be.len(); + if len == n { + return out + .write_all(src_be) + .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e)); + } + let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 { + 0xFF + } else { + 0x00 + }; + if len > n { + let extra = len - n; + if n == 0 && src_be.iter().all(|&b| b == sign_byte) { + return Ok(()); + } + // All truncated bytes must equal the sign byte, and the MSB of the first + // retained byte must match the sign (otherwise overflow). + if src_be[..extra].iter().any(|&b| b != sign_byte) + || ((src_be[extra] ^ sign_byte) & 0x80) != 0 + { + return Err(ArrowError::InvalidArgumentError(format!( + "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow", + ))); + } + return out + .write_all(&src_be[extra..]) + .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e)); + } + // len < n: prepend sign bytes (sign extension) then the payload + let pad_len = n - len; + // Fixed-size stack pads to avoid heap allocation on the hot path + const ZPAD: [u8; 64] = [0x00; 64]; + const FPAD: [u8; 64] = [0xFF; 64]; + let pad = if sign_byte == 0x00 { + &ZPAD[..] + } else { + &FPAD[..] + }; + // Emit padding in 64‑byte chunks (minimizes write calls without allocating), + // then write the original bytes. + let mut rem = pad_len; + while rem >= pad.len() { + out.write_all(pad) + .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?; + rem -= pad.len(); + } + if rem > 0 { + out.write_all(&pad[..rem]) + .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?; + } + out.write_all(src_be) + .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e)) +} + /// Write the union branch index for an optional field. 
/// /// Branch index is 0-based per Avro unions: @@ -112,6 +222,64 @@ impl<'a> FieldEncoder<'a> { nullability: Option<Nullability>, ) -> Result<Self, ArrowError> { let encoder = match plan { + FieldPlan::Scalar => match array.data_type() { + DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())), + DataType::Utf8 => { + Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>())) + } + DataType::LargeUtf8 => { + Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>())) + } + DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())), + DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())), + DataType::Float32 => { + Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>())) + } + DataType::Float64 => { + Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>())) + } + DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())), + DataType::LargeBinary => { + Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>())) + } + DataType::FixedSizeBinary(len) => { + let arr = array + .as_any() + .downcast_ref::<FixedSizeBinaryArray>() + .ok_or_else(|| { + ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()) + })?; + Encoder::Fixed(FixedEncoder(arr)) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder( + array.as_primitive::<TimestampMicrosecondType>(), + )), + DataType::Interval(unit) => match unit { + IntervalUnit::MonthDayNano => { + Encoder::IntervalMonthDayNano(DurationEncoder( + array.as_primitive::<IntervalMonthDayNanoType>(), + )) + } + IntervalUnit::YearMonth => { + Encoder::IntervalYearMonth(DurationEncoder( + array.as_primitive::<IntervalYearMonthType>(), + )) + } + IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder( + array.as_primitive::<IntervalDayTimeType>(), + )), + } + DataType::Duration(_) => { + return Err(ArrowError::NotYetImplemented( + "Avro writer: Arrow Duration(TimeUnit) 
has no standard Avro mapping; cast to Interval(MonthDayNano) to use Avro 'duration'".into(), + )); + } + other => { + return Err(ArrowError::NotYetImplemented(format!( + "Avro scalar type not yet supported: {other:?}" + ))); + } + }, FieldPlan::Struct { encoders } => { let arr = array .as_any() @@ -151,35 +319,50 @@ impl<'a> FieldEncoder<'a> { ))) } }, - FieldPlan::Scalar => match array.data_type() { - DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())), - DataType::Utf8 => { - Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>())) - } - DataType::LargeUtf8 => { - Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>())) + FieldPlan::Decimal {size} => match array.data_type() { + #[cfg(feature = "small_decimals")] + DataType::Decimal32(_,_) => { + let arr = array + .as_any() + .downcast_ref::<Decimal32Array>() + .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?; + Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size)) } - DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())), - DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())), - DataType::Float32 => { - Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>())) + #[cfg(feature = "small_decimals")] + DataType::Decimal64(_,_) => { + let arr = array + .as_any() + .downcast_ref::<Decimal64Array>() + .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?; + Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size)) } - DataType::Float64 => { - Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>())) + DataType::Decimal128(_,_) => { + let arr = array + .as_any() + .downcast_ref::<Decimal128Array>() + .ok_or_else(|| ArrowError::SchemaError("Expected Decimal128Array".into()))?; + Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size)) } - DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())), - DataType::LargeBinary => { - Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>())) + DataType::Decimal256(_,_) => { + let arr = array + .as_any() + .downcast_ref::<Decimal256Array>() + .ok_or_else(|| ArrowError::SchemaError("Expected Decimal256Array".into()))?; + Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size)) } - DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder( - array.as_primitive::<TimestampMicrosecondType>(), - )), other => { - return Err(ArrowError::NotYetImplemented(format!( - "Avro scalar type not yet supported: {other:?}" - ))); + return Err(ArrowError::SchemaError(format!( + "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}" + ))) } }, + FieldPlan::Uuid => { + let arr = array + .as_any() + .downcast_ref::<FixedSizeBinaryArray>() + .ok_or_else(|| ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()))?; + Encoder::Uuid(UuidEncoder(arr)) + } other => { return Err(ArrowError::NotYetImplemented(format!( "Avro writer: {other:?} not yet supported", @@ -256,6 +439,10 @@ enum FieldPlan { items_nullability: Option<Nullability>, item_plan: Box<FieldPlan>, }, + /// Avro decimal logical type (bytes or fixed). 
`size=None` => bytes(decimal), `Some(n)` => fixed(n) + Decimal { size: Option<usize> }, + /// Avro UUID logical type (fixed) + Uuid, } #[derive(Debug, Clone)] @@ -366,8 +553,44 @@ fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option< fields.iter().position(|f| f.name() == name) } +fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> { + // Prefer common Arrow field names; fall back to second child if exactly two + find_struct_child_index(fields, "value") + .or_else(|| find_struct_child_index(fields, "values")) + .or_else(|| if fields.len() == 2 { Some(1) } else { None }) +} + impl FieldPlan { fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> { + if let DataType::FixedSizeBinary(len) = arrow_field.data_type() { + // Extension-based detection (only when the feature is enabled) + let ext_is_uuid = { + #[cfg(feature = "canonical_extension_types")] + { + matches!( + arrow_field.extension_type_name(), + Some("arrow.uuid") | Some("uuid") + ) + } + #[cfg(not(feature = "canonical_extension_types"))] + { + false + } + }; + let md_is_uuid = arrow_field + .metadata() + .get("logicalType") + .map(|s| s.as_str()) + == Some("uuid"); + if ext_is_uuid || md_is_uuid { + if *len != 16 { + return Err(ArrowError::InvalidArgumentError( + "logicalType=uuid requires FixedSizeBinary(16)".into(), + )); + } + return Ok(FieldPlan::Uuid); + } + } match avro_dt.codec() { Codec::Struct(avro_fields) => { let fields = match arrow_field.data_type() { @@ -408,6 +631,40 @@ impl FieldPlan { "Avro array maps to Arrow List/LargeList, found: {other:?}" ))), }, + // decimal site (bytes or fixed(N)) with precision/scale validation + Codec::Decimal(precision, scale_opt, fixed_size_opt) => { + let (ap, as_) = match arrow_field.data_type() { + #[cfg(feature = "small_decimals")] + DataType::Decimal32(p, s) => (*p as usize, *s as i32), + #[cfg(feature = "small_decimals")] + DataType::Decimal64(p, s) => (*p as usize, *s as i32), 
+ DataType::Decimal128(p, s) => (*p as usize, *s as i32), + DataType::Decimal256(p, s) => (*p as usize, *s as i32), + other => { + return Err(ArrowError::SchemaError(format!( + "Avro decimal requires Arrow decimal, got {other:?} for field '{}'", + arrow_field.name() + ))) + } + }; + let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent + if ap != *precision || as_ != sc { + return Err(ArrowError::SchemaError(format!( + "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})", + arrow_field.name() + ))); + } + Ok(FieldPlan::Decimal { + size: *fixed_size_opt, + }) + } + Codec::Interval => match arrow_field.data_type() { + DataType::Interval(IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime + ) => Ok(FieldPlan::Scalar), + other => Err(ArrowError::SchemaError(format!( + "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}" + ))), + } _ => Ok(FieldPlan::Scalar), } } @@ -427,6 +684,22 @@ enum Encoder<'a> { List(Box<ListEncoder32<'a>>), LargeList(Box<ListEncoder64<'a>>), Struct(Box<StructEncoder<'a>>), + /// Avro `fixed` encoder (raw bytes, no length) + Fixed(FixedEncoder<'a>), + /// Avro `uuid` logical type encoder (string with RFC‑4122 hyphenated text) + Uuid(UuidEncoder<'a>), + /// Avro `duration` logical type (Arrow Interval(MonthDayNano)) encoder + IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>), + /// Avro `duration` logical type (Arrow Interval(YearMonth)) encoder + IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>), + /// Avro `duration` logical type (Arrow Interval(DayTime)) encoder + IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>), + #[cfg(feature = "small_decimals")] + Decimal32(Decimal32Encoder<'a>), + #[cfg(feature = "small_decimals")] + Decimal64(Decimal64Encoder<'a>), + Decimal128(Decimal128Encoder<'a>), + Decimal256(Decimal256Encoder<'a>), } impl<'a> Encoder<'a> { @@ -446,6 +719,17 @@ impl<'a> 
Encoder<'a> { Encoder::List(e) => e.encode(out, idx), Encoder::LargeList(e) => e.encode(out, idx), Encoder::Struct(e) => e.encode(out, idx), + Encoder::Fixed(e) => (e).encode(out, idx), + Encoder::Uuid(e) => (e).encode(out, idx), + Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx), + Encoder::IntervalYearMonth(e) => (e).encode(out, idx), + Encoder::IntervalDayTime(e) => (e).encode(out, idx), + #[cfg(feature = "small_decimals")] + Encoder::Decimal32(e) => (e).encode(out, idx), + #[cfg(feature = "small_decimals")] + Encoder::Decimal64(e) => (e).encode(out, idx), + Encoder::Decimal128(e) => (e).encode(out, idx), + Encoder::Decimal256(e) => (e).encode(out, idx), } } } @@ -511,7 +795,6 @@ impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> { type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>; type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>; - struct StructEncoder<'a> { encoders: Vec<FieldEncoder<'a>>, } @@ -653,6 +936,193 @@ fn prepare_value_site_encoder<'a>( FieldEncoder::make_encoder(values_array, value_field, plan, nullability) } +/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`. +/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix. +struct FixedEncoder<'a>(&'a FixedSizeBinaryArray); +impl FixedEncoder<'_> { + fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let v = self.0.value(idx); // &[u8] of fixed width + out.write_all(v) + .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e)) + } +} + +/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string (UUID). +/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form. 
+struct UuidEncoder<'a>(&'a FixedSizeBinaryArray); +impl UuidEncoder<'_> { + fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH]; + buf[0] = 0x48; + let v = self.0.value(idx); + let u = Uuid::from_slice(v) + .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?; + let _ = u.hyphenated().encode_lower(&mut buf[1..]); + out.write_all(&buf) + .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e)) + } +} + +#[derive(Copy, Clone)] +struct DurationParts { + months: u32, + days: u32, + millis: u32, +} +/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`. +trait IntervalToDurationParts: ArrowPrimitiveType { + fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>; +} +impl IntervalToDurationParts for IntervalMonthDayNanoType { + fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> { + let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native); + if months < 0 || days < 0 || nanos < 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' cannot encode negative months/days/nanoseconds".into(), + )); + } + if nanos % 1_000_000 != 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000" + .into(), + )); + } + let millis = nanos / 1_000_000; + if millis > u32::MAX as i64 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' milliseconds exceed u32::MAX".into(), + )); + } + Ok(DurationParts { + months: months as u32, + days: days as u32, + millis: millis as u32, + }) + } +} +impl IntervalToDurationParts for IntervalYearMonthType { + fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> { + if native < 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' cannot encode negative months".into(), + )); 
+ } + Ok(DurationParts { + months: native as u32, + days: 0, + millis: 0, + }) + } +} +impl IntervalToDurationParts for IntervalDayTimeType { + fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> { + let (days, millis) = IntervalDayTimeType::to_parts(native); + if days < 0 || millis < 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' cannot encode negative days or milliseconds".into(), + )); + } + Ok(DurationParts { + months: 0, + days: days as u32, + millis: millis as u32, + }) + } +} +/// Single generic encoder used for all three interval units. +/// Writes Avro `fixed(12)` as three little-endian u32 values in one call. +struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>); +impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> { + #[inline(always)] + fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let parts = P::duration_parts(self.0.value(idx))?; + let months = parts.months.to_le_bytes(); + let days = parts.days.to_le_bytes(); + let ms = parts.millis.to_le_bytes(); + // SAFETY + // - Endianness & layout: Avro's `duration` logical type is encoded as fixed(12) + // with three *little-endian* unsigned 32-bit integers in order: (months, days, millis). + // We explicitly materialize exactly those 12 bytes. + // - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]` by contract, + // therefore, the constant indices 0..=3 used below are *always* in-bounds. + // Rust will panic on out-of-bounds indexing, but there is no such path here; + // the compiler can also elide the bound checks for constant, provably in-range + // indices. [std docs; Rust Performance Book on bounds-check elimination] + // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no + // aliasing and no uninitialized memory. There is no `unsafe`. 
+ // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated and mapped + // into `ArrowError`, so I/O errors are reported, not panicked. + // Consequently, constructing `buf` with the constant indices below is safe and + // panic-free under these validated preconditions. + let buf = [ + months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0], + ms[1], ms[2], ms[3], + ]; + out.write_all(&buf) + .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e)) + } +} + +/// Minimal trait to obtain a big-endian fixed-size byte array for a decimal's +/// unscaled integer value at `idx`. +trait DecimalBeBytes<const N: usize> { + fn value_be_bytes(&self, idx: usize) -> [u8; N]; +} +#[cfg(feature = "small_decimals")] +impl DecimalBeBytes<4> for Decimal32Array { + fn value_be_bytes(&self, idx: usize) -> [u8; 4] { + self.value(idx).to_be_bytes() + } +} +#[cfg(feature = "small_decimals")] +impl DecimalBeBytes<8> for Decimal64Array { + fn value_be_bytes(&self, idx: usize) -> [u8; 8] { + self.value(idx).to_be_bytes() + } +} +impl DecimalBeBytes<16> for Decimal128Array { + fn value_be_bytes(&self, idx: usize) -> [u8; 16] { + self.value(idx).to_be_bytes() + } +} +impl DecimalBeBytes<32> for Decimal256Array { + fn value_be_bytes(&self, idx: usize) -> [u8; 32] { + // Arrow i256 → [u8; 32] big-endian + self.value(idx).to_be_bytes() + } +} + +/// Generic Avro decimal encoder over Arrow decimal arrays. +/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal +/// two's-complement representation with a length prefix. +/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates) +/// to exactly `n` bytes and writes them directly. 
+struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> { + arr: &'a A, + fixed_size: Option<usize>, +} + +impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> { + fn new(arr: &'a A, fixed_size: Option<usize>) -> Self { + Self { arr, fixed_size } + } + + fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let be = self.arr.value_be_bytes(idx); + match self.fixed_size { + Some(n) => write_sign_extended(out, &be, n), + None => write_len_prefixed(out, minimal_twos_complement(&be)), + } + } +} + +#[cfg(feature = "small_decimals")] +type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>; +#[cfg(feature = "small_decimals")] +type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>; +type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>; +type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>; + #[cfg(test)] mod tests { use super::*; @@ -688,6 +1158,15 @@ mod tests { out } + fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] { + let m = months.to_le_bytes(); + let d = days.to_le_bytes(); + let ms = millis.to_le_bytes(); + [ + m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3], + ] + } + fn encode_all( array: &dyn Array, plan: &FieldPlan, @@ -763,4 +1242,452 @@ mod tests { let got = encode_all(&arr, &FieldPlan::Scalar, None); assert_bytes_eq(&got, &expected); } + + #[test] + fn list_encoder_int32() { + // Build ListArray [[1,2], [], [3]] + let values = Int32Array::from(vec![1, 2, 3]); + let offsets = vec![0, 2, 2, 3]; + let list = ListArray::new( + Field::new("item", DataType::Int32, true).into(), + arrow_buffer::OffsetBuffer::new(offsets.into()), + Arc::new(values) as ArrayRef, + None, + ); + // Avro array encoding per row + let mut expected = Vec::new(); + // row 0: block len 2, items 1,2 then 0 + expected.extend(avro_long_bytes(2)); + expected.extend(avro_long_bytes(1)); + expected.extend(avro_long_bytes(2)); + 
expected.extend(avro_long_bytes(0)); + // row 1: empty + expected.extend(avro_long_bytes(0)); + // row 2: one item 3 + expected.extend(avro_long_bytes(1)); + expected.extend(avro_long_bytes(3)); + expected.extend(avro_long_bytes(0)); + + let plan = FieldPlan::List { + items_nullability: None, + item_plan: Box::new(FieldPlan::Scalar), + }; + let got = encode_all(&list, &plan, None); + assert_bytes_eq(&got, &expected); + } + + #[test] + fn struct_encoder_two_fields() { + // Struct { a: Int32, b: Utf8 } + let a = Int32Array::from(vec![1, 2]); + let b = StringArray::from(vec!["x", "y"]); + let fields = Fields::from(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ]); + let struct_arr = StructArray::new( + fields.clone(), + vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef], + None, + ); + let plan = FieldPlan::Struct { + encoders: vec![ + FieldBinding { + arrow_index: 0, + nullability: None, + plan: FieldPlan::Scalar, + }, + FieldBinding { + arrow_index: 1, + nullability: None, + plan: FieldPlan::Scalar, + }, + ], + }; + let got = encode_all(&struct_arr, &plan, None); + // Expected: rows concatenated: a then b + let mut expected = Vec::new(); + expected.extend(avro_long_bytes(1)); // a=1 + expected.extend(avro_len_prefixed_bytes(b"x")); // b="x" + expected.extend(avro_long_bytes(2)); // a=2 + expected.extend(avro_len_prefixed_bytes(b"y")); // b="y" + assert_bytes_eq(&got, &expected); + } + + #[test] + fn decimal_bytes_and_fixed() { + // Use Decimal128 with small positives and negatives + let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128]) + .with_precision_and_scale(20, 0) + .unwrap(); + // bytes(decimal): minimal two's complement length-prefixed + let plan_bytes = FieldPlan::Decimal { size: None }; + let got_bytes = encode_all(&dec, &plan_bytes, None); + // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00 + let mut expected_bytes = Vec::new(); + expected_bytes.extend(avro_len_prefixed_bytes(&[0x01])); + 
expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF])); + expected_bytes.extend(avro_len_prefixed_bytes(&[0x00])); + assert_bytes_eq(&got_bytes, &expected_bytes); + + let plan_fixed = FieldPlan::Decimal { size: Some(16) }; + let got_fixed = encode_all(&dec, &plan_fixed, None); + let mut expected_fixed = Vec::new(); + expected_fixed.extend_from_slice(&1i128.to_be_bytes()); + expected_fixed.extend_from_slice(&(-1i128).to_be_bytes()); + expected_fixed.extend_from_slice(&0i128.to_be_bytes()); + assert_bytes_eq(&got_fixed, &expected_fixed); + } + + #[test] + fn decimal_bytes_256() { + use arrow_buffer::i256; + // Use Decimal256 with small positives and negatives + let dec = Decimal256Array::from(vec![ + i256::from_i128(1), + i256::from_i128(-1), + i256::from_i128(0), + ]) + .with_precision_and_scale(76, 0) + .unwrap(); + // bytes(decimal): minimal two's complement length-prefixed + let plan_bytes = FieldPlan::Decimal { size: None }; + let got_bytes = encode_all(&dec, &plan_bytes, None); + // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00 + let mut expected_bytes = Vec::new(); + expected_bytes.extend(avro_len_prefixed_bytes(&[0x01])); + expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF])); + expected_bytes.extend(avro_len_prefixed_bytes(&[0x00])); + assert_bytes_eq(&got_bytes, &expected_bytes); + + // fixed(32): 32-byte big-endian two's complement + let plan_fixed = FieldPlan::Decimal { size: Some(32) }; + let got_fixed = encode_all(&dec, &plan_fixed, None); + let mut expected_fixed = Vec::new(); + expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes()); + expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes()); + expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes()); + assert_bytes_eq(&got_fixed, &expected_fixed); + } + + #[cfg(feature = "small_decimals")] + #[test] + fn decimal_bytes_and_fixed_32() { + // Use Decimal32 with small positives and negatives + let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32]) + .with_precision_and_scale(9, 
0)
+            .unwrap();
+        // bytes(decimal)
+        let plan_bytes = FieldPlan::Decimal { size: None };
+        let got_bytes = encode_all(&dec, &plan_bytes, None);
+        let mut expected_bytes = Vec::new();
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
+        assert_bytes_eq(&got_bytes, &expected_bytes);
+        // fixed(4)
+        let plan_fixed = FieldPlan::Decimal { size: Some(4) };
+        let got_fixed = encode_all(&dec, &plan_fixed, None);
+        let mut expected_fixed = Vec::new();
+        expected_fixed.extend_from_slice(&1i32.to_be_bytes());
+        expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
+        expected_fixed.extend_from_slice(&0i32.to_be_bytes());
+        assert_bytes_eq(&got_fixed, &expected_fixed);
+    }
+
+    /// Decimal64 values 1, -1 and 0 are expected as length-prefixed single-byte
+    /// payloads under `FieldPlan::Decimal { size: None }` and as 8-byte
+    /// big-endian values under `FieldPlan::Decimal { size: Some(8) }`.
+    #[cfg(feature = "small_decimals")]
+    #[test]
+    fn decimal_bytes_and_fixed_64() {
+        // Use Decimal64 with small positives and negatives
+        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
+            .with_precision_and_scale(18, 0)
+            .unwrap();
+        // bytes(decimal)
+        let plan_bytes = FieldPlan::Decimal { size: None };
+        let got_bytes = encode_all(&dec, &plan_bytes, None);
+        let mut expected_bytes = Vec::new();
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
+        assert_bytes_eq(&got_bytes, &expected_bytes);
+        // fixed(8)
+        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
+        let got_fixed = encode_all(&dec, &plan_fixed, None);
+        let mut expected_fixed = Vec::new();
+        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
+        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
+        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
+        assert_bytes_eq(&got_fixed, &expected_fixed);
+    }
+
+    /// Float32/Float64 arrays are expected to encode as the little-endian bytes
+    /// of each value's IEEE-754 bit pattern (compared bitwise, so NaN is covered).
+    #[test]
+    fn float32_and_float64_encoders() {
+        let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); // includes a quiet NaN bit pattern
+        let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
+        // f32 expected
+        let mut expected32 = Vec::new();
+        for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
+            expected32.extend_from_slice(&v.to_bits().to_le_bytes());
+        }
+        let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got32, &expected32);
+        // f64 expected
+        let mut expected64 = Vec::new();
+        for v in [0.0f64, -2.25f64] {
+            expected64.extend_from_slice(&v.to_bits().to_le_bytes());
+        }
+        let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got64, &expected64);
+    }
+
+    /// Int64 values (including negatives and `i64::MIN + 1`) must encode to the
+    /// same bytes that `avro_long_bytes` produces for each value.
+    #[test]
+    fn long_encoder_int64() {
+        let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
+        let mut expected = Vec::new();
+        for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
+            expected.extend(avro_long_bytes(v));
+        }
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// A FixedSizeBinary array under `FieldPlan::Scalar` is expected to be the
+    /// plain concatenation of its values, with no length prefix.
+    #[test]
+    fn fixed_encoder_plain() {
+        // Two values of width 4
+        let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
+        let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
+        let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+        let mut expected = Vec::new();
+        expected.extend_from_slice(&data[0]);
+        expected.extend_from_slice(&data[1]);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// A 16-byte UUID under `FieldPlan::Uuid` is expected as a length-prefixed
+    /// hyphenated string.
+    #[test]
+    fn uuid_encoder_test() {
+        // Happy path
+        let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
+        let bytes = *u.as_bytes();
+        let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
+        // Expected: length prefix for 36 bytes (Avro zig-zag long => 0x48)
+        // followed by hyphenated lowercase text
+        let mut expected = Vec::new();
+        expected.push(0x48);
+        expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
+        let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// Encoding a 10-byte-wide FixedSizeBinary value under `FieldPlan::Uuid` must
+    /// fail with `InvalidArgumentError` mentioning "Invalid UUID bytes".
+    #[test]
+    fn uuid_encoder_error() {
+        // Invalid UUID bytes: wrong length
+        let arr =
+            FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
+                .unwrap();
+        let plan = FieldPlan::Uuid;
+
+        let field = Field::new("f", arr.data_type().clone(), true);
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &plan, None).unwrap();
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            ArrowError::InvalidArgumentError(msg) => {
+                assert!(msg.contains("Invalid UUID bytes"))
+            }
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    /// A LargeList of Int32 is checked against a hand-built sequence of Avro
+    /// longs: block count, items, and a terminating zero per list.
+    #[test]
+    fn list64_encoder_int32() {
+        // LargeList [[1,2,3], []]
+        let values = Int32Array::from(vec![1, 2, 3]);
+        let offsets: Vec<i64> = vec![0, 3, 3];
+        let list = LargeListArray::new(
+            Field::new("item", DataType::Int32, true).into(),
+            arrow_buffer::OffsetBuffer::new(offsets.into()),
+            Arc::new(values) as ArrayRef,
+            None,
+        );
+        let plan = FieldPlan::List {
+            items_nullability: None,
+            item_plan: Box::new(FieldPlan::Scalar),
+        };
+        let got = encode_all(&list, &plan, None);
+        // First list: block count 3, items 1, 2, 3, then the 0 terminator.
+        // Second (empty) list: only the 0 terminator.
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(3));
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_long_bytes(2));
+        expected.extend(avro_long_bytes(3));
+        expected.extend(avro_long_bytes(0));
+        expected.extend(avro_long_bytes(0));
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// Int32 values are expected to encode to the same bytes as
+    /// `avro_long_bytes` of the value widened to `i64`.
+    #[test]
+    fn int_encoder_test() {
+        let ints = Int32Array::from(vec![0, -1, 2]);
+        let mut expected_i = Vec::new();
+        for v in [0i32, -1, 2] {
+            expected_i.extend(avro_long_bytes(v as i64));
+        }
+        let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got_i, &expected_i);
+    }
+
+    /// Booleans are expected as one byte each: 1 for `true`, 0 for `false`.
+    #[test]
+    fn boolean_encoder_test() {
+        let bools = BooleanArray::from(vec![true, false]);
+        let mut expected_b = Vec::new();
+        expected_b.extend_from_slice(&[1]);
+        expected_b.extend_from_slice(&[0]);
+        let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got_b, &expected_b);
+    }
+
+    /// Non-negative YearMonth intervals are expected as
+    /// `duration_fixed12(months, 0, 0)`.
+    #[test]
+    fn duration_encoder_year_month_happy_path() {
+        let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
+        let mut expected = Vec::new();
+        for m in [0u32, 1u32, 25u32] {
+            expected.extend_from_slice(&duration_fixed12(m, 0, 0));
+        }
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// A negative YearMonth interval must fail with `InvalidArgumentError`
+    /// mentioning "cannot encode negative months".
+    #[test]
+    fn duration_encoder_year_month_rejects_negative() {
+        let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
+        let field = Field::new("f", DataType::Interval(IntervalUnit::YearMonth), true);
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            ArrowError::InvalidArgumentError(msg) => {
+                assert!(msg.contains("cannot encode negative months"))
+            }
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    /// DayTime intervals are expected as `duration_fixed12(0, days, millis)`.
+    #[test]
+    fn duration_encoder_day_time_happy_path() {
+        let v0 = IntervalDayTimeType::make_value(2, 500); // days=2, millis=500
+        let v1 = IntervalDayTimeType::make_value(0, 0);
+        let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
+        let mut expected = Vec::new();
+        expected.extend_from_slice(&duration_fixed12(0, 2, 500));
+        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// A DayTime interval with negative days must fail with
+    /// `InvalidArgumentError` mentioning "cannot encode negative days".
+    #[test]
+    fn duration_encoder_day_time_rejects_negative() {
+        let bad = IntervalDayTimeType::make_value(-1, 0);
+        let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
+        let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), true);
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            ArrowError::InvalidArgumentError(msg) => {
+                assert!(msg.contains("cannot encode negative days"))
+            }
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    /// MonthDayNano intervals whose nanoseconds are a whole number of
+    /// milliseconds are expected as `duration_fixed12(months, days, millis)`.
+    #[test]
+    fn duration_encoder_month_day_nano_happy_path() {
+        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); // -> millis = 3
+        let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
+        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
+        let mut expected = Vec::new();
+        expected.extend_from_slice(&duration_fixed12(1, 2, 3));
+        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
+        let got = encode_all(&arr, &FieldPlan::Scalar, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    /// A MonthDayNano interval of 1 ns (not a whole millisecond) must fail with
+    /// `InvalidArgumentError`.
+    #[test]
+    fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
+        let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
+        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
+        let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            ArrowError::InvalidArgumentError(msg) => {
+                assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
+            }
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    /// `minimal_twos_complement` is expected to strip redundant leading sign
+    /// bytes, leaving the last byte for each of the three inputs below.
+    #[test]
+    fn minimal_twos_complement_test() {
+        let pos = [0x00, 0x00, 0x01];
+        assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
+        let neg = [0xFF, 0xFF, 0x80]; // negative minimal is 0x80
+        assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
+        let zero = [0x00, 0x00, 0x00];
+        assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
+    }
+
+    /// `write_sign_extended` is expected to pad with 0x00 / 0xFF when widening,
+    /// to drop pure sign bytes when narrowing, and to return
+    /// `InvalidArgumentError` when narrowing would lose significant bytes.
+    #[test]
+    fn write_sign_extend_test() {
+        let mut out = Vec::new();
+        write_sign_extended(&mut out, &[0x01], 4).unwrap();
+        assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
+        out.clear();
+        write_sign_extended(&mut out, &[0xFF], 4).unwrap();
+        assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
+        out.clear();
+        // truncation success (sign bytes only removed)
+        write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
+        assert_eq!(out, vec![0xFF, 0x80]);
+        out.clear();
+        // truncation overflow
+        let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
+        match err {
+            ArrowError::InvalidArgumentError(_) => {}
+            // Report the actual error so a failure is diagnosable.
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    /// A MonthDayNano interval whose nanoseconds equal `u32::MAX + 1` whole
+    /// milliseconds must fail with `InvalidArgumentError` mentioning
+    /// "exceed u32::MAX".
+    #[test]
+    fn duration_month_day_nano_overflow_millis() {
+        // nanos leading to millis > u32::MAX
+        let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
+        let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
+        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
+        let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
+        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+        let mut out = Vec::new();
+        let err = enc.encode(&mut out, 0).unwrap_err();
+        match err {
+            ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("exceed u32::MAX")),
+            // Report the actual error so a failure is diagnosable.
+            other => panic!("expected InvalidArgumentError, got {other:?}"),
+        }
+    }
+
+    /// `FieldPlan::build` must return a `SchemaError` mentioning
+    /// "Decimal precision/scale mismatch" when the Avro decimal is (10, 2) but
+    /// the Arrow field is `Decimal128(12, 2)`.
+    #[test]
+    fn fieldplan_decimal_precision_scale_mismatch_errors() {
+        // Avro expects (10,2), Arrow has (12,2)
+        use crate::codec::Codec;
+        use std::collections::HashMap;
+        let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
+        let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
+        let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
+        match err {
+            ArrowError::SchemaError(msg) => {
+                assert!(msg.contains("Decimal precision/scale mismatch"))
+            }
+            // Report the actual error so a failure is diagnosable.
+            other => panic!("expected SchemaError, got {other:?}"),
+        }
+    }
+}