This is an automated email from the ASF dual-hosted git repository.

jeffreyvo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new cfd8c25bfe fix union array row converter to handle non-sequential type ids (#9283)
cfd8c25bfe is described below

commit cfd8c25bfea5093d794e9b2e5217d97ffc0f3531
Author: Matthew Kim <[email protected]>
AuthorDate: Thu Jan 29 13:59:19 2026 -1000

    fix union array row converter to handle non-sequential type ids (#9283)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/9263
    
    # Rationale for this change
    
    This PR fixes the row converter's handling of union arrays with
    non-sequential type ids.
    
    Previously, type ids were incorrectly used as array indices into the
    converters vector, causing panics or data corruption when type ids
    didn't match field positions. The fix was to add a mapping from type ids
    to field indices.
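    
    As a rough sketch of the approach (illustrative only; the diff below builds
    this lookup inline rather than as a standalone helper, and the free function
    `type_id_to_field_idx` here is just a name for the idea):
    
        // Translate a union type_id (0..=127) into the position of its field,
        // so non-sequential ids such as 70 and 85 index converters 0 and 1
        // instead of indexing (and panicking) on converters[70].
        fn type_id_to_field_idx(field_type_ids: &[i8]) -> [usize; 128] {
            let mut lookup = [0usize; 128];
            for (field_idx, &type_id) in field_type_ids.iter().enumerate() {
                lookup[type_id as usize] = field_idx;
            }
            lookup
        }
    
        fn main() {
            // A union declared with type ids [70, 85], as in the new test below.
            let lookup = type_id_to_field_idx(&[70, 85]);
            assert_eq!(lookup[70], 0); // first field -> first converter
            assert_eq!(lookup[85], 1); // second field -> second converter
        }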
---
 arrow-row/src/lib.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 88 insertions(+), 13 deletions(-)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index d535d90cef..6b4d3de8d8 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -501,9 +501,9 @@ enum Codec {
     List(RowConverter),
     /// A row converter for the values array of a run-end encoded array
     RunEndEncoded(RowConverter),
-    /// Row converters for each union field (indexed by type_id)
-    /// and the encoding of null rows for each field
-    Union(Vec<RowConverter>, Vec<OwnedRow>),
+    /// Row converters for each union field (indexed by field position),
+    /// the type_ids for each field position, and the encoding of null rows for each field
+    Union(Vec<RowConverter>, Vec<i8>, Vec<OwnedRow>),
 }
 
 impl Codec {
@@ -579,9 +579,10 @@ impl Codec {
                 };
 
                 let mut converters = Vec::with_capacity(fields.len());
+                let mut type_ids = Vec::with_capacity(fields.len());
                 let mut null_rows = Vec::with_capacity(fields.len());
 
-                for (_type_id, field) in fields.iter() {
+                for (type_id, field) in fields.iter() {
                     let sort_field =
                         SortField::new_with_options(field.data_type().clone(), options);
                     let converter = RowConverter::new(vec![sort_field])?;
@@ -594,10 +595,11 @@ impl Codec {
                     };
 
                     converters.push(converter);
+                    type_ids.push(type_id);
                     null_rows.push(owned);
                 }
 
-                Ok(Self::Union(converters, null_rows))
+                Ok(Self::Union(converters, type_ids, null_rows))
             }
             _ => Err(ArrowError::NotYetImplemented(format!(
                 "not yet implemented: {:?}",
@@ -667,7 +669,7 @@ impl Codec {
                 let rows = converter.convert_columns(std::slice::from_ref(&values))?;
                 Ok(Encoder::RunEndEncoded(rows))
             }
-            Codec::Union(converters, _) => {
+            Codec::Union(converters, field_to_type_ids, _) => {
                 let union_array = array
                     .as_any()
                     .downcast_ref::<UnionArray>()
@@ -677,14 +679,16 @@ impl Codec {
                 let offsets = union_array.offsets().cloned();
 
                 let mut child_rows = Vec::with_capacity(converters.len());
-                for (type_id, converter) in converters.iter().enumerate() {
-                    let child_array = union_array.child(type_id as i8);
+                for (field_idx, converter) in converters.iter().enumerate() {
+                    let type_id = field_to_type_ids[field_idx];
+                    let child_array = union_array.child(type_id);
                     let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
                     child_rows.push(rows);
                 }
 
                 Ok(Encoder::Union {
                     child_rows,
+                    field_to_type_ids: field_to_type_ids.clone(),
                     type_ids,
                     offsets,
                 })
@@ -699,7 +703,7 @@ impl Codec {
             Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
             Codec::List(converter) => converter.size(),
             Codec::RunEndEncoded(converter) => converter.size(),
-            Codec::Union(converters, null_rows) => {
+            Codec::Union(converters, _, null_rows) => {
                 converters.iter().map(|c| c.size()).sum::<usize>()
                     + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
             }
@@ -726,6 +730,7 @@ enum Encoder<'a> {
     /// The row encoding of each union field's child array, type_ids buffer, offsets buffer (for Dense), and mode
     Union {
         child_rows: Vec<Rows>,
+        field_to_type_ids: Vec<i8>,
         type_ids: ScalarBuffer<i8>,
         offsets: Option<ScalarBuffer<i32>>,
     },
@@ -1642,6 +1647,7 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
             },
             Encoder::Union {
                 child_rows,
+                field_to_type_ids,
                 type_ids,
                 offsets,
             } => {
@@ -1650,10 +1656,16 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
                     .downcast_ref::<UnionArray>()
                     .expect("expected UnionArray");
 
+                let mut type_id_to_field_idx = [0usize; 128];
+                for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
+                    type_id_to_field_idx[type_id as usize] = field_idx;
+                }
+
                 let lengths = (0..union_array.len()).map(|i| {
                     let type_id = type_ids[i];
+                    let field_idx = type_id_to_field_idx[type_id as usize];
                     let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
-                    let child_row_len = child_rows[type_id as usize].row_len(child_row_i);
+                    let child_row_len = child_rows[field_idx].row_len(child_row_i);
 
                     // length: 1 byte type_id + child row bytes
                     1 + child_row_len
@@ -1855,18 +1867,25 @@ fn encode_column(
         },
         Encoder::Union {
             child_rows,
+            field_to_type_ids,
             type_ids,
             offsets: offsets_buf,
         } => {
+            let mut type_id_to_field_idx = [0usize; 128];
+            for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
+                type_id_to_field_idx[type_id as usize] = field_idx;
+            }
+
             offsets
                 .iter_mut()
                 .skip(1)
                 .enumerate()
                 .for_each(|(i, offset)| {
                     let type_id = type_ids[i];
+                    let field_idx = type_id_to_field_idx[type_id as usize];
 
                     let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
-                    let child_row = child_rows[type_id as usize].row(child_row_idx);
+                    let child_row = child_rows[field_idx].row(child_row_idx);
                     let child_bytes = child_row.as_ref();
 
                     let type_id_byte = if opts.descending {
@@ -2008,13 +2027,18 @@ unsafe fn decode_column(
             },
             _ => unreachable!(),
         },
-        Codec::Union(converters, null_rows) => {
+        Codec::Union(converters, field_to_type_ids, null_rows) => {
             let len = rows.len();
 
             let DataType::Union(union_fields, mode) = &field.data_type else {
                 unreachable!()
             };
 
+            let mut type_id_to_field_idx = [0usize; 128];
+            for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
+                type_id_to_field_idx[type_id as usize] = field_idx;
+            }
+
             let mut type_ids = Vec::with_capacity(len);
             let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];
 
@@ -2027,7 +2051,7 @@ unsafe fn decode_column(
                 let type_id = type_id_byte as i8;
                 type_ids.push(type_id);
 
-                let field_idx = type_id as usize;
+                let field_idx = type_id_to_field_idx[type_id as usize];
 
                 let child_row = &row[1..];
                 rows_by_field[field_idx].push((idx, child_row));
@@ -4362,6 +4386,57 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_row_converter_roundtrip_with_non_default_union_type_ids() {
+        // test with non-sequential type IDs (70, 85) instead of (0, 1)
+        let fields = UnionFields::try_new(
+            vec![70, 85],
+            vec![
+                Field::new("int", DataType::Int32, true),
+                Field::new("string", DataType::Utf8, true),
+            ],
+        )
+        .unwrap();
+
+        let int_array = Int32Array::from(vec![Some(67), None]);
+        let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
+        let type_ids = vec![70i8, 85].into();
+
+        let union_array = UnionArray::try_new(
+            fields.clone(),
+            type_ids,
+            None,
+            vec![
+                Arc::new(int_array) as ArrayRef,
+                Arc::new(string_array) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
+        let sort_field = SortField::new(field.data_type().clone());
+        let converter = RowConverter::new(vec![sort_field]).unwrap();
+
+        let rows = converter
+            .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
+            .unwrap();
+
+        // roundtrip
+        let out = converter.convert_rows(&rows).unwrap();
+
+        let [col1] = out.as_slice() else {
+            panic!("expected 1 column")
+        };
+
+        let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
+        assert_eq!(col.len(), union_array.len());
+        assert_eq!(col.type_ids(), union_array.type_ids());
+
+        for i in 0..col.len() {
+            assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
+        }
+    }
+
     #[test]
     fn rows_size_should_count_for_capacity() {
         let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
