jecsand838 commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2369521017


##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| *x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union branch".to_string(),
+                            )
+                        })?;

Review Comment:
   It's not fully equivalent. Something like this would work though:
   
   ```rust
                       let Some(&(reader_idx, promotion)) =
                           info.writer_to_reader.first().and_then(Option::as_ref) else {
                           return Err(ArrowError::SchemaError(
                               "Writer type does not match any reader union branch".to_string(),
                           ));
                       };
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to