This is an automated email from the ASF dual-hosted git repository.

mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new ca07b064db Add projection with default values support to 
`RecordDecoder` (#8293)
ca07b064db is described below

commit ca07b064db5b242ae6f84c232f7b36a247cd930e
Author: Connor Sanders <con...@elastiflow.com>
AuthorDate: Tue Sep 16 03:00:00 2025 -0500

    Add projection with default values support to `RecordDecoder` (#8293)
    
    # Which issue does this PR close?
    
    This work continues arrow-avro schema resolution support and aligns
    behavior with the Avro spec.
    
    - **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
    the reader/decoder, including schema resolution and type promotion.
    - **Follow-ups/Context**: #8292 (Add array/map/fixed schema resolution
    and default value support to arrow-avro codec), #8124 (schema resolution
    & type promotion for the decoder), #8223 (enum mapping for schema
    resolution). These previous efforts established the foundations that
    this PR extends to default values and additional resolvable types.
    
    # Rationale for this change
    
    Avro’s specification requires readers to materialize default values when
    a field exists in the **reader** schema but not in the **writer**
    schema, and to validate defaults (e.g., union defaults must match the
    first branch; bytes/fixed defaults must be JSON strings; enums may
    specify a default symbol for unknown writer symbols). Implementing this
    behavior makes `arrow-avro` more standards‑compliant and improves
    interoperability with evolving schemas.
    
    # What changes are included in this PR?
    
    **High‑level summary**
    
    * **Refactor `RecordDecoder`** around a simpler **`Projector`**‑style
    abstraction that consumes `ResolvedRecord` to: (a) skip writer‑only
    fields, and (b) materialize reader‑only defaulted fields, reducing
    branching in the hot path. (See the `arrow-avro/src/reader/record.rs`
    changes in the diff below.)
    
    **Touched files (2):**
    
    * `arrow-avro/src/reader/record.rs` - refactor decoder to use
    precomputed mappings and defaults.
    * `arrow-avro/src/reader/mod.rs` - add comprehensive tests for defaults
    and error cases (see below).
    
    # Are these changes tested?
    
    Yes, new integration tests cover both the **happy path** and
    **validation errors**:
    * `test_schema_resolution_defaults_all_supported_types`: verifies that
    defaults for boolean, int, long, float, double, bytes, string, date,
    time, timestamp, decimal, fixed, enum, duration, uuid, array, map,
    nested record, and union fields are materialized correctly for all rows.
    * `test_schema_resolution_default_enum_invalid_symbol_errors`: invalid
    enum default symbol is rejected.
    * `test_schema_resolution_default_fixed_size_mismatch_errors`: a fixed
    default whose length does not match the declared size is rejected.
    
    These tests assert the Avro‑spec behavior (e.g., union defaults must
    match the first branch; bytes/fixed defaults use JSON strings).
    
    # Are there any user-facing changes?
    
    N/A
---
 arrow-avro/src/reader/mod.rs    |  241 +++++++++
 arrow-avro/src/reader/record.rs | 1069 +++++++++++++++++++++++++++++++++------
 2 files changed, 1147 insertions(+), 163 deletions(-)

diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 217366b633..bf72fc92c6 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -2085,6 +2085,245 @@ mod test {
         assert!(batch.column(0).as_any().is::<StringViewArray>());
     }
 
+    fn make_reader_schema_with_default_fields(
+        path: &str,
+        default_fields: Vec<Value>,
+    ) -> AvroSchema {
+        let mut root = load_writer_schema_json(path);
+        assert_eq!(root["type"], "record", "writer schema must be a record");
+        root.as_object_mut()
+            .expect("schema is a JSON object")
+            .insert("fields".to_string(), Value::Array(default_fields));
+        AvroSchema::new(root.to_string())
+    }
+
+    #[test]
+    fn test_schema_resolution_defaults_all_supported_types() {
+        let path = "test/data/skippable_types.avro";
+        let duration_default = "\u{0000}".repeat(12);
+        let reader_schema = make_reader_schema_with_default_fields(
+            path,
+            vec![
+                
serde_json::json!({"name":"d_bool","type":"boolean","default":true}),
+                serde_json::json!({"name":"d_int","type":"int","default":42}),
+                
serde_json::json!({"name":"d_long","type":"long","default":12345}),
+                
serde_json::json!({"name":"d_float","type":"float","default":1.5}),
+                
serde_json::json!({"name":"d_double","type":"double","default":2.25}),
+                
serde_json::json!({"name":"d_bytes","type":"bytes","default":"XYZ"}),
+                
serde_json::json!({"name":"d_string","type":"string","default":"hello"}),
+                
serde_json::json!({"name":"d_date","type":{"type":"int","logicalType":"date"},"default":0}),
+                
serde_json::json!({"name":"d_time_ms","type":{"type":"int","logicalType":"time-millis"},"default":1000}),
+                
serde_json::json!({"name":"d_time_us","type":{"type":"long","logicalType":"time-micros"},"default":2000}),
+                
serde_json::json!({"name":"d_ts_ms","type":{"type":"long","logicalType":"local-timestamp-millis"},"default":0}),
+                
serde_json::json!({"name":"d_ts_us","type":{"type":"long","logicalType":"local-timestamp-micros"},"default":0}),
+                
serde_json::json!({"name":"d_decimal","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"default":""}),
+                
serde_json::json!({"name":"d_fixed","type":{"type":"fixed","name":"F4","size":4},"default":"ABCD"}),
+                
serde_json::json!({"name":"d_enum","type":{"type":"enum","name":"E","symbols":["A","B","C"]},"default":"A"}),
+                
serde_json::json!({"name":"d_duration","type":{"type":"fixed","name":"Dur","size":12,"logicalType":"duration"},"default":duration_default}),
+                
serde_json::json!({"name":"d_uuid","type":{"type":"string","logicalType":"uuid"},"default":"00000000-0000-0000-0000-000000000000"}),
+                
serde_json::json!({"name":"d_array","type":{"type":"array","items":"int"},"default":[1,2,3]}),
+                
serde_json::json!({"name":"d_map","type":{"type":"map","values":"long"},"default":{"a":1,"b":2}}),
+                serde_json::json!({"name":"d_record","type":{
+              "type":"record","name":"DefaultRec","fields":[
+                  {"name":"x","type":"int"},
+                  {"name":"y","type":["null","string"],"default":null}
+              ]
+        },"default":{"x":7}}),
+                
serde_json::json!({"name":"d_nullable_null","type":["null","int"],"default":null}),
+                
serde_json::json!({"name":"d_nullable_value","type":["int","null"],"default":123}),
+            ],
+        );
+        let actual = read_alltypes_with_reader_schema(path, reader_schema);
+        let num_rows = actual.num_rows();
+        assert!(num_rows > 0, "skippable_types.avro should contain rows");
+        assert_eq!(
+            actual.num_columns(),
+            22,
+            "expected exactly our defaulted fields"
+        );
+        let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(22);
+        arrays.push(Arc::new(BooleanArray::from_iter(std::iter::repeat_n(
+            Some(true),
+            num_rows,
+        ))));
+        arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(
+            42, num_rows,
+        ))));
+        arrays.push(Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
+            12345, num_rows,
+        ))));
+        arrays.push(Arc::new(Float32Array::from_iter_values(
+            std::iter::repeat_n(1.5f32, num_rows),
+        )));
+        arrays.push(Arc::new(Float64Array::from_iter_values(
+            std::iter::repeat_n(2.25f64, num_rows),
+        )));
+        arrays.push(Arc::new(BinaryArray::from_iter_values(
+            std::iter::repeat_n(b"XYZ".as_ref(), num_rows),
+        )));
+        arrays.push(Arc::new(StringArray::from_iter_values(
+            std::iter::repeat_n("hello", num_rows),
+        )));
+        arrays.push(Arc::new(Date32Array::from_iter_values(
+            std::iter::repeat_n(0, num_rows),
+        )));
+        arrays.push(Arc::new(Time32MillisecondArray::from_iter_values(
+            std::iter::repeat_n(1_000, num_rows),
+        )));
+        arrays.push(Arc::new(Time64MicrosecondArray::from_iter_values(
+            std::iter::repeat_n(2_000i64, num_rows),
+        )));
+        arrays.push(Arc::new(TimestampMillisecondArray::from_iter_values(
+            std::iter::repeat_n(0i64, num_rows),
+        )));
+        arrays.push(Arc::new(TimestampMicrosecondArray::from_iter_values(
+            std::iter::repeat_n(0i64, num_rows),
+        )));
+        #[cfg(feature = "small_decimals")]
+        let decimal = 
Decimal64Array::from_iter_values(std::iter::repeat_n(0i64, num_rows))
+            .with_precision_and_scale(10, 2)
+            .unwrap();
+        #[cfg(not(feature = "small_decimals"))]
+        let decimal = 
Decimal128Array::from_iter_values(std::iter::repeat_n(0i128, num_rows))
+            .with_precision_and_scale(10, 2)
+            .unwrap();
+        arrays.push(Arc::new(decimal));
+        let fixed_iter = std::iter::repeat_n(Some(*b"ABCD"), num_rows);
+        arrays.push(Arc::new(
+            FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_iter, 
4).unwrap(),
+        ));
+        let enum_keys = Int32Array::from_iter_values(std::iter::repeat_n(0, 
num_rows));
+        let enum_values = StringArray::from_iter_values(["A", "B", "C"]);
+        let enum_arr =
+            DictionaryArray::<Int32Type>::try_new(enum_keys, 
Arc::new(enum_values)).unwrap();
+        arrays.push(Arc::new(enum_arr));
+        let duration_values = std::iter::repeat_n(
+            Some(IntervalMonthDayNanoType::make_value(0, 0, 0)),
+            num_rows,
+        );
+        let duration_arr: IntervalMonthDayNanoArray = 
duration_values.collect();
+        arrays.push(Arc::new(duration_arr));
+        let uuid_bytes = [0u8; 16];
+        let uuid_iter = std::iter::repeat_n(Some(uuid_bytes), num_rows);
+        arrays.push(Arc::new(
+            FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_iter, 
16).unwrap(),
+        ));
+        let item_field = Arc::new(Field::new(
+            Field::LIST_FIELD_DEFAULT_NAME,
+            DataType::Int32,
+            false,
+        ));
+        let mut list_builder = 
ListBuilder::new(Int32Builder::new()).with_field(item_field);
+        for _ in 0..num_rows {
+            list_builder.values().append_value(1);
+            list_builder.values().append_value(2);
+            list_builder.values().append_value(3);
+            list_builder.append(true);
+        }
+        arrays.push(Arc::new(list_builder.finish()));
+        let values_field = Arc::new(Field::new("value", DataType::Int64, 
false));
+        let mut map_builder = MapBuilder::new(
+            Some(builder::MapFieldNames {
+                entry: "entries".to_string(),
+                key: "key".to_string(),
+                value: "value".to_string(),
+            }),
+            StringBuilder::new(),
+            Int64Builder::new(),
+        )
+        .with_values_field(values_field);
+        for _ in 0..num_rows {
+            let (keys, vals) = map_builder.entries();
+            keys.append_value("a");
+            vals.append_value(1);
+            keys.append_value("b");
+            vals.append_value(2);
+            map_builder.append(true).unwrap();
+        }
+        arrays.push(Arc::new(map_builder.finish()));
+        let rec_fields: Fields = Fields::from(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("y", DataType::Utf8, true),
+        ]);
+        let mut sb = StructBuilder::new(
+            rec_fields.clone(),
+            vec![
+                Box::new(Int32Builder::new()),
+                Box::new(StringBuilder::new()),
+            ],
+        );
+        for _ in 0..num_rows {
+            sb.field_builder::<Int32Builder>(0).unwrap().append_value(7);
+            sb.field_builder::<StringBuilder>(1).unwrap().append_null();
+            sb.append(true);
+        }
+        arrays.push(Arc::new(sb.finish()));
+        arrays.push(Arc::new(Int32Array::from_iter(std::iter::repeat_n(
+            None::<i32>,
+            num_rows,
+        ))));
+        arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(
+            123, num_rows,
+        ))));
+        let expected = RecordBatch::try_new(actual.schema(), arrays).unwrap();
+        assert_eq!(
+            actual, expected,
+            "defaults should materialize correctly for all fields"
+        );
+    }
+
+    #[test]
+    fn test_schema_resolution_default_enum_invalid_symbol_errors() {
+        let path = "test/data/skippable_types.avro";
+        let bad_schema = make_reader_schema_with_default_fields(
+            path,
+            vec![serde_json::json!({
+                "name":"bad_enum",
+                "type":{"type":"enum","name":"E","symbols":["A","B","C"]},
+                "default":"Z"
+            })],
+        );
+        let file = File::open(path).unwrap();
+        let res = ReaderBuilder::new()
+            .with_reader_schema(bad_schema)
+            .build(BufReader::new(file));
+        let err = res.expect_err("expected enum default validation to fail");
+        let msg = err.to_string();
+        let lower_msg = msg.to_lowercase();
+        assert!(
+            lower_msg.contains("enum")
+                && (lower_msg.contains("symbol") || 
lower_msg.contains("default")),
+            "unexpected error: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_schema_resolution_default_fixed_size_mismatch_errors() {
+        let path = "test/data/skippable_types.avro";
+        let bad_schema = make_reader_schema_with_default_fields(
+            path,
+            vec![serde_json::json!({
+                "name":"bad_fixed",
+                "type":{"type":"fixed","name":"F","size":4},
+                "default":"ABC"
+            })],
+        );
+        let file = File::open(path).unwrap();
+        let res = ReaderBuilder::new()
+            .with_reader_schema(bad_schema)
+            .build(BufReader::new(file));
+        let err = res.expect_err("expected fixed default validation to fail");
+        let msg = err.to_string();
+        let lower_msg = msg.to_lowercase();
+        assert!(
+            lower_msg.contains("fixed")
+                && (lower_msg.contains("size")
+                    || lower_msg.contains("length")
+                    || lower_msg.contains("does not match")),
+            "unexpected error: {msg}"
+        );
+    }
+
     #[test]
     fn test_alltypes_skip_writer_fields_keep_double_only() {
         let file = arrow_test_data("avro/alltypes_plain.avro");
@@ -2538,6 +2777,7 @@ mod test {
             let values_i128: Vec<i128> = (1..=24).map(|n| (n as i128) * 
pow10).collect();
             let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef {
                 match *dt {
+                    #[cfg(feature = "small_decimals")]
                     DataType::Decimal32(p, s) => {
                         let it = values.iter().map(|&v| v as i32);
                         Arc::new(
@@ -2546,6 +2786,7 @@ mod test {
                                 .unwrap(),
                         )
                     }
+                    #[cfg(feature = "small_decimals")]
                     DataType::Decimal64(p, s) => {
                         let it = values.iter().map(|&v| v as i64);
                         Arc::new(
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 48eb601024..9ca8acb45b 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -15,27 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::codec::{AvroDataType, Codec, Promotion, ResolutionInfo};
+use crate::codec::{
+    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, 
ResolvedRecord,
+};
 use crate::reader::block::{Block, BlockDecoder};
 use crate::reader::cursor::AvroCursor;
-use crate::reader::header::Header;
-use crate::schema::*;
+use crate::schema::Nullability;
 use arrow_array::builder::{
-    ArrayBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, 
Decimal64Builder,
-    IntervalMonthDayNanoBuilder, PrimitiveBuilder,
+    Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder, 
StringViewBuilder,
 };
+#[cfg(feature = "small_decimals")]
+use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
 use arrow_array::types::*;
 use arrow_array::*;
 use arrow_buffer::*;
 use arrow_schema::{
-    ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit,
-    Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, 
DECIMAL256_MAX_PRECISION,
+    ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as 
ArrowSchema, SchemaRef,
+    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
 };
 #[cfg(feature = "small_decimals")]
 use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
 use std::cmp::Ordering;
-use std::collections::HashMap;
-use std::io::Read;
 use std::sync::Arc;
 use uuid::Uuid;
 
@@ -60,6 +60,29 @@ macro_rules! flush_decimal {
     }};
 }
 
+/// Macro to append a default decimal value from two's-complement big-endian 
bytes
+/// into the corresponding decimal builder, with compile-time constructed 
error text.
+macro_rules! append_decimal_default {
+    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
+        match $lit {
+            AvroLiteral::Bytes(b) => {
+                let ext = sign_cast_to::<$N>(b)?;
+                let val = <$Int>::from_be_bytes(ext);
+                $builder.append_value(val);
+                Ok(())
+            }
+            _ => Err(ArrowError::InvalidArgumentError(
+                concat!(
+                    "Default for ",
+                    $name,
+                    " must be bytes (two's-complement big-endian)"
+                )
+                .to_string(),
+            )),
+        }
+    }};
+}
+
 #[derive(Debug)]
 pub(crate) struct RecordDecoderBuilder<'a> {
     data_type: &'a AvroDataType,
@@ -91,15 +114,7 @@ pub(crate) struct RecordDecoder {
     schema: SchemaRef,
     fields: Vec<Decoder>,
     use_utf8view: bool,
-    resolved: Option<ResolvedRuntime>,
-}
-
-#[derive(Debug)]
-struct ResolvedRuntime {
-    /// writer field index -> reader field index (or None if writer-only)
-    writer_to_reader: Arc<[Option<usize>]>,
-    /// per-writer-field skipper (Some only when writer-only)
-    skip_decoders: Vec<Option<Skipper>>,
+    projector: Option<Projector>,
 }
 
 impl RecordDecoder {
@@ -138,14 +153,9 @@ impl RecordDecoder {
                     arrow_fields.push(avro_field.field());
                     encodings.push(Decoder::try_new(avro_field.data_type())?);
                 }
-                // If this record carries resolution metadata, prepare 
top-level runtime helpers
-                let resolved = match data_type.resolution.as_ref() {
+                let projector = match data_type.resolution.as_ref() {
                     Some(ResolutionInfo::Record(rec)) => {
-                        let skip_decoders = 
build_skip_decoders(&rec.skip_fields)?;
-                        Some(ResolvedRuntime {
-                            writer_to_reader: rec.writer_to_reader.clone(),
-                            skip_decoders,
-                        })
+                        Some(ProjectorBuilder::try_new(rec, 
reader_fields).build()?)
                     }
                     _ => None,
                 };
@@ -153,7 +163,7 @@ impl RecordDecoder {
                     schema: Arc::new(ArrowSchema::new(arrow_fields)),
                     fields: encodings,
                     use_utf8view,
-                    resolved,
+                    projector,
                 })
             }
             other => Err(ArrowError::ParseError(format!(
@@ -170,17 +180,10 @@ impl RecordDecoder {
     /// Decode `count` records from `buf`
     pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, 
ArrowError> {
         let mut cursor = AvroCursor::new(buf);
-        match self.resolved.as_mut() {
-            Some(runtime) => {
-                // Top-level resolved record: read writer fields in writer 
order,
-                // project into reader fields, and skip writer-only fields
+        match self.projector.as_mut() {
+            Some(proj) => {
                 for _ in 0..count {
-                    decode_with_resolution(
-                        &mut cursor,
-                        &mut self.fields,
-                        &runtime.writer_to_reader,
-                        &mut runtime.skip_decoders,
-                    )?;
+                    proj.project_record(&mut cursor, &mut self.fields)?;
                 }
             }
             None => {
@@ -205,24 +208,10 @@ impl RecordDecoder {
     }
 }
 
-fn decode_with_resolution(
-    buf: &mut AvroCursor<'_>,
-    encodings: &mut [Decoder],
-    writer_to_reader: &[Option<usize>],
-    skippers: &mut [Option<Skipper>],
-) -> Result<(), ArrowError> {
-    for (w_idx, (target, skipper_opt)) in 
writer_to_reader.iter().zip(skippers).enumerate() {
-        match (*target, skipper_opt.as_mut()) {
-            (Some(r_idx), _) => encodings[r_idx].decode(buf)?,
-            (None, Some(sk)) => sk.skip(buf)?,
-            (None, None) => {
-                return Err(ArrowError::SchemaError(format!(
-                    "No skipper available for writer-only field at index 
{w_idx}",
-                )));
-            }
-        }
-    }
-    Ok(())
+#[derive(Debug)]
+struct EnumResolution {
+    mapping: Arc<[i32]>,
+    default_index: i32,
 }
 
 #[derive(Debug)]
@@ -252,7 +241,7 @@ enum Decoder {
     /// String data encoded as UTF-8 bytes, but mapped to Arrow's 
StringViewArray
     StringView(OffsetBufferBuilder<i32>, Vec<u8>),
     Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
-    Record(Fields, Vec<Decoder>),
+    Record(Fields, Vec<Decoder>, Option<Projector>),
     Map(
         FieldRef,
         OffsetBufferBuilder<i32>,
@@ -261,27 +250,16 @@ enum Decoder {
         Box<Decoder>,
     ),
     Fixed(i32, Vec<u8>),
-    Enum(Vec<i32>, Arc<[String]>),
+    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
     Duration(IntervalMonthDayNanoBuilder),
     Uuid(Vec<u8>),
+    #[cfg(feature = "small_decimals")]
     Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
+    #[cfg(feature = "small_decimals")]
     Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
-    EnumResolved {
-        indices: Vec<i32>,
-        symbols: Arc<[String]>,
-        mapping: Arc<[i32]>,
-        default_index: i32,
-    },
-    /// Resolved record that needs writer->reader projection and skipping 
writer-only fields
-    RecordResolved {
-        fields: Fields,
-        encodings: Vec<Decoder>,
-        writer_to_reader: Arc<[Option<usize>]>,
-        skip_decoders: Vec<Option<Skipper>>,
-    },
 }
 
 impl Decoder {
@@ -403,16 +381,14 @@ impl Decoder {
                 )
             }
             (Codec::Enum(symbols), _) => {
-                if let Some(ResolutionInfo::EnumMapping(mapping)) = 
data_type.resolution.as_ref() {
-                    Self::EnumResolved {
-                        indices: Vec::with_capacity(DEFAULT_CAPACITY),
-                        symbols: symbols.clone(),
+                let res = match data_type.resolution.as_ref() {
+                    Some(ResolutionInfo::EnumMapping(mapping)) => 
Some(EnumResolution {
                         mapping: mapping.mapping.clone(),
                         default_index: mapping.default_index,
-                    }
-                } else {
-                    Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), 
symbols.clone())
-                }
+                    }),
+                    _ => None,
+                };
+                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), 
symbols.clone(), res)
             }
             (Codec::Struct(fields), _) => {
                 let mut arrow_fields = Vec::with_capacity(fields.len());
@@ -422,17 +398,13 @@ impl Decoder {
                     arrow_fields.push(avro_field.field());
                     encodings.push(encoding);
                 }
-                if let Some(ResolutionInfo::Record(rec)) = 
data_type.resolution.as_ref() {
-                    let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
-                    Self::RecordResolved {
-                        fields: arrow_fields.into(),
-                        encodings,
-                        writer_to_reader: rec.writer_to_reader.clone(),
-                        skip_decoders,
-                    }
-                } else {
-                    Self::Record(arrow_fields.into(), encodings)
-                }
+                let projector =
+                    if let Some(ResolutionInfo::Record(rec)) = 
data_type.resolution.as_ref() {
+                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
+                    } else {
+                        None
+                    };
+                Self::Record(arrow_fields.into(), encodings, projector)
             }
             (Codec::Map(child), _) => {
                 let val_field = child.field_with_name("value");
@@ -494,27 +466,263 @@ impl Decoder {
             Self::Array(_, offsets, e) => {
                 offsets.push_length(0);
             }
-            Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()),
+            Self::Record(_, e, _) => e.iter_mut().for_each(|e| 
e.append_null()),
             Self::Map(_, _koff, moff, _, _) => {
                 moff.push_length(0);
             }
             Self::Fixed(sz, accum) => {
                 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
             }
+            #[cfg(feature = "small_decimals")]
             Self::Decimal32(_, _, _, builder) => builder.append_value(0),
+            #[cfg(feature = "small_decimals")]
             Self::Decimal64(_, _, _, builder) => builder.append_value(0),
             Self::Decimal128(_, _, _, builder) => builder.append_value(0),
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
-            Self::Enum(indices, _) => indices.push(0),
-            Self::EnumResolved { indices, .. } => indices.push(0),
+            Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
             Self::Nullable(_, null_buffer, inner) => {
                 null_buffer.append(false);
                 inner.append_null();
             }
-            Self::RecordResolved { encodings, .. } => {
-                encodings.iter_mut().for_each(|e| e.append_null());
+        }
+    }
+
+    /// Append a single default literal into the decoder's buffers
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        match self {
+            Self::Nullable(_, nb, inner) => {
+                if matches!(lit, AvroLiteral::Null) {
+                    nb.append(false);
+                    inner.append_null();
+                    Ok(())
+                } else {
+                    nb.append(true);
+                    inner.append_default(lit)
+                }
+            }
+            Self::Null(count) => match lit {
+                AvroLiteral::Null => {
+                    *count += 1;
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Non-null default for null type".to_string(),
+                )),
+            },
+            Self::Boolean(b) => match lit {
+                AvroLiteral::Boolean(v) => {
+                    b.append(*v);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for boolean must be boolean".to_string(),
+                )),
+            },
+            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match 
lit {
+                AvroLiteral::Int(i) => {
+                    v.push(*i);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for int32/date32/time-millis must be 
int".to_string(),
+                )),
+            },
+            Self::Int64(v)
+            | Self::Int32ToInt64(v)
+            | Self::TimeMicros(v)
+            | Self::TimestampMillis(_, v)
+            | Self::TimestampMicros(_, v) => match lit {
+                AvroLiteral::Long(i) => {
+                    v.push(*i);
+                    Ok(())
+                }
+                AvroLiteral::Int(i) => {
+                    v.push(*i as i64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for long/time-micros/timestamp must be long or 
int".to_string(),
+                )),
+            },
+            Self::Float32(v) | Self::Int32ToFloat32(v) | 
Self::Int64ToFloat32(v) => match lit {
+                AvroLiteral::Float(f) => {
+                    v.push(*f);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for float must be float".to_string(),
+                )),
+            },
+            Self::Float64(v)
+            | Self::Int32ToFloat64(v)
+            | Self::Int64ToFloat64(v)
+            | Self::Float32ToFloat64(v) => match lit {
+                AvroLiteral::Double(f) => {
+                    v.push(*f);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for double must be double".to_string(),
+                )),
+            },
+            Self::Binary(offsets, values) | Self::StringToBytes(offsets, 
values) => match lit {
+                AvroLiteral::Bytes(b) => {
+                    offsets.push_length(b.len());
+                    values.extend_from_slice(b);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for bytes must be bytes".to_string(),
+                )),
+            },
+            Self::BytesToString(offsets, values)
+            | Self::String(offsets, values)
+            | Self::StringView(offsets, values) => match lit {
+                AvroLiteral::String(s) => {
+                    let b = s.as_bytes();
+                    offsets.push_length(b.len());
+                    values.extend_from_slice(b);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for string must be string".to_string(),
+                )),
+            },
+            Self::Uuid(values) => match lit {
+                AvroLiteral::String(s) => {
+                    let uuid = Uuid::try_parse(s).map_err(|e| {
+                        ArrowError::InvalidArgumentError(format!("Invalid UUID 
default: {s} ({e})"))
+                    })?;
+                    values.extend_from_slice(uuid.as_bytes());
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for uuid must be string".to_string(),
+                )),
+            },
+            Self::Fixed(sz, accum) => match lit {
+                AvroLiteral::Bytes(b) => {
+                    if b.len() != *sz as usize {
+                        return Err(ArrowError::InvalidArgumentError(format!(
+                            "Fixed default length {} does not match size {sz}",
+                            b.len(),
+                        )));
+                    }
+                    accum.extend_from_slice(b);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for fixed must be bytes".to_string(),
+                )),
+            },
+            #[cfg(feature = "small_decimals")]
+            Self::Decimal32(_, _, _, builder) => {
+                append_decimal_default!(lit, builder, 4, i32, "decimal32")
+            }
+            #[cfg(feature = "small_decimals")]
+            Self::Decimal64(_, _, _, builder) => {
+                append_decimal_default!(lit, builder, 8, i64, "decimal64")
+            }
+            Self::Decimal128(_, _, _, builder) => {
+                append_decimal_default!(lit, builder, 16, i128, "decimal128")
+            }
+            Self::Decimal256(_, _, _, builder) => {
+                append_decimal_default!(lit, builder, 32, i256, "decimal256")
             }
+            Self::Duration(builder) => match lit {
+                AvroLiteral::Bytes(b) => {
+                    if b.len() != 12 {
+                        return Err(ArrowError::InvalidArgumentError(format!(
+                            "Duration default must be exactly 12 bytes, got 
{}",
+                            b.len()
+                        )));
+                    }
+                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
+                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
+                    let millis = u32::from_le_bytes([b[8], b[9], b[10], 
b[11]]);
+                    let nanos = (millis as i64) * 1_000_000;
+                    builder.append_value(IntervalMonthDayNano::new(
+                        months as i32,
+                        days as i32,
+                        nanos,
+                    ));
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for duration must be 12-byte little-endian 
months/days/millis"
+                        .to_string(),
+                )),
+            },
+            Self::Array(_, offsets, inner) => match lit {
+                AvroLiteral::Array(items) => {
+                    offsets.push_length(items.len());
+                    for item in items {
+                        inner.append_default(item)?;
+                    }
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for array must be an array literal".to_string(),
+                )),
+            },
+            Self::Map(_, koff, moff, kdata, valdec) => match lit {
+                AvroLiteral::Map(entries) => {
+                    moff.push_length(entries.len());
+                    for (k, v) in entries {
+                        let kb = k.as_bytes();
+                        koff.push_length(kb.len());
+                        kdata.extend_from_slice(kb);
+                        valdec.append_default(v)?;
+                    }
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for map must be a map/object literal".to_string(),
+                )),
+            },
+            Self::Enum(indices, symbols, _) => match lit {
+                AvroLiteral::Enum(sym) => {
+                    let pos = symbols.iter().position(|s| s == 
sym).ok_or_else(|| {
+                        ArrowError::InvalidArgumentError(format!(
+                            "Enum default symbol {sym:?} not in reader symbols"
+                        ))
+                    })?;
+                    indices.push(pos as i32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for enum must be a symbol".to_string(),
+                )),
+            },
+            Self::Record(field_meta, decoders, projector) => match lit {
+                AvroLiteral::Map(entries) => {
+                    for (i, dec) in decoders.iter_mut().enumerate() {
+                        let name = field_meta[i].name();
+                        if let Some(sub) = entries.get(name) {
+                            dec.append_default(sub)?;
+                        } else if let Some(proj) = projector.as_ref() {
+                            proj.project_default(dec, i)?;
+                        } else {
+                            dec.append_null();
+                        }
+                    }
+                    Ok(())
+                }
+                AvroLiteral::Null => {
+                    for (i, dec) in decoders.iter_mut().enumerate() {
+                        if let Some(proj) = projector.as_ref() {
+                            proj.project_default(dec, i)?;
+                        } else {
+                            dec.append_null();
+                        }
+                    }
+                    Ok(())
+                }
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Default for record must be a map/object or 
null".to_string(),
+                )),
+            },
         }
     }
 
@@ -560,11 +768,14 @@ impl Decoder {
                 let total_items = read_blocks(buf, |cursor| 
encoding.decode(cursor))?;
                 off.push_length(total_items);
             }
-            Self::Record(_, encodings) => {
+            Self::Record(_, encodings, None) => {
                 for encoding in encodings {
                     encoding.decode(buf)?;
                 }
             }
+            Self::Record(_, encodings, Some(proj)) => {
+                proj.project_record(buf, encodings)?;
+            }
             Self::Map(_, koff, moff, kdata, valdec) => {
                 let newly_added = read_blocks(buf, |cur| {
                     let kb = cur.get_bytes()?;
@@ -578,9 +789,11 @@ impl Decoder {
                 let fx = buf.get_fixed(*sz as usize)?;
                 accum.extend_from_slice(fx);
             }
+            #[cfg(feature = "small_decimals")]
             Self::Decimal32(_, _, size, builder) => {
                 decode_decimal!(size, buf, builder, 4, i32);
             }
+            #[cfg(feature = "small_decimals")]
             Self::Decimal64(_, _, size, builder) => {
                 decode_decimal!(size, buf, builder, 8, i64);
             }
@@ -590,21 +803,16 @@ impl Decoder {
             Self::Decimal256(_, _, size, builder) => {
                 decode_decimal!(size, buf, builder, 32, i256);
             }
-            Self::Enum(indices, _) => {
+            Self::Enum(indices, _, None) => {
                 indices.push(buf.get_int()?);
             }
-            Self::EnumResolved {
-                indices,
-                mapping,
-                default_index,
-                ..
-            } => {
+            Self::Enum(indices, _, Some(res)) => {
                 let raw = buf.get_int()?;
                 let resolved = usize::try_from(raw)
                     .ok()
-                    .and_then(|idx| mapping.get(idx).copied())
+                    .and_then(|idx| res.mapping.get(idx).copied())
                     .filter(|&idx| idx >= 0)
-                    .unwrap_or(*default_index);
+                    .unwrap_or(res.default_index);
                 if resolved >= 0 {
                     indices.push(resolved);
                 } else {
@@ -635,14 +843,6 @@ impl Decoder {
                 }
                 nb.append(is_not_null);
             }
-            Self::RecordResolved {
-                encodings,
-                writer_to_reader,
-                skip_decoders,
-                ..
-            } => {
-                decode_with_resolution(buf, encodings, writer_to_reader, 
skip_decoders)?;
-            }
         }
         Ok(())
     }
@@ -711,7 +911,7 @@ impl Decoder {
                 let offsets = flush_offsets(offsets);
                 Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
             }
-            Self::Record(fields, encodings) => {
+            Self::Record(fields, encodings, _) => {
                 let arrays = encodings
                     .iter_mut()
                     .map(|x| x.flush(None))
@@ -764,9 +964,11 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(arr)
             }
+            #[cfg(feature = "small_decimals")]
             Self::Decimal32(precision, scale, _, builder) => {
                 flush_decimal!(builder, precision, scale, nulls, 
Decimal32Array)
             }
+            #[cfg(feature = "small_decimals")]
             Self::Decimal64(precision, scale, _, builder) => {
                 flush_decimal!(builder, precision, scale, nulls, 
Decimal64Array)
             }
@@ -776,25 +978,13 @@ impl Decoder {
             Self::Decimal256(precision, scale, _, builder) => {
                 flush_decimal!(builder, precision, scale, nulls, 
Decimal256Array)
             }
-            Self::Enum(indices, symbols) => flush_dict(indices, symbols, 
nulls)?,
-            Self::EnumResolved {
-                indices, symbols, ..
-            } => flush_dict(indices, symbols, nulls)?,
+            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, 
nulls)?,
             Self::Duration(builder) => {
                 let (_, vals, _) = builder.finish().into_parts();
                 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::RecordResolved {
-                fields, encodings, ..
-            } => {
-                let arrays = encodings
-                    .iter_mut()
-                    .map(|x| x.flush(None))
-                    .collect::<Result<Vec<_>, _>>()?;
-                Arc::new(StructArray::new(fields.clone(), arrays, nulls))
-            }
         })
     }
 }
@@ -976,6 +1166,120 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) -> 
Result<[u8; N], ArrowError> {
     Ok(out)
 }
 
+#[derive(Debug)]
+struct Projector { // Projection state used by `project_record` and `project_default`.
+    writer_to_reader: Arc<[Option<usize>]>, // Per writer field: `Some(reader_index)` decodes into that reader slot, `None` means skip.
+    skip_decoders: Vec<Option<Skipper>>, // Zipped with `writer_to_reader`; provides the `Skipper` used when the mapping is `None`.
+    field_defaults: Vec<Option<AvroLiteral>>, // Indexed by reader field; `None` makes `project_default` append a null.
+    default_injections: Arc<[(usize, AvroLiteral)]>, // `(reader_index, literal)` pairs appended at the end of every `project_record` call.
+}
+
+#[derive(Debug)]
+struct ProjectorBuilder<'a> { // Builder that turns a borrowed `ResolvedRecord` plus reader fields into a `Projector`.
+    rec: &'a ResolvedRecord, // `build` reads its `default_fields`, `skip_fields` and `writer_to_reader`.
+    reader_fields: Arc<[AvroField]>, // `build` checks each field's `resolution` for a `DefaultValue` literal.
+}
+
+impl<'a> ProjectorBuilder<'a> {
+    #[inline]
+    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> 
Self { // Infallible despite the `try_` prefix: stores `rec` and clones the `Arc` handle.
+        Self {
+            rec,
+            reader_fields: reader_fields.clone(),
+        }
+    }
+
+    #[inline]
+    fn build(self) -> Result<Projector, ArrowError> { // Consumes the builder; the only fallible step is `Skipper::from_avro`.
+        let reader_fields = self.reader_fields;
+        let mut field_defaults: Vec<Option<AvroLiteral>> = 
Vec::with_capacity(reader_fields.len());
+        for avro_field in reader_fields.as_ref() { // Pushes exactly one slot per reader field, in iteration order.
+            if let Some(ResolutionInfo::DefaultValue(lit)) =
+                avro_field.data_type().resolution.as_ref()
+            {
+                field_defaults.push(Some(lit.clone()));
+            } else {
+                field_defaults.push(None); // Resolution is absent or is not a `DefaultValue`.
+            }
+        }
+        let mut default_injections: Vec<(usize, AvroLiteral)> =
+            Vec::with_capacity(self.rec.default_fields.len());
+        for &idx in self.rec.default_fields.as_ref() { // Pair each listed reader index with the literal to inject.
+            let lit = field_defaults
+                .get(idx)
+                .and_then(|lit| lit.clone())
+                .unwrap_or(AvroLiteral::Null); // Out-of-range index or missing default falls back to `Null`.
+            default_injections.push((idx, lit));
+        }
+        let mut skip_decoders: Vec<Option<Skipper>> =
+            Vec::with_capacity(self.rec.skip_fields.len());
+        for datatype in self.rec.skip_fields.as_ref() { // One slot per `skip_fields` entry; `None` entries stay `None`.
+            let skipper = match datatype {
+                Some(datatype) => Some(Skipper::from_avro(datatype)?),
+                None => None,
+            };
+            skip_decoders.push(skipper);
+        }
+        Ok(Projector {
+            writer_to_reader: self.rec.writer_to_reader.clone(),
+            skip_decoders,
+            field_defaults,
+            default_injections: default_injections.into(),
+        })
+    }
+}
+
+impl Projector {
+    #[inline]
+    fn project_default(&self, decoder: &mut Decoder, index: usize) -> 
Result<(), ArrowError> { // Appends reader field `index`'s default to `decoder`, or a null when it has none.
+        // SAFETY: `index` is obtained by listing the reader's record fields 
(i.e., from
+        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built 
in
+        // `ProjectorBuilder::build` to have exactly one element per reader 
field.
+        // Therefore, `index < self.field_defaults.len()` always holds here, so
+        // `self.field_defaults[index]` cannot panic. We only take an 
immutable reference
+        // via `.as_ref()`, and `self` is borrowed immutably.
+        if let Some(default_literal) = self.field_defaults[index].as_ref() { // NOTE: ordinary bounds-checked indexing; there is no `unsafe` block here.
+            decoder.append_default(default_literal)
+        } else {
+            decoder.append_null();
+            Ok(())
+        }
+    }
+
+    #[inline]
+    fn project_record(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        encodings: &mut [Decoder],
+    ) -> Result<(), ArrowError> { // Reads one record from `buf`, routing each writer field to a reader decoder or a skipper.
+        debug_assert_eq!(
+            self.writer_to_reader.len(),
+            self.skip_decoders.len(),
+            "internal invariant: mapping and skipper lists must have equal 
length"
+        ); // Debug builds only; with unequal lengths the `zip` below stops at the shorter list.
+        for (i, (mapping, skipper_opt)) in self
+            .writer_to_reader
+            .iter()
+            .zip(self.skip_decoders.iter_mut())
+            .enumerate()
+        {
+            match (mapping, skipper_opt.as_mut()) {
+                (Some(reader_index), _) => 
encodings[*reader_index].decode(buf)?, // Mapped field: decode into its reader slot (indexing panics if out of range).
+                (None, Some(skipper)) => skipper.skip(buf)?, // Unmapped field: skip it via its `Skipper`.
+                (None, None) => { // Unmapped and no skipper: reported as a schema error.
+                    return Err(ArrowError::SchemaError(format!(
+                        "No skipper available for writer-only field at index 
{i}",
+                    )));
+                }
+            }
+        }
+        for (reader_index, lit) in self.default_injections.as_ref() { // After the writer fields, append each configured default literal.
+            encodings[*reader_index].append_default(lit)?;
+        }
+        Ok(())
+    }
+}
+
 /// Lightweight skipper for non‑projected writer fields
 /// (fields present in the writer schema but omitted by the reader/projection);
 /// per Avro 1.11.1 schema resolution these fields are ignored.
@@ -1126,25 +1430,13 @@ impl Skipper {
     }
 }
 
-#[inline]
-fn build_skip_decoders(
-    skip_fields: &[Option<AvroDataType>],
-) -> Result<Vec<Option<Skipper>>, ArrowError> {
-    skip_fields
-        .iter()
-        .map(|opt| opt.as_ref().map(Skipper::from_avro).transpose())
-        .collect()
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::codec::AvroField;
-    use arrow_array::{
-        cast::AsArray, Array, Decimal128Array, Decimal256Array, 
Decimal32Array, DictionaryArray,
-        FixedSizeBinaryArray, IntervalMonthDayNanoArray, ListArray, MapArray, 
StringArray,
-        StructArray,
-    };
+    use crate::schema::{PrimitiveType, Schema, TypeName};
+    use arrow_array::cast::AsArray;
+    use indexmap::IndexMap;
 
     fn encode_avro_int(value: i32) -> Vec<u8> {
         let mut buf = Vec::new();
@@ -1977,12 +2269,14 @@ mod tests {
             vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
         let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
         let default_index: i32 = -1;
-        let mut dec = Decoder::EnumResolved {
-            indices: Vec::with_capacity(DEFAULT_CAPACITY),
-            symbols: reader_symbols.clone(),
-            mapping,
-            default_index,
-        };
+        let mut dec = Decoder::Enum(
+            Vec::with_capacity(DEFAULT_CAPACITY),
+            reader_symbols.clone(),
+            Some(EnumResolution {
+                mapping,
+                default_index,
+            }),
+        );
         let mut data = Vec::new();
         data.extend_from_slice(&encode_avro_int(0));
         data.extend_from_slice(&encode_avro_int(1));
@@ -2013,12 +2307,14 @@ mod tests {
         let reader_symbols: Arc<[String]> = vec!["A".to_string(), 
"B".to_string()].into();
         let default_index: i32 = 1;
         let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
-        let mut dec = Decoder::EnumResolved {
-            indices: Vec::with_capacity(DEFAULT_CAPACITY),
-            symbols: reader_symbols.clone(),
-            mapping,
-            default_index,
-        };
+        let mut dec = Decoder::Enum(
+            Vec::with_capacity(DEFAULT_CAPACITY),
+            reader_symbols.clone(),
+            Some(EnumResolution {
+                mapping,
+                default_index,
+            }),
+        );
         let mut data = Vec::new();
         data.extend_from_slice(&encode_avro_int(0));
         data.extend_from_slice(&encode_avro_int(1));
@@ -2048,12 +2344,14 @@ mod tests {
         let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
         let default_index: i32 = -1; // indicates no default at type-level
         let mapping: Arc<[i32]> = Arc::from(vec![-1]);
-        let mut dec = Decoder::EnumResolved {
-            indices: Vec::with_capacity(DEFAULT_CAPACITY),
-            symbols: reader_symbols,
-            mapping,
-            default_index,
-        };
+        let mut dec = Decoder::Enum(
+            Vec::with_capacity(DEFAULT_CAPACITY),
+            reader_symbols,
+            Some(EnumResolution {
+                mapping,
+                default_index,
+            }),
+        );
         let data = encode_avro_int(0);
         let mut cur = AvroCursor::new(&data);
         let err = dec
@@ -2069,7 +2367,7 @@ mod tests {
     fn make_record_resolved_decoder(
         reader_fields: &[(&str, DataType, bool)],
         writer_to_reader: Vec<Option<usize>>,
-        mut skip_decoders: Vec<Option<super::Skipper>>,
+        skip_decoders: Vec<Option<Skipper>>,
     ) -> Decoder {
         let mut field_refs: Vec<FieldRef> = 
Vec::with_capacity(reader_fields.len());
         let mut encodings: Vec<Decoder> = 
Vec::with_capacity(reader_fields.len());
@@ -2086,12 +2384,16 @@ mod tests {
             encodings.push(enc);
         }
         let fields: Fields = field_refs.into();
-        Decoder::RecordResolved {
+        Decoder::Record(
             fields,
             encodings,
-            writer_to_reader: Arc::from(writer_to_reader),
-            skip_decoders,
-        }
+            Some(Projector {
+                writer_to_reader: Arc::from(writer_to_reader),
+                skip_decoders,
+                field_defaults: vec![None; reader_fields.len()],
+                default_injections: Arc::from(Vec::<(usize, 
AvroLiteral)>::new()),
+            }),
+        )
     }
 
     #[test]
@@ -2257,4 +2559,445 @@ mod tests {
         assert_eq!(id.value(0), 5);
         assert_eq!(id.value(1), 7);
     }
+
+    fn make_record_decoder_with_projector_defaults(
+        reader_fields: &[(&str, DataType, bool)],
+        field_defaults: Vec<Option<AvroLiteral>>,
+        default_injections: Vec<(usize, AvroLiteral)>,
+        writer_to_reader_len: usize,
+    ) -> Decoder { // Test helper: builds a `Decoder::Record` whose `Projector` carries the given defaults.
+        assert_eq!(
+            field_defaults.len(),
+            reader_fields.len(),
+            "field_defaults must have one entry per reader field"
+        );
+        let mut field_refs: Vec<FieldRef> = 
Vec::with_capacity(reader_fields.len());
+        let mut encodings: Vec<Decoder> = 
Vec::with_capacity(reader_fields.len());
+        for (name, dt, nullable) in reader_fields { // One Arrow field and one child decoder per `(name, type, nullable)` tuple.
+            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), 
*nullable)));
+            let enc = match dt { // Only Int32, Int64 and Utf8 are supported; any other type panics.
+                DataType::Int32 => 
Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
+                DataType::Int64 => 
Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
+                DataType::Utf8 => Decoder::String(
+                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+                    Vec::with_capacity(DEFAULT_CAPACITY),
+                ),
+                other => panic!("Unsupported test field type in helper: 
{other:?}"),
+            };
+            encodings.push(enc);
+        }
+        let fields: Fields = field_refs.into();
+        let skip_decoders: Vec<Option<Skipper>> =
+            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect(); // No skippers at all.
+        let projector = Projector {
+            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]), // All writer slots unmapped, so `project_record` errors unless the length is 0.
+            skip_decoders,
+            field_defaults,
+            default_injections: Arc::from(default_injections),
+        };
+        Decoder::Record(fields, encodings, Some(projector))
+    }
+
+    #[test]
+    fn test_default_append_int32_and_int64_from_int_and_long() { // Int32 takes an `Int` default; Int64 takes both `Int` and `Long`.
+        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
+        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
+        let arr = d_i32.flush(None).unwrap();
+        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(a.len(), 1);
+        assert_eq!(a.value(0), 42);
+        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
+        d_i64.append_default(&AvroLiteral::Int(5)).unwrap(); // `Int` literal accepted by the Int64 decoder.
+        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
+        let arr64 = d_i64.flush(None).unwrap();
+        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(a64.len(), 2);
+        assert_eq!(a64.value(0), 5);
+        assert_eq!(a64.value(1), 7);
+    }
+
+    #[test]
+    fn test_default_append_floats_and_doubles() { // `Float` default into Float32, `Double` default into Float64.
+        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
+        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
+        let arr32 = d_f32.flush(None).unwrap();
+        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
+        assert_eq!(a.value(0), 1.5); // 1.5 is exactly representable, so exact equality is fine.
+        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
+        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
+        let arr64 = d_f64.flush(None).unwrap();
+        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
+        assert_eq!(b.value(0), 2.25); // 2.25 is exactly representable as well.
+    }
+
+    #[test]
+    fn test_default_append_string_and_bytes() { // String and bytes defaults round-trip; a bytes literal is rejected by a string decoder.
+        let mut d_str = Decoder::String(
+            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+            Vec::with_capacity(DEFAULT_CAPACITY),
+        );
+        d_str
+            .append_default(&AvroLiteral::String("hi".into()))
+            .unwrap();
+        let s_arr = d_str.flush(None).unwrap();
+        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(arr.value(0), "hi");
+        let mut d_bytes = Decoder::Binary(
+            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+            Vec::with_capacity(DEFAULT_CAPACITY),
+        );
+        d_bytes
+            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
+            .unwrap();
+        let b_arr = d_bytes.flush(None).unwrap();
+        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
+        assert_eq!(barr.value(0), &[1, 2, 3]);
+        let mut d_str_err = Decoder::String( // Negative case: wrong literal kind for a string decoder.
+            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+            Vec::with_capacity(DEFAULT_CAPACITY),
+        );
+        let err = d_str_err
+            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
+            .unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Default for string must be string"),
+            "unexpected error: {err:?}"
+        );
+    }
+
+    #[test]
+    fn test_default_append_nullable_int32_null_and_value() { // A nullable decoder accepts both a `Null` default and an inner-typed default.
+        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
+        let mut dec = Decoder::Nullable(
+            Nullability::NullFirst,
+            NullBufferBuilder::new(DEFAULT_CAPACITY),
+            Box::new(inner),
+        );
+        dec.append_default(&AvroLiteral::Null).unwrap(); // Row 0: expected to be null.
+        dec.append_default(&AvroLiteral::Int(11)).unwrap(); // Row 1: expected to be 11.
+        let arr = dec.flush(None).unwrap();
+        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(a.len(), 2);
+        assert!(a.is_null(0));
+        assert_eq!(a.value(1), 11);
+    }
+
+    #[test]
+    fn test_default_append_array_of_ints() { // An `Array` default becomes one list row holding all items.
+        let list_dt = 
avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
+        let mut d = Decoder::try_new(&list_dt).unwrap();
+        let items = vec![
+            AvroLiteral::Int(1),
+            AvroLiteral::Int(2),
+            AvroLiteral::Int(3),
+        ];
+        d.append_default(&AvroLiteral::Array(items)).unwrap();
+        let arr = d.flush(None).unwrap();
+        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
+        assert_eq!(list.len(), 1); // A single default produces a single list row.
+        assert_eq!(list.value_length(0), 3);
+        let vals = 
list.values().as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(vals.values(), &[1, 2, 3]); // Child values keep the literal's order.
+    }
+
+    #[test]
+    fn test_default_append_map_string_to_int() { // A `Map` default becomes one map row with an entry per key.
+        let map_dt = 
avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
+        let mut d = Decoder::try_new(&map_dt).unwrap();
+        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
+        m.insert("k1".to_string(), AvroLiteral::Int(10));
+        m.insert("k2".to_string(), AvroLiteral::Int(20));
+        d.append_default(&AvroLiteral::Map(m)).unwrap();
+        let arr = d.flush(None).unwrap();
+        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
+        assert_eq!(map.len(), 1);
+        assert_eq!(map.value_length(0), 2);
+        let binding = map.value(0); // Named binding keeps the entries array alive for the borrows below.
+        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
+        let k = entries
+            .column_by_name("key")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let v = entries
+            .column_by_name("value")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| 
k.value(i)).collect();
+        assert_eq!(keys, ["k1", "k2"].into_iter().collect()); // Compared as sets: entry order and key/value pairing are not asserted.
+        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| 
v.value(i)).collect();
+        assert_eq!(vals, [10, 20].into_iter().collect());
+    }
+
+    #[test]
+    fn test_default_append_enum_by_symbol() { // An `Enum` default is stored as the symbol's position in the reader symbol list.
+        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), 
"C".into()].into();
+        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), 
symbols.clone(), None);
+        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
+        let arr = d.flush(None).unwrap();
+        let dict = arr
+            .as_any()
+            .downcast_ref::<DictionaryArray<Int32Type>>()
+            .unwrap();
+        assert_eq!(dict.len(), 1);
+        let expected = Int32Array::from(vec![1]); // "B" is at index 1 of ["A", "B", "C"].
+        assert_eq!(dict.keys(), &expected);
+        let values = dict
+            .values()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(values.value(1), "B");
+    }
+
+    #[test]
+    fn test_default_append_uuid_and_type_error() { // UUID defaults must be strings; a bytes literal is rejected.
+        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
+        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
+        d.append_default(&AvroLiteral::String(uuid_str.into()))
+            .unwrap();
+        let arr_ref = d.flush(None).unwrap();
+        let arr = arr_ref
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .unwrap();
+        assert_eq!(arr.value_length(), 16); // Flushed as 16-byte fixed-size binary values.
+        assert_eq!(arr.len(), 1);
+        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
+        let err = d2
+            .append_default(&AvroLiteral::Bytes(vec![0u8; 16])) // 16 raw bytes are still rejected: only string literals are accepted.
+            .unwrap_err();
+        assert!(
+            err.to_string().contains("Default for uuid must be string"),
+            "unexpected error: {err:?}"
+        );
+    }
+
+    #[test]
+    fn test_default_append_fixed_and_length_mismatch() { // Fixed defaults must match the declared size exactly.
+        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
+        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
+            .unwrap();
+        let arr_ref = d.flush(None).unwrap();
+        let arr = arr_ref
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .unwrap();
+        assert_eq!(arr.value_length(), 4);
+        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
+        let mut d_err = Decoder::Fixed(4, 
Vec::with_capacity(DEFAULT_CAPACITY));
+        let err = d_err
+            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3])) // 3 bytes for a size-4 fixed: must fail.
+            .unwrap_err();
+        assert!(
+            err.to_string().contains("Fixed default length"),
+            "unexpected error: {err:?}"
+        );
+    }
+
+    #[test]
+    fn test_default_append_duration_and_length_validation() { // Duration defaults are 12 bytes: three little-endian u32 values.
+        let dt = avro_from_codec(Codec::Interval);
+        let mut d = Decoder::try_new(&dt).unwrap();
+        let mut bytes = Vec::with_capacity(12);
+        bytes.extend_from_slice(&1u32.to_le_bytes()); // months
+        bytes.extend_from_slice(&2u32.to_le_bytes()); // days
+        bytes.extend_from_slice(&3u32.to_le_bytes()); // milliseconds
+        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
+        let arr_ref = d.flush(None).unwrap();
+        let arr = arr_ref
+            .as_any()
+            .downcast_ref::<IntervalMonthDayNanoArray>()
+            .unwrap();
+        assert_eq!(arr.len(), 1);
+        let v = arr.value(0);
+        assert_eq!(v.months, 1);
+        assert_eq!(v.days, 2);
+        assert_eq!(v.nanoseconds, 3_000_000); // 3 ms expressed in nanoseconds.
+        let mut d_err = 
Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
+        let err = d_err
+            .append_default(&AvroLiteral::Bytes(vec![0u8; 11])) // One byte short of the required 12.
+            .unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Duration default must be exactly 12 bytes"),
+            "unexpected error: {err:?}"
+        );
+    }
+
+    #[test]
+    fn test_default_append_decimal256_from_bytes() { // 32-byte defaults decode as signed values at scale 2.
+        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
+        let mut d = Decoder::try_new(&dt).unwrap();
+        let pos: [u8; 32] = [ // Ends in 0x30 0x39 = 12345, expected to render as "123.45".
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00,
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00,
+            0x00, 0x00, 0x30, 0x39,
+        ];
+        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
+        let neg: [u8; 32] = [ // All 0xFF then 0x85: two's-complement -123, expected to render as "-1.23".
+            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 
0xFF, 0xFF, 0xFF,
+            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 
0xFF, 0xFF, 0xFF,
+            0xFF, 0xFF, 0xFF, 0x85,
+        ];
+        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
+        let arr = d.flush(None).unwrap();
+        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
+        assert_eq!(dec.len(), 2);
+        assert_eq!(dec.value_as_string(0), "123.45");
+        assert_eq!(dec.value_as_string(1), "-1.23");
+    }
+
+    /// A map-valued record default that omits a field must take that field's value from the
+    /// projector's per-field defaults.
+    ///
+    /// Only "a" is present in the map (7); "b" is absent and is expected to come back as the
+    /// projector default "hi".
+    #[test]
+    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
+        // No projector default for "a"; "b" defaults to "hi".
+        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
+        let mut rec = make_record_decoder_with_projector_defaults(
+            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
+            field_defaults,
+            vec![],
+            0,
+        );
+        // The record-level default supplies "a" only.
+        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
+        map.insert("a".to_string(), AvroLiteral::Int(7));
+        rec.append_default(&AvroLiteral::Map(map)).unwrap();
+        let arr = rec.flush(None).unwrap();
+        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        let a = s
+            .column_by_name("a")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let b = s
+            .column_by_name("b")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(a.value(0), 7);
+        assert_eq!(b.value(0), "hi");
+    }
+
+    /// A `Null` record default must populate every field from the projector's own defaults.
+    #[test]
+    fn test_record_append_default_null_uses_projector_field_defaults() {
+        // Field "a" defaults to 5 and field "b" to "x".
+        let mut decoder = make_record_decoder_with_projector_defaults(
+            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
+            vec![
+                Some(AvroLiteral::Int(5)),
+                Some(AvroLiteral::String("x".into())),
+            ],
+            vec![],
+            0,
+        );
+        decoder.append_default(&AvroLiteral::Null).unwrap();
+        let flushed = decoder.flush(None).unwrap();
+        let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
+        // Pull out each child column, then downcast it to its concrete array type.
+        let int_col = record.column_by_name("a").unwrap();
+        let ints = int_col.as_any().downcast_ref::<Int32Array>().unwrap();
+        let str_col = record.column_by_name("b").unwrap();
+        let strings = str_col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(ints.value(0), 5);
+        assert_eq!(strings.value(0), "x");
+    }
+
+    /// With no projector defaults, a field missing from a map-valued record default must be
+    /// appended as null.
+    ///
+    /// The record decoder is assembled by hand from two nullable children (Int32 "a" and
+    /// String "b"); the map default supplies only "a" = 9, so "b" is expected to be null.
+    #[test]
+    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties(
+    ) {
+        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
+        let field_refs: Vec<FieldRef> = fields
+            .iter()
+            .map(|(name, dt, nullable)| Arc::new(ArrowField::new(*name, dt.clone(), *nullable)))
+            .collect();
+        // Child decoders, one per field, each wrapped in a nullable decoder.
+        let dec_a = Decoder::Nullable(
+            Nullability::NullSecond,
+            NullBufferBuilder::new(DEFAULT_CAPACITY),
+            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
+        );
+        let dec_b = Decoder::Nullable(
+            Nullability::NullSecond,
+            NullBufferBuilder::new(DEFAULT_CAPACITY),
+            Box::new(Decoder::String(
+                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+                Vec::with_capacity(DEFAULT_CAPACITY),
+            )),
+        );
+        let decoders = vec![dec_a, dec_b];
+        let projector = Projector {
+            writer_to_reader: Arc::from(vec![]),
+            skip_decoders: vec![],
+            field_defaults: vec![None, None], // no defaults -> append_null
+            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
+        };
+        let mut rec = Decoder::Record(field_refs.into(), decoders, Some(projector));
+        // The record-level default supplies "a" only.
+        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
+        map.insert("a".to_string(), AvroLiteral::Int(9));
+        rec.append_default(&AvroLiteral::Map(map)).unwrap();
+        let arr = rec.flush(None).unwrap();
+        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        let a = s
+            .column_by_name("a")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let b = s
+            .column_by_name("b")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert!(a.is_valid(0));
+        assert_eq!(a.value(0), 9);
+        assert!(b.is_null(0));
+    }
+
+    /// Decoding from an empty buffer must still yield a row whose columns come from the
+    /// projector's default injections (index 0 -> 99, index 1 -> "alice").
+    #[test]
+    fn test_projector_default_injection_when_writer_lacks_fields() {
+        let mut decoder = make_record_decoder_with_projector_defaults(
+            &[
+                ("id", DataType::Int32, false),
+                ("name", DataType::Utf8, false),
+            ],
+            // No per-field defaults; values are supplied through the injections below.
+            vec![None, None],
+            vec![
+                (0, AvroLiteral::Int(99)),
+                (1, AvroLiteral::String("alice".into())),
+            ],
+            0,
+        );
+        // The cursor has no bytes to read.
+        let mut cursor = AvroCursor::new(&[]);
+        decoder.decode(&mut cursor).unwrap();
+        let flushed = decoder.flush(None).unwrap();
+        let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
+        let id_col = record.column_by_name("id").unwrap();
+        let ids = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
+        let name_col = record.column_by_name("name").unwrap();
+        let names = name_col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(ids.value(0), 99);
+        assert_eq!(names.value(0), "alice");
+    }
 }

Reply via email to