jecsand838 commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2373382956


##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,8 +1101,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i8]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i8::MAX as usize);
+                    to_reader.push(idx as i8);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    reader_type_codes: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            reader_type_codes: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan: Self::plan_from_resolved(resolved)?,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        let Some(info) = resolved else {
+            return Ok(UnionReadPlan::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table =
+                    
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+                Ok(UnionReadPlan::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some(&(reader_idx, promotion)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(ArrowError::SchemaError(
+                        "Writer type does not match any reader union 
branch".to_string(),
+                    ));
+                };
+                Ok(UnionReadPlan::FromSingle {
+                    reader_idx,
+                    promotion,
+                })
+            }
+            (true, false) => Err(ArrowError::InvalidArgumentError(
+                "UnionDecoder::try_new cannot build writer-union to single; 
use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            // (false, false) is invalid and should never be constructed by 
the resolver.
+            _ => Err(ArrowError::SchemaError(
+                "ResolvedUnion constructed for non-union sides; resolver 
should return None"
+                    .to_string(),
+            )),
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)

Review Comment:
   There is an issue here that could cause valid Avro files to fail. 
   
   In the (incredibly unlikely; I have personally never seen it) scenario where 
a reader asks us to materialize an Arrow union with >127 branches, we'll hit an 
error. As I understand it, that's more of an Arrow format limitation on 
`UnionFields`. It's something to follow up on at some point imo, and I can 
update the docs in a fast-follow PR as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to