scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2244284471


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,130 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                     (writer_schema_store is set but active_fingerprint is 
None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(prefix_bytes) = 
self.handle_prefix(&data[total_consumed..], hash_type)? {
+                // A batch is complete when its `remaining_capacity` is 0. It 
may be completed early if
+                // a schema change is detected or there are insufficient bytes 
to read the next prefix.
+                // A schema change requires a new batch.
+                total_consumed += prefix_bytes;
+                break;
+            }
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            total_consumed += n;
+            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.writer_schema_store.is_none() || 
!buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+            return Ok(None);
+        }
+        let fp_bytes = &buf[2..]; // safe thanks to the `starts_with` check 
above
+        let new_fp = match hash_type {
+            FingerprintAlgorithm::Rabin => {
+                let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else {

Review Comment:
   I don't think this can work? The slice -> array conversion requires an 
exact-length slice, but `fp_bytes` is everything after the two magic bytes, so 
the conversion fails unless exactly 8 bytes remain. It's annoying enough that 
the parquet-variant crate defines a [helper 
function](https://github.com/apache/arrow-rs/blob/main/parquet-variant/src/utils.rs#L61-L68)
 to hide it. Using it here would look like this:
   ```suggestion
                   let Ok(bytes) = array_from_slice(buf, 2) else {
   ```
   ... but I don't know if that "extract an array from a slice at a given 
offset" pattern arises often enough in this crate to be worth creating a 
similar helper here?
   
   Notes:
   * Even though array-from-slice conversion has a fallible signature, the 
helper guarantees it's infallible in practice by forcing both the sub-slice and 
conversion operations to use the same generic constant `N`; the initial 
sub-slicing operation can still fail if the buffer is too small, though.
   * The compiler can infer the array size for `bytes` from the 
`u64::from_le_bytes` call that consumes it.



##########
arrow-avro/src/schema.rs:
##########
@@ -260,13 +274,369 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Supported fingerprint algorithms for Avro schema identification.
+/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come 
in a future update
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum FingerprintAlgorithm {
+    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
+    Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
+/// instance always stores only one variant, matching its configured
+/// `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+    /// A 64-bit Rabin fingerprint.
+    Rabin(u64),
+}
+
+/// Allow easy extraction of the algorithm used to create a fingerprint.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    #[inline]
+    fn from(fp: &Fingerprint) -> Self {
+        match fp {
+            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
+        }
+    }
+}
+
+/// Generates a fingerprint for the given `Schema` using the specified 
`FingerprintAlgorithm`.
+#[inline]
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = generate_canonical_form(schema).map_err(|e| {
+        ArrowError::ComputeError(format!("Failed to generate canonical form 
for schema: {e}"))
+    })?;
+    match hash_type {
+        FingerprintAlgorithm::Rabin => {
+            Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
+        }
+    }
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+#[inline]
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, 
ArrowError> {
+    generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsed Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// 
<https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+#[inline]
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+    build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas 
efficiently.
+/// Each schema is associated with a unique [`Fingerprint`], which is 
generated based
+/// on the schema's canonical form and a specific hashing algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single 
[`FingerprintAlgorithm`] such as Rabin,
+/// MD5 (not yet supported), or SHA256 (not yet supported) for all its 
operations.
+/// This ensures consistency when generating fingerprints and looking up 
schemas.
+/// All schemas registered will have their fingerprint computed with this 
algorithm, and
+/// lookups must use a matching fingerprint.
+///
+/// The lifetime parameter `'a` corresponds to the lifetime of the string 
slices
+/// contained within the stored [`Schema`] objects. This means the 
`SchemaStore`
+/// cannot outlive the data referenced by the schemas it contains.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone)]
+pub struct SchemaStore<'a> {
+    /// The hashing algorithm used for generating fingerprints.
+    fingerprint_algorithm: FingerprintAlgorithm,
+    /// A map from a schema's fingerprint to the schema itself.
+    schemas: HashMap<Fingerprint, Schema<'a>>,
+}
+
+impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> {
+    type Error = ArrowError;
+
+    /// Creates a `SchemaStore` from a slice of schemas.
+    /// Each schema in the slice is registered with the new store.
+    fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> {
+        let mut store = SchemaStore::new();
+        for schema in schemas {
+            store.register(schema.clone())?;
+        }
+        Ok(store)
+    }
+}
+
+impl<'a> Default for SchemaStore<'a> {
+    fn default() -> Self {
+        Self {
+            fingerprint_algorithm: FingerprintAlgorithm::Rabin,
+            schemas: HashMap::new(),
+        }
+    }
+}
+
+impl<'a> SchemaStore<'a> {
+    /// Creates an empty `SchemaStore` using the default fingerprinting 
algorithm (64-bit Rabin).
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Registers a schema with the store and returns its fingerprint.
+    ///
+    /// A fingerprint is calculated for the given schema using the store's 
configured
+    /// hash type. If a schema with the same fingerprint does not already 
exist in the
+    /// store, the new schema is inserted. If the fingerprint already exists, 
the
+    /// existing schema is not overwritten.
+    ///
+    /// # Arguments
+    ///
+    /// * `schema` - The schema to register.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing the `Fingerprint` of the schema if successful,
+    /// or an `ArrowError` on failure.
+    pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, 
ArrowError> {
+        let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?;
+        match self.schemas.entry(fp) {
+            Entry::Occupied(entry) => {
+                if entry.get() != &schema {
+                    return Err(ArrowError::ComputeError(format!(
+                        "Schema fingerprint collision detected for fingerprint 
{fp:?}"
+                    )));
+                }
+            }
+            Entry::Vacant(entry) => {
+                entry.insert(schema);
+            }
+        }
+        Ok(fp)
+    }
+
+    /// Looks up a schema by its `Fingerprint`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fp` - A reference to the `Fingerprint` of the schema to look up.
+    ///
+    /// # Returns
+    ///
+    /// An `Option` containing a clone of the `Schema` if found, otherwise 
`None`.
+    pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> {
+        self.schemas.get(fp)
+    }
+
+    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for 
fingerprinting.
+    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
+        self.fingerprint_algorithm
+    }
+}
+
+#[inline]
+fn quote(s: &str) -> Result<String, ArrowError> {
+    serde_json::to_string(s)
+        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: 
{e}")))
+}
+
+fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: 
Option<&str>) -> String {
+    match namespace_attr.or(enclosing_ns) {
+        Some(ns) if !name.contains('.') => format!("{ns}.{name}"),
+        _ => name.to_string(),

Review Comment:
   Got it, thanks for the explanation of yet another intricacy of the spec.
   
   Maybe worth a code comment summarizing this?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,130 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                     (writer_schema_store is set but active_fingerprint is 
None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(prefix_bytes) = 
self.handle_prefix(&data[total_consumed..], hash_type)? {
+                // A batch is complete when its `remaining_capacity` is 0. It 
may be completed early if
+                // a schema change is detected or there are insufficient bytes 
to read the next prefix.
+                // A schema change requires a new batch.
+                total_consumed += prefix_bytes;
+                break;
+            }
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            total_consumed += n;
+            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.writer_schema_store.is_none() || 
!buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+            return Ok(None);

Review Comment:
   I think there's a subtle bug here -- suppose there really is a schema 
change, but the buffer has only 1 byte left. `starts_with` is false for a 
buffer shorter than the magic, so by returning `None` we would tell our caller 
that no schema change is coming, and the caller would attempt to consume the 
byte as part of a (possibly single-byte) record instead.
   
   I think we need to split up the checks? 
   * no schema store => `Ok(None)`
   * buffer has fewer than `prefix_len` bytes left => `Ok(Some(0))`
   * the first two bytes are not magic => `Ok(None)`
   * the first two bytes are magic, but there are not enough bytes left for the 
actual fingerprint => `Ok(Some(0))`
   * the first two bytes are magic and there are enough bytes for the actual 
(Rabin) fingerprint => `Ok(Some(10))`, possibly with a schema change if the new 
fingerprint differs from the active one.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,130 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                     (writer_schema_store is set but active_fingerprint is 
None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(prefix_bytes) = 
self.handle_prefix(&data[total_consumed..], hash_type)? {
+                // A batch is complete when its `remaining_capacity` is 0. It 
may be completed early if
+                // a schema change is detected or there are insufficient bytes 
to read the next prefix.
+                // A schema change requires a new batch.
+                total_consumed += prefix_bytes;
+                break;
+            }
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            total_consumed += n;
+            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.writer_schema_store.is_none() || 
!buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+            return Ok(None);

Review Comment:
   From what I can tell, this whole operation should actually be infallible, 
because the only "fallible" action it performs is slice-to-array conversion 
that can be made infallible.
   
   Maybe something like this?
   ```rust
   // Attempts to read and install a new fingerprint, checking the magic bytes 
prefix first.
   // Returns the number of bytes consumed; None indicates there was no 
fingerprint and 
   // Some(0) indicates there were insufficient bytes available.
   fn handle_prefix(
       &mut self,
       mut buf: &[u8],
       hash_type: FingerprintAlgorithm,
   ) -> Option<usize> {
       if self.writer_schema_store.is_none() {
           return None; // no schema changes allowed
       }
   
       let Some(magic_bytes) = buf.split_off(..SINGLE_OBJECT_MAGIC.len()) else {
           return Some(0); // not enough bytes available to even check magic
       };
       (magic_bytes == &SINGLE_OBJECT_MAGIC).then(|| {
           let fp_size = match hash_type {
               FingerprintAlgorithm::Rabin => {
                   self.handle_fingerprint(&buf, |bytes| 
Fingerprint::Rabin(u64::from_le_bytes(bytes)))
               }
           };
           fp_size.map_or(0, |n| n + magic_bytes.len())
       })
   }
   
   // Attempts to read and install a new fingerprint.
   // Returns the number of bytes consumed -- `Some(N)` -- or `None` if 
insufficient bytes were available.
   fn handle_fingerprint<const N: usize>(
       &mut self,
       buf: &[u8],
       fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
   ) -> Option<usize> {
       // NOTE: Converting a slice of length N to an array of length N always 
succeeds
       let fp_bytes = buf.get(..N)?;
       let new_fp = fingerprint_from(fp_bytes.try_into().unwrap());
   
       if self.active_fingerprint != Some(new_fp) {
           // Fingerprint change => schema change => prepare to switch decoders.
           ...
       }
       
       Some(N)
   }
   ```
   As a nice side effect, the `prefix_len` is replaced by a generic function 
that lets the compiler capture the fingerprint's size, and which _also_ makes a 
nice hook point for installing the new fingerprint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to