jecsand838 commented on code in PR #9328:
URL: https://github.com/apache/arrow-rs/pull/9328#discussion_r2776103484


##########
arrow-avro/src/reader/record.rs:
##########
@@ -504,20 +512,47 @@ impl Decoder {
         };
         Ok(match data_type.nullability() {
             Some(nullability) => {
-                // Default to reading a union branch tag unless the resolution 
proves otherwise.
-                let mut plan = NullablePlan::ReadTag;
-                if let Some(ResolutionInfo::Union(info)) = 
data_type.resolution.as_ref() {
-                    if !info.writer_is_union && info.reader_is_union {
-                        if let Some(Some((_reader_idx, promo))) = 
info.writer_to_reader.first() {
-                            plan = NullablePlan::FromSingle { promotion: 
*promo };
+                // Default to reading a union branch tag unless the resolution 
directs otherwise.
+                let plan = match &data_type.resolution {
+                    None => NullablePlan::ReadTag {
+                        nullability,
+                        resolution: 
ResolutionPlan::Promotion(Promotion::Direct),
+                    },
+                    Some(ResolutionInfo::Promotion(_)) => {
+                        // Promotions should have been incorporated
+                        // into the inner decoder.
+                        NullablePlan::FromSingle {
+                            resolution: 
ResolutionPlan::Promotion(Promotion::Direct),
                         }
                     }
-                }
+                    Some(ResolutionInfo::Union(info)) if !info.writer_is_union 
=> {
+                        let Some(Some((_, resolution))) = 
info.writer_to_reader.first() else {
+                            panic!(
+                                "unexpected union resolution info for 
non-union writer and union reader type",
+                            );
+                        };
+                        let resolution = ResolutionPlan::try_new(&decoder, 
resolution)?;
+                        NullablePlan::FromSingle { resolution }
+                    }
+                    Some(ResolutionInfo::Union(info)) => {
+                        let Some((_, resolution)) =
+                            
info.writer_to_reader[nullability.non_null_index()].as_ref()
+                        else {
+                            panic!("unexpected union resolution info for 
nullable writer type");

Review Comment:
   As with the non-union writer branch, the panic on an unexpected nullable-union 
mapping turns a recoverable schema mismatch into a hard crash; returning an 
`AvroError` would be preferable.



##########
arrow-avro/src/reader/record.rs:
##########
@@ -504,20 +512,47 @@ impl Decoder {
         };
         Ok(match data_type.nullability() {
             Some(nullability) => {
-                // Default to reading a union branch tag unless the resolution 
proves otherwise.
-                let mut plan = NullablePlan::ReadTag;
-                if let Some(ResolutionInfo::Union(info)) = 
data_type.resolution.as_ref() {
-                    if !info.writer_is_union && info.reader_is_union {
-                        if let Some(Some((_reader_idx, promo))) = 
info.writer_to_reader.first() {
-                            plan = NullablePlan::FromSingle { promotion: 
*promo };
+                // Default to reading a union branch tag unless the resolution 
directs otherwise.
+                let plan = match &data_type.resolution {
+                    None => NullablePlan::ReadTag {
+                        nullability,
+                        resolution: 
ResolutionPlan::Promotion(Promotion::Direct),
+                    },
+                    Some(ResolutionInfo::Promotion(_)) => {
+                        // Promotions should have been incorporated
+                        // into the inner decoder.
+                        NullablePlan::FromSingle {
+                            resolution: 
ResolutionPlan::Promotion(Promotion::Direct),
                         }
                     }
-                }
+                    Some(ResolutionInfo::Union(info)) if !info.writer_is_union 
=> {
+                        let Some(Some((_, resolution))) = 
info.writer_to_reader.first() else {
+                            panic!(
+                                "unexpected union resolution info for 
non-union writer and union reader type",
+                            );

Review Comment:
   This new panic in decoder construction can crash the caller when the union 
resolution state is malformed or partial. Since this path already returns 
`Result`, I think returning an error instead of panicking would be better, e.g. 
`return Err(AvroError::SchemaError(...))` (or `ParseError`), so callers can 
decide how to handle the failure.



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1054,10 +1082,45 @@ impl Decoder {
         }
     }
 
+    fn decode_with_resolution<'d>(
+        &'d mut self,
+        buf: &mut AvroCursor<'_>,
+        resolution: &'d ResolutionPlan,
+    ) -> Result<(), AvroError> {
+        #[cfg(feature = "avro_custom_types")]
+        if let Self::RunEndEncoded(_, len, inner) = self {
+            *len += 1;
+            return inner.decode_with_resolution(buf, resolution);
+        }
+
+        match resolution {
+            ResolutionPlan::Promotion(promotion) => {
+                let promotion = *promotion;
+                self.decode_with_promotion(buf, promotion)
+            }
+            ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
+            ResolutionPlan::EnumMapping(res) => {
+                let Self::Enum(indices, _, _) = self else {
+                    panic!("enum mapping resolution provided for non-enum 
decoder");
+                };
+                let raw = buf.get_int()?;
+                let resolved = res.resolve(raw)?;
+                indices.push(resolved);
+                Ok(())
+            }
+            ResolutionPlan::Record(proj) => {
+                let Self::Record(_, encodings, _, _) = self else {
+                    panic!("record projection provided for non-record 
decoder");

Review Comment:
   Same concern as the panics in decoder construction: we should probably pass 
`AvroError`s back to the caller here instead of potentially crashing, imo.



##########
arrow-avro/src/codec.rs:
##########
@@ -1533,62 +1550,35 @@ impl<'a> Maker<'a> {
                     nullable_union_variants(reader_variants)
                 {
                     let mut dt = self.resolve_type(writer_non_union, 
non_null_branch, namespace)?;
-                    let non_null_idx = match nullability {
-                        Nullability::NullFirst => 1,
-                        Nullability::NullSecond => 0,
-                    };
                     #[cfg(feature = "avro_custom_types")]
                     Self::propagate_nullability_into_ree(&mut dt, nullability);
                     dt.nullability = Some(nullability);
-                    let promotion = Self::coercion_from(&dt);
-                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
-                        writer_to_reader: Arc::from(vec![Some((non_null_idx, 
promotion))]),
-                        writer_is_union: false,
-                        reader_is_union: true,
-                    }));
+                    // Ensure resolution is set to a non-Union variant to 
suppress
+                    // reading the union tag which is the default behavior.
+                    if dt.resolution.is_none() {
+                        dt.resolution = 
Some(ResolutionInfo::Promotion(Promotion::Direct));
+                    }
                     Ok(dt)
                 } else {
-                    let mut best_match: Option<(usize, AvroDataType, 
Promotion)> = None;
-                    for (i, variant) in reader_variants.iter().enumerate() {
-                        if let Ok(resolved_dt) =
-                            self.resolve_type(writer_non_union, variant, 
namespace)
-                        {
-                            let promotion = Self::coercion_from(&resolved_dt);
-                            if promotion == Promotion::Direct {
-                                best_match = Some((i, resolved_dt, promotion));
-                                break;
-                            } else if best_match.is_none() {
-                                best_match = Some((i, resolved_dt, promotion));
-                            }
-                        }
-                    }
-                    let Some((match_idx, match_dt, promotion)) = best_match 
else {
+                    let Some((match_idx, resolution)) =
+                        self.find_best_union_match(writer_non_union, 
reader_variants, namespace)
+                    else {
                         return Err(ArrowError::SchemaError(
                             "Writer schema does not match any reader union 
branch".to_string(),
                         ));
                     };
-                    let mut children = 
Vec::with_capacity(reader_variants.len());
-                    let mut match_dt = Some(match_dt);
-                    for (i, variant) in reader_variants.iter().enumerate() {
-                        if i == match_idx {
-                            if let Some(mut dt) = match_dt.take() {
-                                if matches!(dt.resolution, 
Some(ResolutionInfo::Promotion(_))) {
-                                    dt.resolution = None;
-                                }
-                                children.push(dt);
-                            }
-                        } else {
-                            children.push(self.parse_type(variant, 
namespace)?);
-                        }
-                    }
+                    let children = reader_variants
+                        .iter()
+                        .map(|variant| self.parse_type(variant, namespace))
+                        .collect::<Result<Vec<_>, _>>()?;

Review Comment:
   I think there's a risk of a regression here: building every child with 
`parse_type` instead of using `resolve_type` for the matching branch drops 
field-level resolution, especially `DefaultValue`.
   
   See the test below and let me know what you think.
   
   ```rust
       #[test]
       fn test_resolve_writer_non_union_record_to_reader_union_preserves_defaults() {
           // Writer: record Inner{a: int}
           // Reader: union [Inner{a: int, b: int default 42}, string]
           // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
           let writer = Schema::Complex(ComplexType::Record(Record {
               name: "Inner",
               namespace: None,
               doc: None,
               aliases: vec![],
               fields: vec![AvroFieldSchema {
                   name: "a",
                   doc: None,
                   r#type: mk_primitive(PrimitiveType::Int),
                   default: None,
                   aliases: vec![],
               }],
               attributes: Attributes::default(),
           }));
           let reader = mk_union(vec![
               Schema::Complex(ComplexType::Record(Record {
                   name: "Inner",
                   namespace: None,
                   doc: None,
                   aliases: vec![],
                   fields: vec![
                       AvroFieldSchema {
                           name: "a",
                           doc: None,
                           r#type: mk_primitive(PrimitiveType::Int),
                           default: None,
                           aliases: vec![],
                       },
                       AvroFieldSchema {
                           name: "b",
                           doc: None,
                           r#type: mk_primitive(PrimitiveType::Int),
                           default: Some(Value::Number(serde_json::Number::from(42))),
                           aliases: vec![],
                       },
                   ],
                   attributes: Attributes::default(),
               })),
               mk_primitive(PrimitiveType::String),
           ]);
           let mut maker = Maker::new(false, false);
           let dt = maker
               .make_data_type(&writer, Some(&reader), None)
               .expect("resolution should succeed");
           // Verify the union resolution structure
           let resolved = match dt.resolution.as_ref() {
               Some(ResolutionInfo::Union(u)) => u,
               other => panic!("expected union resolution info, got {other:?}"),
           };
           assert!(!resolved.writer_is_union && resolved.reader_is_union);
           // The matching child (Inner at index 0) should have field b with DefaultValue
           let children = match dt.codec() {
               Codec::Union(children, _, _) => children,
               other => panic!("expected union codec, got {other:?}"),
           };
           let inner_fields = match children[0].codec() {
               Codec::Struct(f) => f,
               other => panic!("expected struct codec for Inner, got {other:?}"),
           };
           assert_eq!(inner_fields.len(), 2);
           assert_eq!(inner_fields[1].name(), "b");
           assert_eq!(
               inner_fields[1].data_type().resolution,
               Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
               "field b should have DefaultValue(Int(42)) from schema resolution"
           );
       }
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1315,64 +1455,94 @@ const NO_SOURCE: i8 = -1;
 
 impl DispatchLookupTable {
     fn from_writer_to_reader(
-        promotion_map: &[Option<(usize, Promotion)>],
+        reader_branches: &[Decoder],
+        resolution_map: &[Option<(usize, ResolutionInfo)>],
     ) -> Result<Self, AvroError> {
-        let mut to_reader = Vec::with_capacity(promotion_map.len());
-        let mut promotion = Vec::with_capacity(promotion_map.len());
-        for map in promotion_map {
-            match *map {
-                Some((idx, promo)) => {
+        let mut to_reader = Vec::with_capacity(resolution_map.len());
+        let mut resolution = Vec::with_capacity(resolution_map.len());
+        for map in resolution_map {
+            match map {
+                Some((idx, res)) => {
+                    let idx = *idx;
                     let idx_i8 = i8::try_from(idx).map_err(|_| {
                         AvroError::SchemaError(format!(
                             "Reader branch index {idx} exceeds i8 range (max 
{})",
                             i8::MAX
                         ))
                     })?;
+                    let plan = ResolutionPlan::try_new(&reader_branches[idx], 
res)?;
                     to_reader.push(idx_i8);
-                    promotion.push(promo);
+                    resolution.push(plan);
                 }
                 None => {
                     to_reader.push(NO_SOURCE);
-                    promotion.push(Promotion::Direct);
+                    
resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
                 }
             }
         }
         Ok(Self {
             to_reader: to_reader.into_boxed_slice(),
-            promotion: promotion.into_boxed_slice(),
+            resolution: resolution.into_boxed_slice(),
         })
     }
 
-    // Resolve a writer branch index to (reader_idx, promotion)
+    // Resolve a writer branch index to (reader_idx, resolution)
     #[inline]
-    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+    fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> 
{
         let reader_index = *self.to_reader.get(writer_index)?;
-        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+        (reader_index >= 0).then(|| (reader_index as usize, 
&self.resolution[writer_index]))
     }
 }
 
 #[derive(Debug)]
 struct UnionDecoder {
     fields: UnionFields,
-    type_ids: Vec<i8>,
-    offsets: Vec<i32>,
-    branches: Vec<Decoder>,
-    counts: Vec<i32>,
-    reader_type_codes: Vec<i8>,
+    branches: UnionDecoderBranches,

Review Comment:
   Nice! Love the cleanup here :)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1283,6 +1346,83 @@ impl Decoder {
     }
 }
 
+/// Runtime plan for decoding reader-side `["null", T]` types.
+#[derive(Debug)]
+enum NullablePlan {
+    /// Writer actually wrote a union (branch tag present).
+    ReadTag {
+        nullability: Nullability,
+        resolution: ResolutionPlan,
+    },
+    /// Writer wrote a single (non-union) value resolved to the non-null branch
+    /// of the reader union; do NOT read a branch tag, but apply any 
resolution.
+    FromSingle { resolution: ResolutionPlan },
+}
+
+/// Runtime plan for resolving writer-reader type differences.
+#[derive(Debug)]
+enum ResolutionPlan {
+    /// Indicates that the writer's type should be promoted to the reader's 
type.
+    Promotion(Promotion),
+    /// Provides a default value for the field missing in the writer type.
+    DefaultValue(AvroLiteral),
+    /// Provides mapping information for resolving enums.
+    EnumMapping(EnumResolution),
+    /// Provides projection information for record fields.
+    Record(Projector),
+}
+
+impl ResolutionPlan {
+    fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, 
AvroError> {
+        match (decoder, resolution) {
+            (_, ResolutionInfo::Promotion(p)) => 
Ok(ResolutionPlan::Promotion(*p)),
+            (_, ResolutionInfo::DefaultValue(lit)) => 
Ok(ResolutionPlan::DefaultValue(lit.clone())),
+            (_, ResolutionInfo::EnumMapping(m)) => {
+                Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
+            }
+            (Decoder::Record(_, _, field_defaults, _), 
ResolutionInfo::Record(r)) => Ok(
+                ResolutionPlan::Record(ProjectorBuilder::try_new(r, 
field_defaults).build()?),
+            ),
+            (_, ResolutionInfo::Record(_)) => {
+                unreachable!("record resolution on non-record decoder")

Review Comment:
   Using `unreachable!` here is risky: it panics instead of returning an error. 
Probably best to pass an `AvroError` back here too. It may be unreachable right 
now, but an unanticipated edge case or a future change could turn it into a 
crash.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to