This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 674dc17b2c Add Fixed, Uuid support to arrow-avro (#7557)
674dc17b2c is described below
commit 674dc17b2c423be16d0725a6537b0063ac7b1b58
Author: nathaniel-d-ef <[email protected]>
AuthorDate: Sat Jun 28 05:21:45 2025 -0500
Add Fixed, Uuid support to arrow-avro (#7557)
# Which issue does this PR close?
Part of [4886](https://github.com/apache/arrow-rs/issues/4886)
Related to [6965](https://github.com/apache/arrow-rs/pull/6965)
# Rationale for this change
This change extends the Avro reader with support for the Fixed and Uuid
types (the Uuid decoder reuses the Fixed decoding path). It implements the
`Fixed` decoding path, which was previously stubbed out.
# What changes are included in this PR?
Adds `Fixed` and `Uuid` support to the arrow-avro crate with changes to
the following:
1. arrow-avro/src/codec.rs:
- Adds a `Codec::Uuid` variant, mapped to Arrow's `FixedSizeBinary(16)`
- Adds a test
2. arrow-avro/src/reader/cursor.rs:
- Adds a `get_fixed` helper method that reads exactly `n` bytes, returning
them as a slice and advancing the cursor.
3. arrow-avro/src/reader/record.rs:
- Introduces the Fixed decoding path, replacing the `nyi` stub for
`Codec::Fixed` in the `Decoder`.
- Introduces the Uuid decoding path, which reuses the Fixed decoder with a
size of 16.
- Adds tests.
# Are there any user-facing changes?
n/a
---------
Co-authored-by: Connor Sanders <[email protected]>
---
arrow-avro/src/codec.rs | 15 ++++++
arrow-avro/src/reader/cursor.rs | 12 +++++
arrow-avro/src/reader/record.rs | 100 +++++++++++++++++++++++++++++++++++++++-
3 files changed, 126 insertions(+), 1 deletion(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 70f162f147..caac390f3d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -192,6 +192,8 @@ pub enum Codec {
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
+ /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
+ Uuid,
/// Represents Avro array type, maps to Arrow's List data type
List(Arc<AvroDataType>),
/// Represents Avro record type, maps to Arrow's Struct data type
@@ -225,6 +227,7 @@ impl Codec {
}
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
+ Self::Uuid => DataType::FixedSizeBinary(16),
Self::List(f) => {
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
}
@@ -457,6 +460,7 @@ fn make_data_type<'a>(
*c = Codec::TimestampMicros(false)
}
(Some("duration"), c @ Codec::Fixed(12)) => *c =
Codec::Interval,
+ (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
(Some(logical), _) => {
// Insert unrecognized logical type into metadata map
field.metadata.insert("logicalType".into(),
logical.into());
@@ -583,6 +587,17 @@ mod tests {
assert!(matches!(result.codec, Codec::TimestampMicros(false)));
}
+ #[test]
+ fn test_uuid_type() {
+ // NOTE(review): this test performs the Fixed(16) -> Uuid substitution itself and
+ // never calls `make_data_type`, so it cannot fail if the real mapping changes.
+ // The production rule in `make_data_type` matches `(Some("uuid"), Codec::Utf8)`,
+ // not `Codec::Fixed(16)` as used here. Consider exercising the real mapping in
+ // `make_data_type` instead — TODO confirm the intended source codec for `uuid`.
+ let mut codec = Codec::Fixed(16);
+
+ if let c @ Codec::Fixed(16) = &mut codec {
+ *c = Codec::Uuid;
+ }
+
+ assert!(matches!(codec, Codec::Uuid));
+ }
+
#[test]
fn test_duration_logical_type() {
let mut codec = Codec::Fixed(12);
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index 4b6a5a4d65..1b89ff86c3 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -118,4 +118,16 @@ impl<'a> AvroCursor<'a> {
self.buf = &self.buf[8..];
Ok(ret)
}
+
+    /// Consumes and returns the next `n` bytes of the buffer (e.g. an Avro `fixed` value).
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ArrowError::ParseError`] if fewer than `n` bytes remain. In that case the
+    /// cursor is not advanced.
+    pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> {
+        let remaining: &'a [u8] = self.buf;
+        if n > remaining.len() {
+            return Err(ArrowError::ParseError(
+                "Unexpected EOF reading fixed".to_string(),
+            ));
+        }
+        let (fixed, rest) = remaining.split_at(n);
+        self.buf = rest;
+        Ok(fixed)
+    }
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 3466b06445..6d1a9f751a 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -122,6 +122,7 @@ enum Decoder {
Vec<u8>,
Box<Decoder>,
),
+ Fixed(i32, Vec<u8>),
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
}
@@ -157,7 +158,7 @@ impl Decoder {
Codec::TimestampMicros(is_utc) => {
Self::TimestampMicros(*is_utc,
Vec::with_capacity(DEFAULT_CAPACITY))
}
- Codec::Fixed(_) => return nyi("decoding fixed"),
+ Codec::Fixed(sz) => Self::Fixed(*sz,
Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::Interval => return nyi("decoding interval"),
Codec::List(item) => {
let decoder = Self::try_new(item)?;
@@ -196,6 +197,7 @@ impl Decoder {
Box::new(val_dec),
)
}
+ Codec::Uuid => Self::Fixed(16,
Vec::with_capacity(DEFAULT_CAPACITY)),
};
Ok(match data_type.nullability() {
@@ -232,6 +234,9 @@ impl Decoder {
moff.push_length(0);
}
Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
+ Self::Fixed(sz, accum) => {
+ accum.extend(std::iter::repeat(0u8).take(*sz as usize));
+ }
}
}
@@ -282,6 +287,10 @@ impl Decoder {
false => e.append_null(),
}
}
+ Self::Fixed(sz, accum) => {
+ let fx = buf.get_fixed(*sz as usize)?;
+ accum.extend_from_slice(fx);
+ }
}
Ok(())
}
@@ -383,6 +392,12 @@ impl Decoder {
let map_arr = MapArray::new(map_field.clone(), moff,
entries_struct, nulls, false);
Arc::new(map_arr)
}
+ Self::Fixed(sz, accum) => {
+ let b: Buffer = flush_values(accum).into();
+ let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
+ .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ Arc::new(arr)
+ }
})
}
}
@@ -542,6 +557,89 @@ mod tests {
assert_eq!(map_arr.value_length(0), 0);
}
+    #[test]
+    fn test_fixed_decoding() {
+        let schema = avro_from_codec(Codec::Fixed(3));
+        let mut dec = Decoder::try_new(&schema).expect("Failed to create decoder");
+
+        // Feed two 3-byte rows, each through its own cursor, and check that exactly
+        // the fixed width is consumed per row.
+        let first_row = [1u8, 2, 3];
+        let mut first_cursor = AvroCursor::new(&first_row);
+        dec.decode(&mut first_cursor).expect("Failed to decode data1");
+        assert_eq!(first_cursor.position(), 3, "Cursor should advance by fixed size");
+
+        let second_row = [4u8, 5, 6];
+        let mut second_cursor = AvroCursor::new(&second_row);
+        dec.decode(&mut second_cursor).expect("Failed to decode data2");
+        assert_eq!(second_cursor.position(), 3, "Cursor should advance by fixed size");
+
+        // Both rows must come back, in order, as a FixedSizeBinaryArray of width 3.
+        let flushed = dec.flush(None).expect("Failed to flush decoder");
+        assert_eq!(flushed.len(), 2, "Array should contain two items");
+
+        let fixed = flushed
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray");
+        assert_eq!(fixed.value_length(), 3, "Fixed size of binary values should be 3");
+        assert_eq!(fixed.value(0), &[1, 2, 3], "First item mismatch");
+        assert_eq!(fixed.value(1), &[4, 5, 6], "Second item mismatch");
+    }
+
+    #[test]
+    fn test_fixed_decoding_empty() {
+        // A decoder that never saw a row must still flush to a correctly typed,
+        // zero-length array whose value width comes from the schema.
+        let schema = avro_from_codec(Codec::Fixed(5));
+        let mut dec = Decoder::try_new(&schema).expect("Failed to create decoder");
+
+        let flushed = dec
+            .flush(None)
+            .expect("Failed to flush decoder for empty input");
+        assert_eq!(flushed.len(), 0, "Array should be empty");
+
+        let width = flushed
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray for empty array")
+            .value_length();
+        assert_eq!(width, 5, "Fixed size of binary values should be 5 as per type");
+    }
+
+    #[test]
+    fn test_uuid_decoding() {
+        let avro_type = avro_from_codec(Codec::Uuid);
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
+
+        // NOTE(review): this feeds 16 raw bytes, matching how `Codec::Uuid` is decoded
+        // (`Self::Fixed(16, ..)` in the `Decoder`). However, `make_data_type` only produces
+        // `Codec::Uuid` from `(Some("uuid"), Codec::Utf8)`, i.e. an Avro `string`.
+        // A string is encoded as a length prefix followed by UTF-8 bytes, not 16 raw bytes,
+        // so a string-encoded uuid would be misread by this path. Newer Avro specs also allow
+        // `uuid` on `fixed(16)`, which is not mapped. TODO confirm the intended mapping.
+        let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
+        let mut cursor1 = AvroCursor::new(&data1);
+        decoder
+            .decode(&mut cursor1)
+            .expect("Failed to decode data1");
+        assert_eq!(
+            cursor1.position(),
+            16,
+            "Cursor should advance by fixed size"
+        );
+
+        // Verify the flushed output as well, not just cursor movement: one 16-byte
+        // FixedSizeBinary value holding exactly the input bytes.
+        let array = decoder.flush(None).expect("Failed to flush decoder");
+        assert_eq!(array.len(), 1, "Array should contain one item");
+        let uuid_array = array
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray");
+        assert_eq!(
+            uuid_array.value_length(),
+            16,
+            "Uuid values should be 16 bytes wide"
+        );
+        assert_eq!(uuid_array.value(0), &data1, "Uuid bytes mismatch");
+    }
+
#[test]
fn test_array_decoding() {
let item_dt = avro_from_codec(Codec::Int32);