This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push: new ed25bbaf9d Implement Array Decoding in arrow-avro (#7559) ed25bbaf9d is described below commit ed25bbaf9de6a6f0ffcd0b74e5aec52ca2b793ea Author: Connor Sanders <con...@elastiflow.com> AuthorDate: Tue Jun 17 15:57:15 2025 -0500 Implement Array Decoding in arrow-avro (#7559) # Which issue does this PR close? Part of https://github.com/apache/arrow-rs/issues/4886 Related to https://github.com/apache/arrow-rs/pull/6965 # Rationale for this change Avro supports arrays as a core data type, but arrow-avro previously could not decode them: the record decoder returned a `NotYetImplemented` error for list fields. As a result, any Avro file containing array fields failed to decode into Arrow. This PR closes that gap by completing the explicit Avro `Array` -> Arrow `List` decoding path: the `Decoder::Array` variant now reads Avro array blocks and constructs an Arrow `ListArray`. Overall, these changes expand arrow-avro's reader capabilities, allowing users to read Avro array data as standard Arrow `ListArray`s. # What changes are included in this PR? **1. arrow-avro/src/reader/record.rs:** * Completed the array decoding path, which uses blockwise reads of Avro array data. * Renamed the `Decoder::List` variant to `Decoder::Array`, and renamed the `read_map_blocks` helper to `read_blocks` so that the map and array decoders share it. * Added decoder unit tests for arrays, covering a basic two-row case, a negative block count, nested arrays, and an empty array. # Are there any user-facing changes? 
N/A --- arrow-avro/src/reader/record.rs | 123 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 10 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index cd9d6e3c13..3466b06445 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -113,7 +113,7 @@ enum Decoder { String(OffsetBufferBuilder<i32>, Vec<u8>), /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray StringView(OffsetBufferBuilder<i32>, Vec<u8>), - List(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>), + Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>), Record(Fields, Vec<Decoder>), Map( FieldRef, @@ -161,7 +161,7 @@ impl Decoder { Codec::Interval => return nyi("decoding interval"), Codec::List(item) => { let decoder = Self::try_new(item)?; - Self::List( + Self::Array( Arc::new(item.field_with_name("item")), OffsetBufferBuilder::new(DEFAULT_CAPACITY), Box::new(decoder), @@ -223,7 +223,7 @@ impl Decoder { Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => { offsets.push_length(0); } - Self::List(_, offsets, e) => { + Self::Array(_, offsets, e) => { offsets.push_length(0); e.append_null(); } @@ -256,10 +256,9 @@ impl Decoder { offsets.push_length(data.len()); values.extend_from_slice(data); } - Self::List(_, _, _) => { - return Err(ArrowError::NotYetImplemented( - "Decoding ListArray".to_string(), - )) + Self::Array(_, off, encoding) => { + let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?; + off.push_length(total_items); } Self::Record(_, encodings) => { for encoding in encodings { @@ -267,7 +266,7 @@ impl Decoder { } } Self::Map(_, koff, moff, kdata, valdec) => { - let newly_added = read_map_blocks(buf, |cur| { + let newly_added = read_blocks(buf, |cur| { let kb = cur.get_bytes()?; koff.push_length(kb.len()); kdata.extend_from_slice(kb); @@ -339,7 +338,7 @@ impl Decoder { Arc::new(StringViewArray::from(values)) } - 
Self::List(field, offsets, values) => { + Self::Array(field, offsets, values) => { let values = values.flush(None)?; let offsets = flush_offsets(offsets); Arc::new(ListArray::new(field.clone(), offsets, values, nulls)) @@ -388,7 +387,7 @@ impl Decoder { } } -fn read_map_blocks( +fn read_blocks( buf: &mut AvroCursor, decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, ) -> Result<usize, ArrowError> { @@ -462,6 +461,17 @@ mod tests { IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, StructArray, }; + fn encode_avro_int(value: i32) -> Vec<u8> { + let mut buf = Vec::new(); + let mut v = (value << 1) ^ (value >> 31); + while v & !0x7F != 0 { + buf.push(((v & 0x7F) | 0x80) as u8); + v >>= 7; + } + buf.push(v as u8); + buf + } + fn encode_avro_long(value: i64) -> Vec<u8> { let mut buf = Vec::new(); let mut v = (value << 1) ^ (value >> 63); @@ -531,4 +541,97 @@ mod tests { assert_eq!(map_arr.len(), 1); assert_eq!(map_arr.value_length(0), 0); } + + #[test] + fn test_array_decoding() { + let item_dt = avro_from_codec(Codec::Int32); + let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt))); + let mut decoder = Decoder::try_new(&list_dt).unwrap(); + let mut row1 = Vec::new(); + row1.extend_from_slice(&encode_avro_long(2)); + row1.extend_from_slice(&encode_avro_int(10)); + row1.extend_from_slice(&encode_avro_int(20)); + row1.extend_from_slice(&encode_avro_long(0)); + let row2 = encode_avro_long(0); + let mut cursor = AvroCursor::new(&row1); + decoder.decode(&mut cursor).unwrap(); + let mut cursor2 = AvroCursor::new(&row2); + decoder.decode(&mut cursor2).unwrap(); + let array = decoder.flush(None).unwrap(); + let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap(); + assert_eq!(list_arr.len(), 2); + let offsets = list_arr.value_offsets(); + assert_eq!(offsets, &[0, 2, 2]); + let values = list_arr.values(); + let int_arr = values.as_primitive::<Int32Type>(); + assert_eq!(int_arr.len(), 2); + assert_eq!(int_arr.value(0), 10); + 
assert_eq!(int_arr.value(1), 20); + } + + #[test] + fn test_array_decoding_with_negative_block_count() { + let item_dt = avro_from_codec(Codec::Int32); + let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt))); + let mut decoder = Decoder::try_new(&list_dt).unwrap(); + let mut data = encode_avro_long(-3); + data.extend_from_slice(&encode_avro_long(12)); + data.extend_from_slice(&encode_avro_int(1)); + data.extend_from_slice(&encode_avro_int(2)); + data.extend_from_slice(&encode_avro_int(3)); + data.extend_from_slice(&encode_avro_long(0)); + let mut cursor = AvroCursor::new(&data); + decoder.decode(&mut cursor).unwrap(); + let array = decoder.flush(None).unwrap(); + let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap(); + assert_eq!(list_arr.len(), 1); + assert_eq!(list_arr.value_length(0), 3); + let values = list_arr.values().as_primitive::<Int32Type>(); + assert_eq!(values.len(), 3); + assert_eq!(values.value(0), 1); + assert_eq!(values.value(1), 2); + assert_eq!(values.value(2), 3); + } + + #[test] + fn test_nested_array_decoding() { + let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32)))); + let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone()))); + let mut decoder = Decoder::try_new(&nested_ty).unwrap(); + let mut buf = Vec::new(); + buf.extend(encode_avro_long(1)); + buf.extend(encode_avro_long(2)); + buf.extend(encode_avro_int(5)); + buf.extend(encode_avro_int(6)); + buf.extend(encode_avro_long(0)); + buf.extend(encode_avro_long(0)); + let mut cursor = AvroCursor::new(&buf); + decoder.decode(&mut cursor).unwrap(); + let arr = decoder.flush(None).unwrap(); + let outer = arr.as_any().downcast_ref::<ListArray>().unwrap(); + assert_eq!(outer.len(), 1); + assert_eq!(outer.value_length(0), 1); + let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap(); + assert_eq!(inner.len(), 1); + assert_eq!(inner.value_length(0), 2); + let values = inner + .values() + .as_any() + 
.downcast_ref::<Int32Array>() + .unwrap(); + assert_eq!(values.values(), &[5, 6]); + } + + #[test] + fn test_array_decoding_empty_array() { + let value_type = avro_from_codec(Codec::Utf8); + let map_type = avro_from_codec(Codec::List(Arc::new(value_type))); + let mut decoder = Decoder::try_new(&map_type).unwrap(); + let data = encode_avro_long(0); + decoder.decode(&mut AvroCursor::new(&data)).unwrap(); + let array = decoder.flush(None).unwrap(); + let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap(); + assert_eq!(list_arr.len(), 1); + assert_eq!(list_arr.value_length(0), 0); + } }