This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 674dc17b2c Add Fixed, Uuid support to arrow-avro (#7557)
674dc17b2c is described below

commit 674dc17b2c423be16d0725a6537b0063ac7b1b58
Author: nathaniel-d-ef <[email protected]>
AuthorDate: Sat Jun 28 05:21:45 2025 -0500

    Add Fixed, Uuid support to arrow-avro (#7557)
    
    # Which issue does this PR close?
    
    Part of [4886](https://github.com/apache/arrow-rs/issues/4886)
    
    Related to [6965](https://github.com/apache/arrow-rs/pull/6965)
    
    # Rationale for this change
    
    This change expands upon the Avro reader logic by adding full support
    for the Fixed and Uuid types (Uuid relies on Fixed). It builds out the
    `Fixed` decoding path, which was previously stubbed out as not yet
    implemented.
    
    # What changes are included in this PR?
    
    Adds `Fixed` and `Uuid` support to the arrow-avro crate with changes to
    the following:
    
    1. arrow-avro/src/codec.rs
    
    - Adds a `Uuid` codec variant, mapped to Arrow's `FixedSizeBinary(16)`
    - Adds a test
    
    2. arrow-avro/src/reader/cursor.rs:
    
    - Adds a `get_fixed` helper method that reads the specified number of
    bytes from the buffer.
    
    3. arrow-avro/src/reader/record.rs:
    
    - Introduces the Fixed decoding path, replacing the not-yet-implemented
    (`nyi`) `Codec::Fixed` arm in the `Decoder`.
    - Introduces the Uuid decoding path, which builds on Fixed.
    - Adds tests.
    
    # Are there any user-facing changes?
    
    n/a
    
    ---------
    
    Co-authored-by: Connor Sanders <[email protected]>
---
 arrow-avro/src/codec.rs         |  15 ++++++
 arrow-avro/src/reader/cursor.rs |  12 +++++
 arrow-avro/src/reader/record.rs | 100 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 70f162f147..caac390f3d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -192,6 +192,8 @@ pub enum Codec {
     /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
     /// The i32 parameter indicates the fixed binary size
     Fixed(i32),
+    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
+    Uuid,
     /// Represents Avro array type, maps to Arrow's List data type
     List(Arc<AvroDataType>),
     /// Represents Avro record type, maps to Arrow's Struct data type
@@ -225,6 +227,7 @@ impl Codec {
             }
             Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
             Self::Fixed(size) => DataType::FixedSizeBinary(*size),
+            Self::Uuid => DataType::FixedSizeBinary(16),
             Self::List(f) => {
                 
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
             }
@@ -457,6 +460,7 @@ fn make_data_type<'a>(
                     *c = Codec::TimestampMicros(false)
                 }
                 (Some("duration"), c @ Codec::Fixed(12)) => *c = 
Codec::Interval,
+                (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
                 (Some(logical), _) => {
                     // Insert unrecognized logical type into metadata map
                     field.metadata.insert("logicalType".into(), 
logical.into());
@@ -583,6 +587,17 @@ mod tests {
         assert!(matches!(result.codec, Codec::TimestampMicros(false)));
     }
 
+    #[test]
+    fn test_uuid_type() {
+        let mut codec = Codec::Fixed(16);
+
+        if let c @ Codec::Fixed(16) = &mut codec {
+            *c = Codec::Uuid;
+        }
+
+        assert!(matches!(codec, Codec::Uuid));
+    }
+
     #[test]
     fn test_duration_logical_type() {
         let mut codec = Codec::Fixed(12);
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index 4b6a5a4d65..1b89ff86c3 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -118,4 +118,16 @@ impl<'a> AvroCursor<'a> {
         self.buf = &self.buf[8..];
         Ok(ret)
     }
+
+    /// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`).
+    pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], 
ArrowError> {
+        if self.buf.len() < n {
+            return Err(ArrowError::ParseError(
+                "Unexpected EOF reading fixed".to_string(),
+            ));
+        }
+        let ret = &self.buf[..n];
+        self.buf = &self.buf[n..];
+        Ok(ret)
+    }
 }
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 3466b06445..6d1a9f751a 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -122,6 +122,7 @@ enum Decoder {
         Vec<u8>,
         Box<Decoder>,
     ),
+    Fixed(i32, Vec<u8>),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
@@ -157,7 +158,7 @@ impl Decoder {
             Codec::TimestampMicros(is_utc) => {
                 Self::TimestampMicros(*is_utc, 
Vec::with_capacity(DEFAULT_CAPACITY))
             }
-            Codec::Fixed(_) => return nyi("decoding fixed"),
+            Codec::Fixed(sz) => Self::Fixed(*sz, 
Vec::with_capacity(DEFAULT_CAPACITY)),
             Codec::Interval => return nyi("decoding interval"),
             Codec::List(item) => {
                 let decoder = Self::try_new(item)?;
@@ -196,6 +197,7 @@ impl Decoder {
                     Box::new(val_dec),
                 )
             }
+            Codec::Uuid => Self::Fixed(16, 
Vec::with_capacity(DEFAULT_CAPACITY)),
         };
 
         Ok(match data_type.nullability() {
@@ -232,6 +234,9 @@ impl Decoder {
                 moff.push_length(0);
             }
             Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
+            Self::Fixed(sz, accum) => {
+                accum.extend(std::iter::repeat(0u8).take(*sz as usize));
+            }
         }
     }
 
@@ -282,6 +287,10 @@ impl Decoder {
                     false => e.append_null(),
                 }
             }
+            Self::Fixed(sz, accum) => {
+                let fx = buf.get_fixed(*sz as usize)?;
+                accum.extend_from_slice(fx);
+            }
         }
         Ok(())
     }
@@ -383,6 +392,12 @@ impl Decoder {
                 let map_arr = MapArray::new(map_field.clone(), moff, 
entries_struct, nulls, false);
                 Arc::new(map_arr)
             }
+            Self::Fixed(sz, accum) => {
+                let b: Buffer = flush_values(accum).into();
+                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
+                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                Arc::new(arr)
+            }
         })
     }
 }
@@ -542,6 +557,89 @@ mod tests {
         assert_eq!(map_arr.value_length(0), 0);
     }
 
+    #[test]
+    fn test_fixed_decoding() {
+        let avro_type = avro_from_codec(Codec::Fixed(3));
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to 
create decoder");
+
+        let data1 = [1u8, 2, 3];
+        let mut cursor1 = AvroCursor::new(&data1);
+        decoder
+            .decode(&mut cursor1)
+            .expect("Failed to decode data1");
+        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed 
size");
+
+        let data2 = [4u8, 5, 6];
+        let mut cursor2 = AvroCursor::new(&data2);
+        decoder
+            .decode(&mut cursor2)
+            .expect("Failed to decode data2");
+        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed 
size");
+
+        let array = decoder.flush(None).expect("Failed to flush decoder");
+
+        assert_eq!(array.len(), 2, "Array should contain two items");
+        let fixed_size_binary_array = array
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray");
+
+        assert_eq!(
+            fixed_size_binary_array.value_length(),
+            3,
+            "Fixed size of binary values should be 3"
+        );
+        assert_eq!(
+            fixed_size_binary_array.value(0),
+            &[1, 2, 3],
+            "First item mismatch"
+        );
+        assert_eq!(
+            fixed_size_binary_array.value(1),
+            &[4, 5, 6],
+            "Second item mismatch"
+        );
+    }
+
+    #[test]
+    fn test_fixed_decoding_empty() {
+        let avro_type = avro_from_codec(Codec::Fixed(5));
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to 
create decoder");
+
+        let array = decoder
+            .flush(None)
+            .expect("Failed to flush decoder for empty input");
+
+        assert_eq!(array.len(), 0, "Array should be empty");
+        let fixed_size_binary_array = array
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray for empty 
array");
+
+        assert_eq!(
+            fixed_size_binary_array.value_length(),
+            5,
+            "Fixed size of binary values should be 5 as per type"
+        );
+    }
+
+    #[test]
+    fn test_uuid_decoding() {
+        let avro_type = avro_from_codec(Codec::Uuid);
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to 
create decoder");
+
+        let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
+        let mut cursor1 = AvroCursor::new(&data1);
+        decoder
+            .decode(&mut cursor1)
+            .expect("Failed to decode data1");
+        assert_eq!(
+            cursor1.position(),
+            16,
+            "Cursor should advance by fixed size"
+        );
+    }
+
     #[test]
     fn test_array_decoding() {
         let item_dt = avro_from_codec(Codec::Int32);

Reply via email to