This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new ed25bbaf9d Implement Array Decoding in arrow-avro (#7559)
ed25bbaf9d is described below

commit ed25bbaf9de6a6f0ffcd0b74e5aec52ca2b793ea
Author: Connor Sanders <con...@elastiflow.com>
AuthorDate: Tue Jun 17 15:57:15 2025 -0500

    Implement Array Decoding in arrow-avro (#7559)
    
    # Which issue does this PR close?
    
    Part of https://github.com/apache/arrow-rs/issues/4886
    
    Related to https://github.com/apache/arrow-rs/pull/6965
    
    # Rationale for this change
    
    Avro supports arrays as a core data type, but previously arrow-avro had
    incomplete decoding logic to handle them. As a result, any Avro file
    containing array fields would fail to parse correctly within the Arrow
    ecosystem. This PR addresses this gap by:
    
    1. Completing the implementation of explicit `Array` -> `List` decoding:
    It completes the `Decoder::Array` logic that reads array blocks in Avro
    format and constructs an Arrow `ListArray`.
    
    Overall, these changes expand Arrow’s Avro reader capabilities, allowing
    users to work with array-encoded data in a standardized Arrow format.
    
    # What changes are included in this PR?
    
    **1. arrow-avro/src/reader/record.rs:**
    
    * Completed the Array decoding path which leverages blockwise reads of
    Avro array data.
    * Implemented decoder unit tests for Array types.
    
    # Are there any user-facing changes?
    
    N/A
---
 arrow-avro/src/reader/record.rs | 123 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 113 insertions(+), 10 deletions(-)

diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index cd9d6e3c13..3466b06445 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -113,7 +113,7 @@ enum Decoder {
     String(OffsetBufferBuilder<i32>, Vec<u8>),
     /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
     StringView(OffsetBufferBuilder<i32>, Vec<u8>),
-    List(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
+    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
     Record(Fields, Vec<Decoder>),
     Map(
         FieldRef,
@@ -161,7 +161,7 @@ impl Decoder {
             Codec::Interval => return nyi("decoding interval"),
             Codec::List(item) => {
                 let decoder = Self::try_new(item)?;
-                Self::List(
+                Self::Array(
                     Arc::new(item.field_with_name("item")),
                     OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                     Box::new(decoder),
@@ -223,7 +223,7 @@ impl Decoder {
             Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => {
                 offsets.push_length(0);
             }
-            Self::List(_, offsets, e) => {
+            Self::Array(_, offsets, e) => {
                 offsets.push_length(0);
                 e.append_null();
             }
@@ -256,10 +256,9 @@ impl Decoder {
                 offsets.push_length(data.len());
                 values.extend_from_slice(data);
             }
-            Self::List(_, _, _) => {
-                return Err(ArrowError::NotYetImplemented(
-                    "Decoding ListArray".to_string(),
-                ))
+            Self::Array(_, off, encoding) => {
+                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
+                off.push_length(total_items);
             }
             Self::Record(_, encodings) => {
                 for encoding in encodings {
@@ -267,7 +266,7 @@ impl Decoder {
                 }
             }
             Self::Map(_, koff, moff, kdata, valdec) => {
-                let newly_added = read_map_blocks(buf, |cur| {
+                let newly_added = read_blocks(buf, |cur| {
                     let kb = cur.get_bytes()?;
                     koff.push_length(kb.len());
                     kdata.extend_from_slice(kb);
@@ -339,7 +338,7 @@ impl Decoder {
 
                 Arc::new(StringViewArray::from(values))
             }
-            Self::List(field, offsets, values) => {
+            Self::Array(field, offsets, values) => {
                 let values = values.flush(None)?;
                 let offsets = flush_offsets(offsets);
                 Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
@@ -388,7 +387,7 @@ impl Decoder {
     }
 }
 
-fn read_map_blocks(
+fn read_blocks(
     buf: &mut AvroCursor,
     decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
 ) -> Result<usize, ArrowError> {
@@ -462,6 +461,17 @@ mod tests {
         IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, StructArray,
     };
 
+    fn encode_avro_int(value: i32) -> Vec<u8> {
+        let mut buf = Vec::new();
+        let mut v = (value << 1) ^ (value >> 31);
+        while v & !0x7F != 0 {
+            buf.push(((v & 0x7F) | 0x80) as u8);
+            v >>= 7;
+        }
+        buf.push(v as u8);
+        buf
+    }
+
     fn encode_avro_long(value: i64) -> Vec<u8> {
         let mut buf = Vec::new();
         let mut v = (value << 1) ^ (value >> 63);
@@ -531,4 +541,97 @@ mod tests {
         assert_eq!(map_arr.len(), 1);
         assert_eq!(map_arr.value_length(0), 0);
     }
+
+    #[test]
+    fn test_array_decoding() {
+        let item_dt = avro_from_codec(Codec::Int32);
+        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
+        let mut decoder = Decoder::try_new(&list_dt).unwrap();
+        let mut row1 = Vec::new();
+        row1.extend_from_slice(&encode_avro_long(2));
+        row1.extend_from_slice(&encode_avro_int(10));
+        row1.extend_from_slice(&encode_avro_int(20));
+        row1.extend_from_slice(&encode_avro_long(0));
+        let row2 = encode_avro_long(0);
+        let mut cursor = AvroCursor::new(&row1);
+        decoder.decode(&mut cursor).unwrap();
+        let mut cursor2 = AvroCursor::new(&row2);
+        decoder.decode(&mut cursor2).unwrap();
+        let array = decoder.flush(None).unwrap();
+        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
+        assert_eq!(list_arr.len(), 2);
+        let offsets = list_arr.value_offsets();
+        assert_eq!(offsets, &[0, 2, 2]);
+        let values = list_arr.values();
+        let int_arr = values.as_primitive::<Int32Type>();
+        assert_eq!(int_arr.len(), 2);
+        assert_eq!(int_arr.value(0), 10);
+        assert_eq!(int_arr.value(1), 20);
+    }
+
+    #[test]
+    fn test_array_decoding_with_negative_block_count() {
+        let item_dt = avro_from_codec(Codec::Int32);
+        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
+        let mut decoder = Decoder::try_new(&list_dt).unwrap();
+        let mut data = encode_avro_long(-3);
+        data.extend_from_slice(&encode_avro_long(12));
+        data.extend_from_slice(&encode_avro_int(1));
+        data.extend_from_slice(&encode_avro_int(2));
+        data.extend_from_slice(&encode_avro_int(3));
+        data.extend_from_slice(&encode_avro_long(0));
+        let mut cursor = AvroCursor::new(&data);
+        decoder.decode(&mut cursor).unwrap();
+        let array = decoder.flush(None).unwrap();
+        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
+        assert_eq!(list_arr.len(), 1);
+        assert_eq!(list_arr.value_length(0), 3);
+        let values = list_arr.values().as_primitive::<Int32Type>();
+        assert_eq!(values.len(), 3);
+        assert_eq!(values.value(0), 1);
+        assert_eq!(values.value(1), 2);
+        assert_eq!(values.value(2), 3);
+    }
+
+    #[test]
+    fn test_nested_array_decoding() {
+        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
+        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
+        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
+        let mut buf = Vec::new();
+        buf.extend(encode_avro_long(1));
+        buf.extend(encode_avro_long(2));
+        buf.extend(encode_avro_int(5));
+        buf.extend(encode_avro_int(6));
+        buf.extend(encode_avro_long(0));
+        buf.extend(encode_avro_long(0));
+        let mut cursor = AvroCursor::new(&buf);
+        decoder.decode(&mut cursor).unwrap();
+        let arr = decoder.flush(None).unwrap();
+        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
+        assert_eq!(outer.len(), 1);
+        assert_eq!(outer.value_length(0), 1);
+        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
+        assert_eq!(inner.len(), 1);
+        assert_eq!(inner.value_length(0), 2);
+        let values = inner
+            .values()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(values.values(), &[5, 6]);
+    }
+
+    #[test]
+    fn test_array_decoding_empty_array() {
+        let value_type = avro_from_codec(Codec::Utf8);
+        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
+        let mut decoder = Decoder::try_new(&map_type).unwrap();
+        let data = encode_avro_long(0);
+        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
+        let array = decoder.flush(None).unwrap();
+        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
+        assert_eq!(list_arr.len(), 1);
+        assert_eq!(list_arr.value_length(0), 0);
+    }
 }

Reply via email to