This is an automated email from the ASF dual-hosted git repository.

mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new ca07b064db Add projection with default values support to `RecordDecoder` (#8293)
ca07b064db is described below

commit ca07b064db5b242ae6f84c232f7b36a247cd930e
Author: Connor Sanders <con...@elastiflow.com>
AuthorDate: Tue Sep 16 03:00:00 2025 -0500

    Add projection with default values support to `RecordDecoder` (#8293)

# Which issue does this PR close?

This work continues arrow-avro schema resolution support and aligns behavior with the Avro spec.

- **Related to**: #4886 ("Add Avro Support"): ongoing work to round out the reader/decoder, including schema resolution and type promotion.
- **Follow-ups/Context**: #8292 (add array/map/fixed schema resolution and default value support to the arrow-avro codec), #8124 (schema resolution & type promotion for the decoder), #8223 (enum mapping for schema resolution). These previous efforts established the foundations that this PR extends to default values and additional resolvable types.

# Rationale for this change

Avro's specification requires readers to materialize default values when a field exists in the **reader** schema but not in the **writer** schema, and to validate those defaults (e.g., union defaults must match the first branch; bytes/fixed defaults must be JSON strings; enums may specify a default symbol for unknown writer symbols). Implementing this behavior makes `arrow-avro` more standards-compliant and improves interoperability with evolving schemas.

# What changes are included in this PR?

**High-level summary**

* **Refactor `RecordDecoder`** around a simpler **`Projector`**-style abstraction that consumes `ResolvedRecord` to (a) skip writer-only fields and (b) materialize reader-only defaulted fields, reducing branching in the hot path. (See the commit subject and record decoder changes.)

**Touched files (2):**

* `arrow-avro/src/reader/record.rs` - refactor the decoder to use precomputed mappings and defaults.
* `arrow-avro/src/reader/mod.rs` - add comprehensive tests for defaults and error cases (see below).

# Are these changes tested?

Yes, new integration tests cover both the **happy path** and **validation errors**:

* `test_schema_resolution_defaults_all_supported_types`: verifies that defaults for boolean/int/long/float/double/bytes/string/date/time/timestamp/decimal/fixed/enum/duration/uuid/array/map/nested record and unions are materialized correctly for all rows.
* `test_schema_resolution_default_enum_invalid_symbol_errors`: an invalid enum default symbol is rejected.
* `test_schema_resolution_default_fixed_size_mismatch_errors`: mismatched fixed/bytes default lengths are rejected.

These tests assert the Avro-spec behavior (i.e., union defaults must match the first branch; bytes/fixed defaults use JSON strings).
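For context, a minimal sketch (not part of this commit) of how the new default materialization could be exercised end to end, mirroring the new tests: the builder calls follow `ReaderBuilder::new().with_reader_schema(..).build(..)` as used in `arrow-avro/src/reader/mod.rs` below, while the file path, record name, and module paths are assumptions.

```rust
use std::error::Error;
use std::fs::File;
use std::io::BufReader;

use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::AvroSchema;

fn main() -> Result<(), Box<dyn Error>> {
    // Reader schema whose only field, `d_int`, is assumed to be absent from the
    // writer schema embedded in the file, so the decoder injects its default (42)
    // for every row. "topLevelRecord" is a placeholder; the reader record name
    // must resolve against the writer's record name.
    let reader_schema = AvroSchema::new(
        r#"{"type":"record","name":"topLevelRecord","fields":[
              {"name":"d_int","type":"int","default":42}
           ]}"#
        .to_string(),
    );
    let file = File::open("test/data/skippable_types.avro")?;
    let reader = ReaderBuilder::new()
        .with_reader_schema(reader_schema)
        .build(BufReader::new(file))?;
    // Assuming the reader yields `Result<RecordBatch, ArrowError>` batches.
    for batch in reader {
        let batch = batch?;
        // The single `d_int` column is filled entirely with the injected default.
        assert_eq!(batch.num_columns(), 1);
    }
    Ok(())
}
```

# Are there any user-facing changes?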
N/A --- arrow-avro/src/reader/mod.rs | 241 +++++++++ arrow-avro/src/reader/record.rs | 1069 +++++++++++++++++++++++++++++++++------ 2 files changed, 1147 insertions(+), 163 deletions(-) diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 217366b633..bf72fc92c6 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -2085,6 +2085,245 @@ mod test { assert!(batch.column(0).as_any().is::<StringViewArray>()); } + fn make_reader_schema_with_default_fields( + path: &str, + default_fields: Vec<Value>, + ) -> AvroSchema { + let mut root = load_writer_schema_json(path); + assert_eq!(root["type"], "record", "writer schema must be a record"); + root.as_object_mut() + .expect("schema is a JSON object") + .insert("fields".to_string(), Value::Array(default_fields)); + AvroSchema::new(root.to_string()) + } + + #[test] + fn test_schema_resolution_defaults_all_supported_types() { + let path = "test/data/skippable_types.avro"; + let duration_default = "\u{0000}".repeat(12); + let reader_schema = make_reader_schema_with_default_fields( + path, + vec![ + serde_json::json!({"name":"d_bool","type":"boolean","default":true}), + serde_json::json!({"name":"d_int","type":"int","default":42}), + serde_json::json!({"name":"d_long","type":"long","default":12345}), + serde_json::json!({"name":"d_float","type":"float","default":1.5}), + serde_json::json!({"name":"d_double","type":"double","default":2.25}), + serde_json::json!({"name":"d_bytes","type":"bytes","default":"XYZ"}), + serde_json::json!({"name":"d_string","type":"string","default":"hello"}), + serde_json::json!({"name":"d_date","type":{"type":"int","logicalType":"date"},"default":0}), + serde_json::json!({"name":"d_time_ms","type":{"type":"int","logicalType":"time-millis"},"default":1000}), + serde_json::json!({"name":"d_time_us","type":{"type":"long","logicalType":"time-micros"},"default":2000}), + serde_json::json!({"name":"d_ts_ms","type":{"type":"long","logicalType":"local-timestamp-millis"},"default":0}), + serde_json::json!({"name":"d_ts_us","type":{"type":"long","logicalType":"local-timestamp-micros"},"default":0}), + serde_json::json!({"name":"d_decimal","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"default":""}), + serde_json::json!({"name":"d_fixed","type":{"type":"fixed","name":"F4","size":4},"default":"ABCD"}), + serde_json::json!({"name":"d_enum","type":{"type":"enum","name":"E","symbols":["A","B","C"]},"default":"A"}), + serde_json::json!({"name":"d_duration","type":{"type":"fixed","name":"Dur","size":12,"logicalType":"duration"},"default":duration_default}), + serde_json::json!({"name":"d_uuid","type":{"type":"string","logicalType":"uuid"},"default":"00000000-0000-0000-0000-000000000000"}), + serde_json::json!({"name":"d_array","type":{"type":"array","items":"int"},"default":[1,2,3]}), + serde_json::json!({"name":"d_map","type":{"type":"map","values":"long"},"default":{"a":1,"b":2}}), + serde_json::json!({"name":"d_record","type":{ + "type":"record","name":"DefaultRec","fields":[ + {"name":"x","type":"int"}, + {"name":"y","type":["null","string"],"default":null} + ] + },"default":{"x":7}}), + serde_json::json!({"name":"d_nullable_null","type":["null","int"],"default":null}), + serde_json::json!({"name":"d_nullable_value","type":["int","null"],"default":123}), + ], + ); + let actual = read_alltypes_with_reader_schema(path, reader_schema); + let num_rows = actual.num_rows(); + assert!(num_rows > 0, "skippable_types.avro should contain rows"); + assert_eq!( + 
actual.num_columns(), + 22, + "expected exactly our defaulted fields" + ); + let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(22); + arrays.push(Arc::new(BooleanArray::from_iter(std::iter::repeat_n( + Some(true), + num_rows, + )))); + arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n( + 42, num_rows, + )))); + arrays.push(Arc::new(Int64Array::from_iter_values(std::iter::repeat_n( + 12345, num_rows, + )))); + arrays.push(Arc::new(Float32Array::from_iter_values( + std::iter::repeat_n(1.5f32, num_rows), + ))); + arrays.push(Arc::new(Float64Array::from_iter_values( + std::iter::repeat_n(2.25f64, num_rows), + ))); + arrays.push(Arc::new(BinaryArray::from_iter_values( + std::iter::repeat_n(b"XYZ".as_ref(), num_rows), + ))); + arrays.push(Arc::new(StringArray::from_iter_values( + std::iter::repeat_n("hello", num_rows), + ))); + arrays.push(Arc::new(Date32Array::from_iter_values( + std::iter::repeat_n(0, num_rows), + ))); + arrays.push(Arc::new(Time32MillisecondArray::from_iter_values( + std::iter::repeat_n(1_000, num_rows), + ))); + arrays.push(Arc::new(Time64MicrosecondArray::from_iter_values( + std::iter::repeat_n(2_000i64, num_rows), + ))); + arrays.push(Arc::new(TimestampMillisecondArray::from_iter_values( + std::iter::repeat_n(0i64, num_rows), + ))); + arrays.push(Arc::new(TimestampMicrosecondArray::from_iter_values( + std::iter::repeat_n(0i64, num_rows), + ))); + #[cfg(feature = "small_decimals")] + let decimal = Decimal64Array::from_iter_values(std::iter::repeat_n(0i64, num_rows)) + .with_precision_and_scale(10, 2) + .unwrap(); + #[cfg(not(feature = "small_decimals"))] + let decimal = Decimal128Array::from_iter_values(std::iter::repeat_n(0i128, num_rows)) + .with_precision_and_scale(10, 2) + .unwrap(); + arrays.push(Arc::new(decimal)); + let fixed_iter = std::iter::repeat_n(Some(*b"ABCD"), num_rows); + arrays.push(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_iter, 4).unwrap(), + )); + let enum_keys = Int32Array::from_iter_values(std::iter::repeat_n(0, num_rows)); + let enum_values = StringArray::from_iter_values(["A", "B", "C"]); + let enum_arr = + DictionaryArray::<Int32Type>::try_new(enum_keys, Arc::new(enum_values)).unwrap(); + arrays.push(Arc::new(enum_arr)); + let duration_values = std::iter::repeat_n( + Some(IntervalMonthDayNanoType::make_value(0, 0, 0)), + num_rows, + ); + let duration_arr: IntervalMonthDayNanoArray = duration_values.collect(); + arrays.push(Arc::new(duration_arr)); + let uuid_bytes = [0u8; 16]; + let uuid_iter = std::iter::repeat_n(Some(uuid_bytes), num_rows); + arrays.push(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_iter, 16).unwrap(), + )); + let item_field = Arc::new(Field::new( + Field::LIST_FIELD_DEFAULT_NAME, + DataType::Int32, + false, + )); + let mut list_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field); + for _ in 0..num_rows { + list_builder.values().append_value(1); + list_builder.values().append_value(2); + list_builder.values().append_value(3); + list_builder.append(true); + } + arrays.push(Arc::new(list_builder.finish())); + let values_field = Arc::new(Field::new("value", DataType::Int64, false)); + let mut map_builder = MapBuilder::new( + Some(builder::MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + StringBuilder::new(), + Int64Builder::new(), + ) + .with_values_field(values_field); + for _ in 0..num_rows { + let (keys, vals) = map_builder.entries(); + keys.append_value("a"); + 
vals.append_value(1); + keys.append_value("b"); + vals.append_value(2); + map_builder.append(true).unwrap(); + } + arrays.push(Arc::new(map_builder.finish())); + let rec_fields: Fields = Fields::from(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Utf8, true), + ]); + let mut sb = StructBuilder::new( + rec_fields.clone(), + vec![ + Box::new(Int32Builder::new()), + Box::new(StringBuilder::new()), + ], + ); + for _ in 0..num_rows { + sb.field_builder::<Int32Builder>(0).unwrap().append_value(7); + sb.field_builder::<StringBuilder>(1).unwrap().append_null(); + sb.append(true); + } + arrays.push(Arc::new(sb.finish())); + arrays.push(Arc::new(Int32Array::from_iter(std::iter::repeat_n( + None::<i32>, + num_rows, + )))); + arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n( + 123, num_rows, + )))); + let expected = RecordBatch::try_new(actual.schema(), arrays).unwrap(); + assert_eq!( + actual, expected, + "defaults should materialize correctly for all fields" + ); + } + + #[test] + fn test_schema_resolution_default_enum_invalid_symbol_errors() { + let path = "test/data/skippable_types.avro"; + let bad_schema = make_reader_schema_with_default_fields( + path, + vec![serde_json::json!({ + "name":"bad_enum", + "type":{"type":"enum","name":"E","symbols":["A","B","C"]}, + "default":"Z" + })], + ); + let file = File::open(path).unwrap(); + let res = ReaderBuilder::new() + .with_reader_schema(bad_schema) + .build(BufReader::new(file)); + let err = res.expect_err("expected enum default validation to fail"); + let msg = err.to_string(); + let lower_msg = msg.to_lowercase(); + assert!( + lower_msg.contains("enum") + && (lower_msg.contains("symbol") || lower_msg.contains("default")), + "unexpected error: {msg}" + ); + } + + #[test] + fn test_schema_resolution_default_fixed_size_mismatch_errors() { + let path = "test/data/skippable_types.avro"; + let bad_schema = make_reader_schema_with_default_fields( + path, + vec![serde_json::json!({ + "name":"bad_fixed", + "type":{"type":"fixed","name":"F","size":4}, + "default":"ABC" + })], + ); + let file = File::open(path).unwrap(); + let res = ReaderBuilder::new() + .with_reader_schema(bad_schema) + .build(BufReader::new(file)); + let err = res.expect_err("expected fixed default validation to fail"); + let msg = err.to_string(); + let lower_msg = msg.to_lowercase(); + assert!( + lower_msg.contains("fixed") + && (lower_msg.contains("size") + || lower_msg.contains("length") + || lower_msg.contains("does not match")), + "unexpected error: {msg}" + ); + } + #[test] fn test_alltypes_skip_writer_fields_keep_double_only() { let file = arrow_test_data("avro/alltypes_plain.avro"); @@ -2538,6 +2777,7 @@ mod test { let values_i128: Vec<i128> = (1..=24).map(|n| (n as i128) * pow10).collect(); let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef { match *dt { + #[cfg(feature = "small_decimals")] DataType::Decimal32(p, s) => { let it = values.iter().map(|&v| v as i32); Arc::new( @@ -2546,6 +2786,7 @@ mod test { .unwrap(), ) } + #[cfg(feature = "small_decimals")] DataType::Decimal64(p, s) => { let it = values.iter().map(|&v| v as i64); Arc::new( diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 48eb601024..9ca8acb45b 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -15,27 +15,27 @@ // specific language governing permissions and limitations // under the License. 
-use crate::codec::{AvroDataType, Codec, Promotion, ResolutionInfo}; +use crate::codec::{ + AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord, +}; use crate::reader::block::{Block, BlockDecoder}; use crate::reader::cursor::AvroCursor; -use crate::reader::header::Header; -use crate::schema::*; +use crate::schema::Nullability; use arrow_array::builder::{ - ArrayBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, Decimal64Builder, - IntervalMonthDayNanoBuilder, PrimitiveBuilder, + Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder, StringViewBuilder, }; +#[cfg(feature = "small_decimals")] +use arrow_array::builder::{Decimal32Builder, Decimal64Builder}; use arrow_array::types::*; use arrow_array::*; use arrow_buffer::*; use arrow_schema::{ - ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit, - Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, + ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as ArrowSchema, SchemaRef, + DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, }; #[cfg(feature = "small_decimals")] use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION}; use std::cmp::Ordering; -use std::collections::HashMap; -use std::io::Read; use std::sync::Arc; use uuid::Uuid; @@ -60,6 +60,29 @@ macro_rules! flush_decimal { }}; } +/// Macro to append a default decimal value from two's-complement big-endian bytes +/// into the corresponding decimal builder, with compile-time constructed error text. +macro_rules! append_decimal_default { + ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{ + match $lit { + AvroLiteral::Bytes(b) => { + let ext = sign_cast_to::<$N>(b)?; + let val = <$Int>::from_be_bytes(ext); + $builder.append_value(val); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + concat!( + "Default for ", + $name, + " must be bytes (two's-complement big-endian)" + ) + .to_string(), + )), + } + }}; +} + #[derive(Debug)] pub(crate) struct RecordDecoderBuilder<'a> { data_type: &'a AvroDataType, @@ -91,15 +114,7 @@ pub(crate) struct RecordDecoder { schema: SchemaRef, fields: Vec<Decoder>, use_utf8view: bool, - resolved: Option<ResolvedRuntime>, -} - -#[derive(Debug)] -struct ResolvedRuntime { - /// writer field index -> reader field index (or None if writer-only) - writer_to_reader: Arc<[Option<usize>]>, - /// per-writer-field skipper (Some only when writer-only) - skip_decoders: Vec<Option<Skipper>>, + projector: Option<Projector>, } impl RecordDecoder { @@ -138,14 +153,9 @@ impl RecordDecoder { arrow_fields.push(avro_field.field()); encodings.push(Decoder::try_new(avro_field.data_type())?); } - // If this record carries resolution metadata, prepare top-level runtime helpers - let resolved = match data_type.resolution.as_ref() { + let projector = match data_type.resolution.as_ref() { Some(ResolutionInfo::Record(rec)) => { - let skip_decoders = build_skip_decoders(&rec.skip_fields)?; - Some(ResolvedRuntime { - writer_to_reader: rec.writer_to_reader.clone(), - skip_decoders, - }) + Some(ProjectorBuilder::try_new(rec, reader_fields).build()?) 
} _ => None, }; @@ -153,7 +163,7 @@ impl RecordDecoder { schema: Arc::new(ArrowSchema::new(arrow_fields)), fields: encodings, use_utf8view, - resolved, + projector, }) } other => Err(ArrowError::ParseError(format!( @@ -170,17 +180,10 @@ impl RecordDecoder { /// Decode `count` records from `buf` pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> { let mut cursor = AvroCursor::new(buf); - match self.resolved.as_mut() { - Some(runtime) => { - // Top-level resolved record: read writer fields in writer order, - // project into reader fields, and skip writer-only fields + match self.projector.as_mut() { + Some(proj) => { for _ in 0..count { - decode_with_resolution( - &mut cursor, - &mut self.fields, - &runtime.writer_to_reader, - &mut runtime.skip_decoders, - )?; + proj.project_record(&mut cursor, &mut self.fields)?; } } None => { @@ -205,24 +208,10 @@ impl RecordDecoder { } } -fn decode_with_resolution( - buf: &mut AvroCursor<'_>, - encodings: &mut [Decoder], - writer_to_reader: &[Option<usize>], - skippers: &mut [Option<Skipper>], -) -> Result<(), ArrowError> { - for (w_idx, (target, skipper_opt)) in writer_to_reader.iter().zip(skippers).enumerate() { - match (*target, skipper_opt.as_mut()) { - (Some(r_idx), _) => encodings[r_idx].decode(buf)?, - (None, Some(sk)) => sk.skip(buf)?, - (None, None) => { - return Err(ArrowError::SchemaError(format!( - "No skipper available for writer-only field at index {w_idx}", - ))); - } - } - } - Ok(()) +#[derive(Debug)] +struct EnumResolution { + mapping: Arc<[i32]>, + default_index: i32, } #[derive(Debug)] @@ -252,7 +241,7 @@ enum Decoder { /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray StringView(OffsetBufferBuilder<i32>, Vec<u8>), Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>), - Record(Fields, Vec<Decoder>), + Record(Fields, Vec<Decoder>, Option<Projector>), Map( FieldRef, OffsetBufferBuilder<i32>, @@ -261,27 +250,16 @@ enum Decoder { Box<Decoder>, ), Fixed(i32, Vec<u8>), - Enum(Vec<i32>, Arc<[String]>), + Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>), Duration(IntervalMonthDayNanoBuilder), Uuid(Vec<u8>), + #[cfg(feature = "small_decimals")] Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder), + #[cfg(feature = "small_decimals")] Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder), Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder), Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder), Nullable(Nullability, NullBufferBuilder, Box<Decoder>), - EnumResolved { - indices: Vec<i32>, - symbols: Arc<[String]>, - mapping: Arc<[i32]>, - default_index: i32, - }, - /// Resolved record that needs writer->reader projection and skipping writer-only fields - RecordResolved { - fields: Fields, - encodings: Vec<Decoder>, - writer_to_reader: Arc<[Option<usize>]>, - skip_decoders: Vec<Option<Skipper>>, - }, } impl Decoder { @@ -403,16 +381,14 @@ impl Decoder { ) } (Codec::Enum(symbols), _) => { - if let Some(ResolutionInfo::EnumMapping(mapping)) = data_type.resolution.as_ref() { - Self::EnumResolved { - indices: Vec::with_capacity(DEFAULT_CAPACITY), - symbols: symbols.clone(), + let res = match data_type.resolution.as_ref() { + Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution { mapping: mapping.mapping.clone(), default_index: mapping.default_index, - } - } else { - Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone()) - } + }), + _ => None, + }; + Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), 
symbols.clone(), res) } (Codec::Struct(fields), _) => { let mut arrow_fields = Vec::with_capacity(fields.len()); @@ -422,17 +398,13 @@ impl Decoder { arrow_fields.push(avro_field.field()); encodings.push(encoding); } - if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() { - let skip_decoders = build_skip_decoders(&rec.skip_fields)?; - Self::RecordResolved { - fields: arrow_fields.into(), - encodings, - writer_to_reader: rec.writer_to_reader.clone(), - skip_decoders, - } - } else { - Self::Record(arrow_fields.into(), encodings) - } + let projector = + if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() { + Some(ProjectorBuilder::try_new(rec, fields).build()?) + } else { + None + }; + Self::Record(arrow_fields.into(), encodings, projector) } (Codec::Map(child), _) => { let val_field = child.field_with_name("value"); @@ -494,27 +466,263 @@ impl Decoder { Self::Array(_, offsets, e) => { offsets.push_length(0); } - Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()), + Self::Record(_, e, _) => e.iter_mut().for_each(|e| e.append_null()), Self::Map(_, _koff, moff, _, _) => { moff.push_length(0); } Self::Fixed(sz, accum) => { accum.extend(std::iter::repeat_n(0u8, *sz as usize)); } + #[cfg(feature = "small_decimals")] Self::Decimal32(_, _, _, builder) => builder.append_value(0), + #[cfg(feature = "small_decimals")] Self::Decimal64(_, _, _, builder) => builder.append_value(0), Self::Decimal128(_, _, _, builder) => builder.append_value(0), Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO), - Self::Enum(indices, _) => indices.push(0), - Self::EnumResolved { indices, .. } => indices.push(0), + Self::Enum(indices, _, _) => indices.push(0), Self::Duration(builder) => builder.append_null(), Self::Nullable(_, null_buffer, inner) => { null_buffer.append(false); inner.append_null(); } - Self::RecordResolved { encodings, .. 
} => { - encodings.iter_mut().for_each(|e| e.append_null()); + } + } + + /// Append a single default literal into the decoder's buffers + fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> { + match self { + Self::Nullable(_, nb, inner) => { + if matches!(lit, AvroLiteral::Null) { + nb.append(false); + inner.append_null(); + Ok(()) + } else { + nb.append(true); + inner.append_default(lit) + } + } + Self::Null(count) => match lit { + AvroLiteral::Null => { + *count += 1; + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Non-null default for null type".to_string(), + )), + }, + Self::Boolean(b) => match lit { + AvroLiteral::Boolean(v) => { + b.append(*v); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for boolean must be boolean".to_string(), + )), + }, + Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit { + AvroLiteral::Int(i) => { + v.push(*i); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for int32/date32/time-millis must be int".to_string(), + )), + }, + Self::Int64(v) + | Self::Int32ToInt64(v) + | Self::TimeMicros(v) + | Self::TimestampMillis(_, v) + | Self::TimestampMicros(_, v) => match lit { + AvroLiteral::Long(i) => { + v.push(*i); + Ok(()) + } + AvroLiteral::Int(i) => { + v.push(*i as i64); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for long/time-micros/timestamp must be long or int".to_string(), + )), + }, + Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit { + AvroLiteral::Float(f) => { + v.push(*f); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for float must be float".to_string(), + )), + }, + Self::Float64(v) + | Self::Int32ToFloat64(v) + | Self::Int64ToFloat64(v) + | Self::Float32ToFloat64(v) => match lit { + AvroLiteral::Double(f) => { + v.push(*f); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for double must be double".to_string(), + )), + }, + Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit { + AvroLiteral::Bytes(b) => { + offsets.push_length(b.len()); + values.extend_from_slice(b); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for bytes must be bytes".to_string(), + )), + }, + Self::BytesToString(offsets, values) + | Self::String(offsets, values) + | Self::StringView(offsets, values) => match lit { + AvroLiteral::String(s) => { + let b = s.as_bytes(); + offsets.push_length(b.len()); + values.extend_from_slice(b); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for string must be string".to_string(), + )), + }, + Self::Uuid(values) => match lit { + AvroLiteral::String(s) => { + let uuid = Uuid::try_parse(s).map_err(|e| { + ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})")) + })?; + values.extend_from_slice(uuid.as_bytes()); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for uuid must be string".to_string(), + )), + }, + Self::Fixed(sz, accum) => match lit { + AvroLiteral::Bytes(b) => { + if b.len() != *sz as usize { + return Err(ArrowError::InvalidArgumentError(format!( + "Fixed default length {} does not match size {sz}", + b.len(), + ))); + } + accum.extend_from_slice(b); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for fixed must be bytes".to_string(), + )), + }, + #[cfg(feature = "small_decimals")] + Self::Decimal32(_, _, _, builder) => { + append_decimal_default!(lit, builder, 4, i32, "decimal32") + } + #[cfg(feature = 
"small_decimals")] + Self::Decimal64(_, _, _, builder) => { + append_decimal_default!(lit, builder, 8, i64, "decimal64") + } + Self::Decimal128(_, _, _, builder) => { + append_decimal_default!(lit, builder, 16, i128, "decimal128") + } + Self::Decimal256(_, _, _, builder) => { + append_decimal_default!(lit, builder, 32, i256, "decimal256") } + Self::Duration(builder) => match lit { + AvroLiteral::Bytes(b) => { + if b.len() != 12 { + return Err(ArrowError::InvalidArgumentError(format!( + "Duration default must be exactly 12 bytes, got {}", + b.len() + ))); + } + let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]); + let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]); + let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]); + let nanos = (millis as i64) * 1_000_000; + builder.append_value(IntervalMonthDayNano::new( + months as i32, + days as i32, + nanos, + )); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for duration must be 12-byte little-endian months/days/millis" + .to_string(), + )), + }, + Self::Array(_, offsets, inner) => match lit { + AvroLiteral::Array(items) => { + offsets.push_length(items.len()); + for item in items { + inner.append_default(item)?; + } + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for array must be an array literal".to_string(), + )), + }, + Self::Map(_, koff, moff, kdata, valdec) => match lit { + AvroLiteral::Map(entries) => { + moff.push_length(entries.len()); + for (k, v) in entries { + let kb = k.as_bytes(); + koff.push_length(kb.len()); + kdata.extend_from_slice(kb); + valdec.append_default(v)?; + } + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for map must be a map/object literal".to_string(), + )), + }, + Self::Enum(indices, symbols, _) => match lit { + AvroLiteral::Enum(sym) => { + let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Enum default symbol {sym:?} not in reader symbols" + )) + })?; + indices.push(pos as i32); + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for enum must be a symbol".to_string(), + )), + }, + Self::Record(field_meta, decoders, projector) => match lit { + AvroLiteral::Map(entries) => { + for (i, dec) in decoders.iter_mut().enumerate() { + let name = field_meta[i].name(); + if let Some(sub) = entries.get(name) { + dec.append_default(sub)?; + } else if let Some(proj) = projector.as_ref() { + proj.project_default(dec, i)?; + } else { + dec.append_null(); + } + } + Ok(()) + } + AvroLiteral::Null => { + for (i, dec) in decoders.iter_mut().enumerate() { + if let Some(proj) = projector.as_ref() { + proj.project_default(dec, i)?; + } else { + dec.append_null(); + } + } + Ok(()) + } + _ => Err(ArrowError::InvalidArgumentError( + "Default for record must be a map/object or null".to_string(), + )), + }, } } @@ -560,11 +768,14 @@ impl Decoder { let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?; off.push_length(total_items); } - Self::Record(_, encodings) => { + Self::Record(_, encodings, None) => { for encoding in encodings { encoding.decode(buf)?; } } + Self::Record(_, encodings, Some(proj)) => { + proj.project_record(buf, encodings)?; + } Self::Map(_, koff, moff, kdata, valdec) => { let newly_added = read_blocks(buf, |cur| { let kb = cur.get_bytes()?; @@ -578,9 +789,11 @@ impl Decoder { let fx = buf.get_fixed(*sz as usize)?; accum.extend_from_slice(fx); } + #[cfg(feature = "small_decimals")] Self::Decimal32(_, _, size, builder) => { decode_decimal!(size, 
buf, builder, 4, i32); } + #[cfg(feature = "small_decimals")] Self::Decimal64(_, _, size, builder) => { decode_decimal!(size, buf, builder, 8, i64); } @@ -590,21 +803,16 @@ impl Decoder { Self::Decimal256(_, _, size, builder) => { decode_decimal!(size, buf, builder, 32, i256); } - Self::Enum(indices, _) => { + Self::Enum(indices, _, None) => { indices.push(buf.get_int()?); } - Self::EnumResolved { - indices, - mapping, - default_index, - .. - } => { + Self::Enum(indices, _, Some(res)) => { let raw = buf.get_int()?; let resolved = usize::try_from(raw) .ok() - .and_then(|idx| mapping.get(idx).copied()) + .and_then(|idx| res.mapping.get(idx).copied()) .filter(|&idx| idx >= 0) - .unwrap_or(*default_index); + .unwrap_or(res.default_index); if resolved >= 0 { indices.push(resolved); } else { @@ -635,14 +843,6 @@ impl Decoder { } nb.append(is_not_null); } - Self::RecordResolved { - encodings, - writer_to_reader, - skip_decoders, - .. - } => { - decode_with_resolution(buf, encodings, writer_to_reader, skip_decoders)?; - } } Ok(()) } @@ -711,7 +911,7 @@ impl Decoder { let offsets = flush_offsets(offsets); Arc::new(ListArray::new(field.clone(), offsets, values, nulls)) } - Self::Record(fields, encodings) => { + Self::Record(fields, encodings, _) => { let arrays = encodings .iter_mut() .map(|x| x.flush(None)) @@ -764,9 +964,11 @@ impl Decoder { .map_err(|e| ArrowError::ParseError(e.to_string()))?; Arc::new(arr) } + #[cfg(feature = "small_decimals")] Self::Decimal32(precision, scale, _, builder) => { flush_decimal!(builder, precision, scale, nulls, Decimal32Array) } + #[cfg(feature = "small_decimals")] Self::Decimal64(precision, scale, _, builder) => { flush_decimal!(builder, precision, scale, nulls, Decimal64Array) } @@ -776,25 +978,13 @@ impl Decoder { Self::Decimal256(precision, scale, _, builder) => { flush_decimal!(builder, precision, scale, nulls, Decimal256Array) } - Self::Enum(indices, symbols) => flush_dict(indices, symbols, nulls)?, - Self::EnumResolved { - indices, symbols, .. - } => flush_dict(indices, symbols, nulls)?, + Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?, Self::Duration(builder) => { let (_, vals, _) = builder.finish().into_parts(); let vals = IntervalMonthDayNanoArray::try_new(vals, nulls) .map_err(|e| ArrowError::ParseError(e.to_string()))?; Arc::new(vals) } - Self::RecordResolved { - fields, encodings, .. 
- } => { - let arrays = encodings - .iter_mut() - .map(|x| x.flush(None)) - .collect::<Result<Vec<_>, _>>()?; - Arc::new(StructArray::new(fields.clone(), arrays, nulls)) - } }) } } @@ -976,6 +1166,120 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> { Ok(out) } +#[derive(Debug)] +struct Projector { + writer_to_reader: Arc<[Option<usize>]>, + skip_decoders: Vec<Option<Skipper>>, + field_defaults: Vec<Option<AvroLiteral>>, + default_injections: Arc<[(usize, AvroLiteral)]>, +} + +#[derive(Debug)] +struct ProjectorBuilder<'a> { + rec: &'a ResolvedRecord, + reader_fields: Arc<[AvroField]>, +} + +impl<'a> ProjectorBuilder<'a> { + #[inline] + fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self { + Self { + rec, + reader_fields: reader_fields.clone(), + } + } + + #[inline] + fn build(self) -> Result<Projector, ArrowError> { + let reader_fields = self.reader_fields; + let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len()); + for avro_field in reader_fields.as_ref() { + if let Some(ResolutionInfo::DefaultValue(lit)) = + avro_field.data_type().resolution.as_ref() + { + field_defaults.push(Some(lit.clone())); + } else { + field_defaults.push(None); + } + } + let mut default_injections: Vec<(usize, AvroLiteral)> = + Vec::with_capacity(self.rec.default_fields.len()); + for &idx in self.rec.default_fields.as_ref() { + let lit = field_defaults + .get(idx) + .and_then(|lit| lit.clone()) + .unwrap_or(AvroLiteral::Null); + default_injections.push((idx, lit)); + } + let mut skip_decoders: Vec<Option<Skipper>> = + Vec::with_capacity(self.rec.skip_fields.len()); + for datatype in self.rec.skip_fields.as_ref() { + let skipper = match datatype { + Some(datatype) => Some(Skipper::from_avro(datatype)?), + None => None, + }; + skip_decoders.push(skipper); + } + Ok(Projector { + writer_to_reader: self.rec.writer_to_reader.clone(), + skip_decoders, + field_defaults, + default_injections: default_injections.into(), + }) + } +} + +impl Projector { + #[inline] + fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> { + // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from + // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in + // `ProjectorBuilder::build` to have exactly one element per reader field. + // Therefore, `index < self.field_defaults.len()` always holds here, so + // `self.field_defaults[index]` cannot panic. We only take an immutable reference + // via `.as_ref()`, and `self` is borrowed immutably. 
+ if let Some(default_literal) = self.field_defaults[index].as_ref() { + decoder.append_default(default_literal) + } else { + decoder.append_null(); + Ok(()) + } + } + + #[inline] + fn project_record( + &mut self, + buf: &mut AvroCursor<'_>, + encodings: &mut [Decoder], + ) -> Result<(), ArrowError> { + debug_assert_eq!( + self.writer_to_reader.len(), + self.skip_decoders.len(), + "internal invariant: mapping and skipper lists must have equal length" + ); + for (i, (mapping, skipper_opt)) in self + .writer_to_reader + .iter() + .zip(self.skip_decoders.iter_mut()) + .enumerate() + { + match (mapping, skipper_opt.as_mut()) { + (Some(reader_index), _) => encodings[*reader_index].decode(buf)?, + (None, Some(skipper)) => skipper.skip(buf)?, + (None, None) => { + return Err(ArrowError::SchemaError(format!( + "No skipper available for writer-only field at index {i}", + ))); + } + } + } + for (reader_index, lit) in self.default_injections.as_ref() { + encodings[*reader_index].append_default(lit)?; + } + Ok(()) + } +} + /// Lightweight skipper for non‑projected writer fields /// (fields present in the writer schema but omitted by the reader/projection); /// per Avro 1.11.1 schema resolution these fields are ignored. @@ -1126,25 +1430,13 @@ impl Skipper { } } -#[inline] -fn build_skip_decoders( - skip_fields: &[Option<AvroDataType>], -) -> Result<Vec<Option<Skipper>>, ArrowError> { - skip_fields - .iter() - .map(|opt| opt.as_ref().map(Skipper::from_avro).transpose()) - .collect() -} - #[cfg(test)] mod tests { use super::*; use crate::codec::AvroField; - use arrow_array::{ - cast::AsArray, Array, Decimal128Array, Decimal256Array, Decimal32Array, DictionaryArray, - FixedSizeBinaryArray, IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, - StructArray, - }; + use crate::schema::{PrimitiveType, Schema, TypeName}; + use arrow_array::cast::AsArray; + use indexmap::IndexMap; fn encode_avro_int(value: i32) -> Vec<u8> { let mut buf = Vec::new(); @@ -1977,12 +2269,14 @@ mod tests { vec!["B".to_string(), "C".to_string(), "A".to_string()].into(); let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]); let default_index: i32 = -1; - let mut dec = Decoder::EnumResolved { - indices: Vec::with_capacity(DEFAULT_CAPACITY), - symbols: reader_symbols.clone(), - mapping, - default_index, - }; + let mut dec = Decoder::Enum( + Vec::with_capacity(DEFAULT_CAPACITY), + reader_symbols.clone(), + Some(EnumResolution { + mapping, + default_index, + }), + ); let mut data = Vec::new(); data.extend_from_slice(&encode_avro_int(0)); data.extend_from_slice(&encode_avro_int(1)); @@ -2013,12 +2307,14 @@ mod tests { let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into(); let default_index: i32 = 1; let mapping: Arc<[i32]> = Arc::from(vec![0, 1]); - let mut dec = Decoder::EnumResolved { - indices: Vec::with_capacity(DEFAULT_CAPACITY), - symbols: reader_symbols.clone(), - mapping, - default_index, - }; + let mut dec = Decoder::Enum( + Vec::with_capacity(DEFAULT_CAPACITY), + reader_symbols.clone(), + Some(EnumResolution { + mapping, + default_index, + }), + ); let mut data = Vec::new(); data.extend_from_slice(&encode_avro_int(0)); data.extend_from_slice(&encode_avro_int(1)); @@ -2048,12 +2344,14 @@ mod tests { let reader_symbols: Arc<[String]> = vec!["A".to_string()].into(); let default_index: i32 = -1; // indicates no default at type-level let mapping: Arc<[i32]> = Arc::from(vec![-1]); - let mut dec = Decoder::EnumResolved { - indices: Vec::with_capacity(DEFAULT_CAPACITY), - symbols: reader_symbols, - 
mapping, - default_index, - }; + let mut dec = Decoder::Enum( + Vec::with_capacity(DEFAULT_CAPACITY), + reader_symbols, + Some(EnumResolution { + mapping, + default_index, + }), + ); let data = encode_avro_int(0); let mut cur = AvroCursor::new(&data); let err = dec @@ -2069,7 +2367,7 @@ mod tests { fn make_record_resolved_decoder( reader_fields: &[(&str, DataType, bool)], writer_to_reader: Vec<Option<usize>>, - mut skip_decoders: Vec<Option<super::Skipper>>, + skip_decoders: Vec<Option<Skipper>>, ) -> Decoder { let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len()); let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len()); @@ -2086,12 +2384,16 @@ mod tests { encodings.push(enc); } let fields: Fields = field_refs.into(); - Decoder::RecordResolved { + Decoder::Record( fields, encodings, - writer_to_reader: Arc::from(writer_to_reader), - skip_decoders, - } + Some(Projector { + writer_to_reader: Arc::from(writer_to_reader), + skip_decoders, + field_defaults: vec![None; reader_fields.len()], + default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()), + }), + ) } #[test] @@ -2257,4 +2559,445 @@ mod tests { assert_eq!(id.value(0), 5); assert_eq!(id.value(1), 7); } + + fn make_record_decoder_with_projector_defaults( + reader_fields: &[(&str, DataType, bool)], + field_defaults: Vec<Option<AvroLiteral>>, + default_injections: Vec<(usize, AvroLiteral)>, + writer_to_reader_len: usize, + ) -> Decoder { + assert_eq!( + field_defaults.len(), + reader_fields.len(), + "field_defaults must have one entry per reader field" + ); + let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len()); + let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len()); + for (name, dt, nullable) in reader_fields { + field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable))); + let enc = match dt { + DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)), + DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)), + DataType::Utf8 => Decoder::String( + OffsetBufferBuilder::new(DEFAULT_CAPACITY), + Vec::with_capacity(DEFAULT_CAPACITY), + ), + other => panic!("Unsupported test field type in helper: {other:?}"), + }; + encodings.push(enc); + } + let fields: Fields = field_refs.into(); + let skip_decoders: Vec<Option<Skipper>> = + (0..writer_to_reader_len).map(|_| None::<Skipper>).collect(); + let projector = Projector { + writer_to_reader: Arc::from(vec![None; writer_to_reader_len]), + skip_decoders, + field_defaults, + default_injections: Arc::from(default_injections), + }; + Decoder::Record(fields, encodings, Some(projector)) + } + + #[test] + fn test_default_append_int32_and_int64_from_int_and_long() { + let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)); + d_i32.append_default(&AvroLiteral::Int(42)).unwrap(); + let arr = d_i32.flush(None).unwrap(); + let a = arr.as_any().downcast_ref::<Int32Array>().unwrap(); + assert_eq!(a.len(), 1); + assert_eq!(a.value(0), 42); + let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)); + d_i64.append_default(&AvroLiteral::Int(5)).unwrap(); + d_i64.append_default(&AvroLiteral::Long(7)).unwrap(); + let arr64 = d_i64.flush(None).unwrap(); + let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap(); + assert_eq!(a64.len(), 2); + assert_eq!(a64.value(0), 5); + assert_eq!(a64.value(1), 7); + } + + #[test] + fn test_default_append_floats_and_doubles() { + let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY)); + 
d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap(); + let arr32 = d_f32.flush(None).unwrap(); + let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap(); + assert_eq!(a.value(0), 1.5); + let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY)); + d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap(); + let arr64 = d_f64.flush(None).unwrap(); + let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap(); + assert_eq!(b.value(0), 2.25); + } + + #[test] + fn test_default_append_string_and_bytes() { + let mut d_str = Decoder::String( + OffsetBufferBuilder::new(DEFAULT_CAPACITY), + Vec::with_capacity(DEFAULT_CAPACITY), + ); + d_str + .append_default(&AvroLiteral::String("hi".into())) + .unwrap(); + let s_arr = d_str.flush(None).unwrap(); + let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(arr.value(0), "hi"); + let mut d_bytes = Decoder::Binary( + OffsetBufferBuilder::new(DEFAULT_CAPACITY), + Vec::with_capacity(DEFAULT_CAPACITY), + ); + d_bytes + .append_default(&AvroLiteral::Bytes(vec![1, 2, 3])) + .unwrap(); + let b_arr = d_bytes.flush(None).unwrap(); + let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap(); + assert_eq!(barr.value(0), &[1, 2, 3]); + let mut d_str_err = Decoder::String( + OffsetBufferBuilder::new(DEFAULT_CAPACITY), + Vec::with_capacity(DEFAULT_CAPACITY), + ); + let err = d_str_err + .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62])) + .unwrap_err(); + assert!( + err.to_string() + .contains("Default for string must be string"), + "unexpected error: {err:?}" + ); + } + + #[test] + fn test_default_append_nullable_int32_null_and_value() { + let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)); + let mut dec = Decoder::Nullable( + Nullability::NullFirst, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(inner), + ); + dec.append_default(&AvroLiteral::Null).unwrap(); + dec.append_default(&AvroLiteral::Int(11)).unwrap(); + let arr = dec.flush(None).unwrap(); + let a = arr.as_any().downcast_ref::<Int32Array>().unwrap(); + assert_eq!(a.len(), 2); + assert!(a.is_null(0)); + assert_eq!(a.value(1), 11); + } + + #[test] + fn test_default_append_array_of_ints() { + let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32)))); + let mut d = Decoder::try_new(&list_dt).unwrap(); + let items = vec![ + AvroLiteral::Int(1), + AvroLiteral::Int(2), + AvroLiteral::Int(3), + ]; + d.append_default(&AvroLiteral::Array(items)).unwrap(); + let arr = d.flush(None).unwrap(); + let list = arr.as_any().downcast_ref::<ListArray>().unwrap(); + assert_eq!(list.len(), 1); + assert_eq!(list.value_length(0), 3); + let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap(); + assert_eq!(vals.values(), &[1, 2, 3]); + } + + #[test] + fn test_default_append_map_string_to_int() { + let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32)))); + let mut d = Decoder::try_new(&map_dt).unwrap(); + let mut m: IndexMap<String, AvroLiteral> = IndexMap::new(); + m.insert("k1".to_string(), AvroLiteral::Int(10)); + m.insert("k2".to_string(), AvroLiteral::Int(20)); + d.append_default(&AvroLiteral::Map(m)).unwrap(); + let arr = d.flush(None).unwrap(); + let map = arr.as_any().downcast_ref::<MapArray>().unwrap(); + assert_eq!(map.len(), 1); + assert_eq!(map.value_length(0), 2); + let binding = map.value(0); + let entries = binding.as_any().downcast_ref::<StructArray>().unwrap(); + let k = entries + .column_by_name("key") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + 
.unwrap(); + let v = entries + .column_by_name("value") + .unwrap() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect(); + assert_eq!(keys, ["k1", "k2"].into_iter().collect()); + let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect(); + assert_eq!(vals, [10, 20].into_iter().collect()); + } + + #[test] + fn test_default_append_enum_by_symbol() { + let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into(); + let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None); + d.append_default(&AvroLiteral::Enum("B".into())).unwrap(); + let arr = d.flush(None).unwrap(); + let dict = arr + .as_any() + .downcast_ref::<DictionaryArray<Int32Type>>() + .unwrap(); + assert_eq!(dict.len(), 1); + let expected = Int32Array::from(vec![1]); + assert_eq!(dict.keys(), &expected); + let values = dict + .values() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(values.value(1), "B"); + } + + #[test] + fn test_default_append_uuid_and_type_error() { + let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)); + let uuid_str = "123e4567-e89b-12d3-a456-426614174000"; + d.append_default(&AvroLiteral::String(uuid_str.into())) + .unwrap(); + let arr_ref = d.flush(None).unwrap(); + let arr = arr_ref + .as_any() + .downcast_ref::<FixedSizeBinaryArray>() + .unwrap(); + assert_eq!(arr.value_length(), 16); + assert_eq!(arr.len(), 1); + let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)); + let err = d2 + .append_default(&AvroLiteral::Bytes(vec![0u8; 16])) + .unwrap_err(); + assert!( + err.to_string().contains("Default for uuid must be string"), + "unexpected error: {err:?}" + ); + } + + #[test] + fn test_default_append_fixed_and_length_mismatch() { + let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY)); + d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4])) + .unwrap(); + let arr_ref = d.flush(None).unwrap(); + let arr = arr_ref + .as_any() + .downcast_ref::<FixedSizeBinaryArray>() + .unwrap(); + assert_eq!(arr.value_length(), 4); + assert_eq!(arr.value(0), &[1, 2, 3, 4]); + let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY)); + let err = d_err + .append_default(&AvroLiteral::Bytes(vec![1, 2, 3])) + .unwrap_err(); + assert!( + err.to_string().contains("Fixed default length"), + "unexpected error: {err:?}" + ); + } + + #[test] + fn test_default_append_duration_and_length_validation() { + let dt = avro_from_codec(Codec::Interval); + let mut d = Decoder::try_new(&dt).unwrap(); + let mut bytes = Vec::with_capacity(12); + bytes.extend_from_slice(&1u32.to_le_bytes()); + bytes.extend_from_slice(&2u32.to_le_bytes()); + bytes.extend_from_slice(&3u32.to_le_bytes()); + d.append_default(&AvroLiteral::Bytes(bytes)).unwrap(); + let arr_ref = d.flush(None).unwrap(); + let arr = arr_ref + .as_any() + .downcast_ref::<IntervalMonthDayNanoArray>() + .unwrap(); + assert_eq!(arr.len(), 1); + let v = arr.value(0); + assert_eq!(v.months, 1); + assert_eq!(v.days, 2); + assert_eq!(v.nanoseconds, 3_000_000); + let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap(); + let err = d_err + .append_default(&AvroLiteral::Bytes(vec![0u8; 11])) + .unwrap_err(); + assert!( + err.to_string() + .contains("Duration default must be exactly 12 bytes"), + "unexpected error: {err:?}" + ); + } + + #[test] + fn test_default_append_decimal256_from_bytes() { + let dt = avro_from_codec(Codec::Decimal(50, 
Some(2), Some(32))); + let mut d = Decoder::try_new(&dt).unwrap(); + let pos: [u8; 32] = [ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x30, 0x39, + ]; + d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap(); + let neg: [u8; 32] = [ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0x85, + ]; + d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap(); + let arr = d.flush(None).unwrap(); + let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap(); + assert_eq!(dec.len(), 2); + assert_eq!(dec.value_as_string(0), "123.45"); + assert_eq!(dec.value_as_string(1), "-1.23"); + } + + #[test] + fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() { + let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))]; + let mut rec = make_record_decoder_with_projector_defaults( + &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)], + field_defaults, + vec![], + 0, + ); + let mut map: IndexMap<String, AvroLiteral> = IndexMap::new(); + map.insert("a".to_string(), AvroLiteral::Int(7)); + rec.append_default(&AvroLiteral::Map(map)).unwrap(); + let arr = rec.flush(None).unwrap(); + let s = arr.as_any().downcast_ref::<StructArray>().unwrap(); + let a = s + .column_by_name("a") + .unwrap() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + let b = s + .column_by_name("b") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(a.value(0), 7); + assert_eq!(b.value(0), "hi"); + } + + #[test] + fn test_record_append_default_null_uses_projector_field_defaults() { + let field_defaults = vec![ + Some(AvroLiteral::Int(5)), + Some(AvroLiteral::String("x".into())), + ]; + let mut rec = make_record_decoder_with_projector_defaults( + &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)], + field_defaults, + vec![], + 0, + ); + rec.append_default(&AvroLiteral::Null).unwrap(); + let arr = rec.flush(None).unwrap(); + let s = arr.as_any().downcast_ref::<StructArray>().unwrap(); + let a = s + .column_by_name("a") + .unwrap() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + let b = s + .column_by_name("b") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(a.value(0), 5); + assert_eq!(b.value(0), "x"); + } + + #[test] + fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties( + ) { + let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)]; + let mut field_refs: Vec<FieldRef> = Vec::new(); + let mut encoders: Vec<Decoder> = Vec::new(); + for (name, dt, nullable) in &fields { + field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable))); + } + let enc_a = Decoder::Nullable( + Nullability::NullSecond, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))), + ); + let enc_b = Decoder::Nullable( + Nullability::NullSecond, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(Decoder::String( + OffsetBufferBuilder::new(DEFAULT_CAPACITY), + Vec::with_capacity(DEFAULT_CAPACITY), + )), + ); + encoders.push(enc_a); + encoders.push(enc_b); + let projector = Projector { + writer_to_reader: Arc::from(vec![]), + skip_decoders: vec![], + field_defaults: vec![None, None], // no 
defaults -> append_null + default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()), + }; + let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector)); + let mut map: IndexMap<String, AvroLiteral> = IndexMap::new(); + map.insert("a".to_string(), AvroLiteral::Int(9)); + rec.append_default(&AvroLiteral::Map(map)).unwrap(); + let arr = rec.flush(None).unwrap(); + let s = arr.as_any().downcast_ref::<StructArray>().unwrap(); + let a = s + .column_by_name("a") + .unwrap() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + let b = s + .column_by_name("b") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert!(a.is_valid(0)); + assert_eq!(a.value(0), 9); + assert!(b.is_null(0)); + } + + #[test] + fn test_projector_default_injection_when_writer_lacks_fields() { + let defaults = vec![None, None]; + let injections = vec![ + (0, AvroLiteral::Int(99)), + (1, AvroLiteral::String("alice".into())), + ]; + let mut rec = make_record_decoder_with_projector_defaults( + &[ + ("id", DataType::Int32, false), + ("name", DataType::Utf8, false), + ], + defaults, + injections, + 0, + ); + rec.decode(&mut AvroCursor::new(&[])).unwrap(); + let arr = rec.flush(None).unwrap(); + let s = arr.as_any().downcast_ref::<StructArray>().unwrap(); + let id = s + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + let name = s + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(id.value(0), 99); + assert_eq!(name.value(0), "alice"); + } }