This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e18975  [feat] Support different header types (#190)
4e18975 is described below

commit 4e18975c3a6a1c224861f7667fd366c477ef7b5b
Author: Mason Gup <[email protected]>
AuthorDate: Mon Apr 28 17:02:14 2025 -0400

    [feat] Support different header types (#190)
    
    Issue #164
    
    * Header trait and structs
    
    * Tests and some cleanups
    
    * Update names
    
    * Add rustdocs
    
    * Issue #190 - Minor improvements
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    * Fix for null schema
    
    * Extract a constant for the GLUE_HEADER_LENGTH (== 18)
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    ---------
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
---
 avro/src/error.rs   |   5 +-
 avro/src/headers.rs | 178 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 avro/src/lib.rs     |   1 +
 avro/src/reader.rs  |  51 +++++++++------
 avro/src/writer.rs  |  64 ++++++++++++++-----
 5 files changed, 263 insertions(+), 36 deletions(-)

diff --git a/avro/src/error.rs b/avro/src/error.rs
index 237995f..760f97d 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -410,7 +410,7 @@ pub enum Error {
     HeaderMagic,
 
     #[error("Message Header mismatch. Expected: {0:?}. Actual: {1:?}")]
-    SingleObjectHeaderMismatch([u8; 10], [u8; 10]),
+    SingleObjectHeaderMismatch(Vec<u8>, Vec<u8>),
 
     #[error("Failed to get JSON from avro.schema key in map")]
     GetAvroSchemaFromMap,
@@ -513,6 +513,9 @@ pub enum Error {
 
     #[error("Invalid Avro data! Cannot read codec type from value that is not Value::Bytes.")]
     BadCodecMetadata,
+
+    #[error("Cannot convert a slice to Uuid: {0}")]
+    UuidFromSlice(#[source] uuid::Error),
 }
 
 #[derive(thiserror::Error, PartialEq)]
diff --git a/avro/src/headers.rs b/avro/src/headers.rs
new file mode 100644
index 0000000..971b182
--- /dev/null
+++ b/avro/src/headers.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Handling of Avro magic headers
+use uuid::Uuid;
+
+use crate::{rabin::Rabin, schema::SchemaFingerprint, AvroResult, Schema};
+
+/// This trait represents that an object is able to construct an Avro message header. It is
+/// implemented for some known header types already. If you need a header type that is not already
+/// included here, then you can create your own struct and implement this trait.
+pub trait HeaderBuilder {
+    fn build_header(&self) -> Vec<u8>;
+}
+
+/// HeaderBuilder based on the Rabin schema fingerprint
+///
+/// This is the default and will be used automatically by the `new` impls in
+/// [crate::reader::GenericSingleObjectReader] and [crate::writer::GenericSingleObjectWriter].
+pub struct RabinFingerprintHeader {
+    fingerprint: SchemaFingerprint,
+}
+
+impl RabinFingerprintHeader {
+    /// Use this helper to build an instance from an existing Avro `Schema`.
+    pub fn from_schema(schema: &Schema) -> Self {
+        let fingerprint = schema.fingerprint::<Rabin>();
+        RabinFingerprintHeader { fingerprint }
+    }
+}
+
+impl HeaderBuilder for RabinFingerprintHeader {
+    fn build_header(&self) -> Vec<u8> {
+        let bytes = &self.fingerprint.bytes;
+        vec![
+            0xC3, 0x01, bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
+            bytes[7],
+        ]
+    }
+}
+
+/// HeaderBuilder based on
+/// [Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) schema UUID
+///
+/// See the function docs for usage details
+pub struct GlueSchemaUuidHeader {
+    schema_uuid: Uuid,
+}
+
+impl GlueSchemaUuidHeader {
+    /// Create an instance of the struct from a Glue Schema UUID
+    ///
+    /// Code for writing messages will most likely want to use this. You will need to determine
+    /// via other means the correct Glue schema UUID and use it with this method to be able to
+    /// create Avro-encoded messages with the correct headers.
+    pub fn from_uuid(schema_uuid: Uuid) -> Self {
+        GlueSchemaUuidHeader { schema_uuid }
+    }
+
+    /// The minimum length of a Glue header.
+    /// 2 bytes for the special prefix (3, 0) plus
+    /// 16 bytes for the Uuid
+    const GLUE_HEADER_LENGTH: usize = 18;
+
+    /// Create an instance of the struct based on parsing the UUID out of the header of a raw
+    /// message
+    ///
+    /// Code for reading messages will most likely want to use this. Once you receive the raw bytes
+    /// of a message, use this function to build the struct from it. That struct can then be used
+    /// with the below `schema_uuid` function to retrieve the UUID in order to retrieve the correct
+    /// schema for the message. You can then use the raw message, the schema, and the struct
+    /// instance to read the message.
+    pub fn parse_from_raw_avro(message_payload: &[u8]) -> AvroResult<Self> {
+        if message_payload.len() < Self::GLUE_HEADER_LENGTH {
+            return Err(crate::error::Error::HeaderMagic);
+        }
+        let schema_uuid =
+            Uuid::from_slice(&message_payload[2..18]).map_err(crate::Error::UuidFromSlice)?;
+        Ok(GlueSchemaUuidHeader { schema_uuid })
+    }
+
+    /// Retrieve the UUID from the object
+    ///
+    /// This is most useful in conjunction with the `parse_from_raw_avro` function to retrieve the
+    /// actual UUID from the raw data of a received message.
+    pub fn schema_uuid(&self) -> Uuid {
+        self.schema_uuid
+    }
+}
+
+impl HeaderBuilder for GlueSchemaUuidHeader {
+    fn build_header(&self) -> Vec<u8> {
+        let mut output_vec: Vec<u8> = vec![3, 0];
+        output_vec.extend_from_slice(self.schema_uuid.as_bytes());
+        output_vec
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use apache_avro_test_helper::TestResult;
+
+    #[test]
+    fn test_rabin_fingerprint_header() -> TestResult {
+        let schema_str = r#"
+            {
+            "type": "record",
+            "name": "test",
+            "fields": [
+                {
+                "name": "a",
+                "type": "long",
+                "default": 42
+                },
+                {
+                "name": "b",
+                "type": "string"
+                }
+            ]
+            }
+            "#;
+        let schema = Schema::parse_str(schema_str)?;
+        let header_builder = RabinFingerprintHeader::from_schema(&schema);
+        let computed_header = header_builder.build_header();
+        let expected_header: Vec<u8> = vec![195, 1, 232, 198, 194, 12, 97, 95, 44, 71];
+        assert_eq!(computed_header, expected_header);
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_schema_header() -> TestResult {
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let computed_header = header_builder.build_header();
+        let expected_header: Vec<u8> = vec![
+            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
+        ];
+        assert_eq!(computed_header, expected_header);
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_header_parse() -> TestResult {
+        let incoming_avro_message: Vec<u8> = vec![
+            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, 65, 65, 65,
+        ];
+        let header_builder = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_avro_message)?;
+        let expected_schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        assert_eq!(header_builder.schema_uuid(), expected_schema_uuid);
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_header_parse_err_on_message_too_short() -> TestResult {
+        let incoming_message: Vec<u8> = vec![3, 0, 178, 241, 207, 0, 4, 52, 1];
+        let header_builder_res = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_message);
+        assert!(matches!(
+            header_builder_res,
+            Err(crate::error::Error::HeaderMagic)
+        ));
+        Ok(())
+    }
+}
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index 247ab18..930541a 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -871,6 +871,7 @@ mod ser_schema;
 mod util;
 mod writer;
 
+pub mod headers;
 pub mod rabin;
 pub mod schema;
 pub mod schema_compatibility;
diff --git a/avro/src/reader.rs b/avro/src/reader.rs
index 0830035..a70fffd 100644
--- a/avro/src/reader.rs
+++ b/avro/src/reader.rs
@@ -19,7 +19,7 @@
 use crate::{
     decode::{decode, decode_internal},
     from_value,
-    rabin::Rabin,
+    headers::{HeaderBuilder, RabinFingerprintHeader},
     schema::{
         resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
         ResolvedSchema, Schema,
@@ -503,24 +503,20 @@ pub fn from_avro_datum_reader_schemata<R: Read>(
 
 pub struct GenericSingleObjectReader {
     write_schema: ResolvedOwnedSchema,
-    expected_header: [u8; 10],
+    expected_header: Vec<u8>,
 }
 
 impl GenericSingleObjectReader {
     pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
-        let fingerprint = schema.fingerprint::<Rabin>();
-        let expected_header = [
-            0xC3,
-            0x01,
-            fingerprint.bytes[0],
-            fingerprint.bytes[1],
-            fingerprint.bytes[2],
-            fingerprint.bytes[3],
-            fingerprint.bytes[4],
-            fingerprint.bytes[5],
-            fingerprint.bytes[6],
-            fingerprint.bytes[7],
-        ];
+        let header_builder = RabinFingerprintHeader::from_schema(&schema);
+        Self::new_with_header_builder(schema, header_builder)
+    }
+
+    pub fn new_with_header_builder<HB: HeaderBuilder>(
+        schema: Schema,
+        header_builder: HB,
+    ) -> AvroResult<GenericSingleObjectReader> {
+        let expected_header = header_builder.build_header();
         Ok(GenericSingleObjectReader {
             write_schema: ResolvedOwnedSchema::try_from(schema)?,
             expected_header,
@@ -528,7 +524,7 @@ impl GenericSingleObjectReader {
     }
 
     pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
-        let mut header: [u8; 10] = [0; 10];
+        let mut header = vec![0; self.expected_header.len()];
         match reader.read_exact(&mut header) {
             Ok(_) => {
                 if self.expected_header == header {
@@ -540,7 +536,7 @@ impl GenericSingleObjectReader {
                     )
                 } else {
                     Err(Error::SingleObjectHeaderMismatch(
-                        self.expected_header,
+                        self.expected_header.clone(),
                         header,
                     ))
                 }
@@ -602,11 +598,12 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{encode::encode, types::Record};
+    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
     use apache_avro_test_helper::TestResult;
     use pretty_assertions::assert_eq;
     use serde::Deserialize;
     use std::io::Cursor;
+    use uuid::Uuid;
 
     const SCHEMA: &str = r#"
     {
@@ -1034,6 +1031,24 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
+            TestSingleObjectReader::get_schema(),
+            header_builder,
+        )
+        .expect("failed to build reader");
+        let data_to_read: Vec<u8> = vec![
+            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
+        ];
+        let mut to_read = &data_to_read[..];
+        let read_result = generic_reader.read_value(&mut to_read);
+        matches!(read_result, Err(crate::Error::ReadBytes(_)));
+        Ok(())
+    }
+
     #[cfg(not(feature = "snappy"))]
     #[test]
     fn test_avro_3549_read_not_enabled_codec() {
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index ed41f0e..7406e48 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -18,14 +18,14 @@
 //! Logic handling writing in Avro format at user level.
 use crate::{
     encode::{encode, encode_internal, encode_to_vec},
-    rabin::Rabin,
+    headers::{HeaderBuilder, RabinFingerprintHeader},
     schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
     ser_schema::SchemaAwareWriteSerializer,
     types::Value,
     AvroResult, Codec, Error,
 };
 use serde::Serialize;
-use std::{collections::HashMap, io::Write, marker::PhantomData};
+use std::{collections::HashMap, io::Write, marker::PhantomData, ops::RangeInclusive};
 
 const DEFAULT_BLOCK_SIZE: usize = 16000;
 const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
@@ -488,20 +488,17 @@ impl GenericSingleObjectWriter {
         schema: &Schema,
         initial_buffer_cap: usize,
     ) -> AvroResult<GenericSingleObjectWriter> {
-        let fingerprint = schema.fingerprint::<Rabin>();
+        let header_builder = RabinFingerprintHeader::from_schema(schema);
+        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
+    }
+
+    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
+        schema: &Schema,
+        initial_buffer_cap: usize,
+        header_builder: HB,
+    ) -> AvroResult<GenericSingleObjectWriter> {
         let mut buffer = Vec::with_capacity(initial_buffer_cap);
-        let header = [
-            0xC3,
-            0x01,
-            fingerprint.bytes[0],
-            fingerprint.bytes[1],
-            fingerprint.bytes[2],
-            fingerprint.bytes[3],
-            fingerprint.bytes[4],
-            fingerprint.bytes[5],
-            fingerprint.bytes[6],
-            fingerprint.bytes[7],
-        ];
+        let header = header_builder.build_header();
         buffer.extend_from_slice(&header);
 
         Ok(GenericSingleObjectWriter {
@@ -510,15 +507,18 @@ impl GenericSingleObjectWriter {
         })
     }
 
+    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
+
     /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
     pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
-        if self.buffer.len() != 10 {
+        let original_length = self.buffer.len();
+        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
             Err(Error::IllegalSingleObjectWriterState)
         } else {
             write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
             writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
             let len = self.buffer.len();
-            self.buffer.truncate(10);
+            self.buffer.truncate(original_length);
             Ok(len)
         }
     }
@@ -701,6 +701,8 @@ mod tests {
     use crate::{
         decimal::Decimal,
         duration::{Days, Duration, Millis, Months},
+        headers::GlueSchemaUuidHeader,
+        rabin::Rabin,
         schema::{DecimalSchema, FixedSchema, Name},
         types::Record,
         util::zig_i64,
@@ -708,6 +710,7 @@ mod tests {
     };
     use pretty_assertions::assert_eq;
     use serde::{Deserialize, Serialize};
+    use uuid::Uuid;
 
     use crate::codec::DeflateSettings;
     use apache_avro_test_helper::TestResult;
@@ -1384,6 +1387,33 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_single_object_writer_with_header_builder() -> TestResult {
+        let mut buf: Vec<u8> = Vec::new();
+        let obj = TestSingleObjectWriter {
+            a: 300,
+            b: 34.555,
+            c: vec!["cat".into(), "dog".into()],
+        };
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
+            &TestSingleObjectWriter::get_schema(),
+            1024,
+            header_builder,
+        )
+        .expect("Should resolve schema");
+        let value = obj.into();
+        writer
+            .write_value_ref(&value, &mut buf)
+            .expect("Error serializing properly");
+
+        assert_eq!(buf[0], 0x03);
+        assert_eq!(buf[1], 0x00);
+        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
+        Ok(())
+    }
+
     #[test]
     fn test_writer_parity() -> TestResult {
         let obj1 = TestSingleObjectWriter {

Reply via email to