jecsand838 commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2373374993


##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,8 +1101,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i8]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i8::MAX as usize);
+                    to_reader.push(idx as i8);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    reader_type_codes: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            reader_type_codes: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan: Self::plan_from_resolved(resolved)?,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        let Some(info) = resolved else {
+            return Ok(UnionReadPlan::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table =
+                    
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+                Ok(UnionReadPlan::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some(&(reader_idx, promotion)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(ArrowError::SchemaError(
+                        "Writer type does not match any reader union 
branch".to_string(),
+                    ));
+                };
+                Ok(UnionReadPlan::FromSingle {
+                    reader_idx,
+                    promotion,
+                })
+            }
+            (true, false) => Err(ArrowError::InvalidArgumentError(
+                "UnionDecoder::try_new cannot build writer-union to single; 
use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            // (false, false) is invalid and should never be constructed by 
the resolver.
+            _ => Err(ArrowError::SchemaError(
+                "ResolvedUnion constructed for non-union sides; resolver 
should return None"
+                    .to_string(),
+            )),
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)

Review Comment:
   > I am not sure it will matter, but I think long (64 bit signed) may not fit 
into a usize on 32-bit platforms
   > 
   > Another way to do this test that would catch it is something like
   
   Great catch, thank you! I'll switch to a fallible conversion (e.g. 
`usize::try_from`), improve the error messages, and clean up. This will cover 
both negative indices and (on 32‑bit targets) non‑negative indices that don’t 
fit in `usize`. The same fix will be applied in the `skipper` path.
   
   I can refine this further in a follow-up PR as well if that's alright with 
you.
   
   > I think union type tags are restricted to 0..=127?
   
   That `0..=127` bound applies to Arrow union `type_ids` (int8), not to Avro 
tags. In Avro, the tag is the zero‑based union branch index (encoded as an 
`int` in the Avro 1.11.1 specification), so it is constrained only by the 
number of union branches: it must be non‑negative and less than the branch 
count.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to