scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2248037550


##########
arrow-avro/src/codec.rs:
##########
@@ -180,15 +182,15 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
 /// Builder for an [`AvroField`]
 #[derive(Debug)]
 pub struct AvroFieldBuilder<'a> {
-    writer_schema: &'a Schema<'a>,
-    reader_schema: Option<&'a Schema<'a>>,
+    writer_schema: &'a AvroSchema<'a>,
+    reader_schema: Option<&'a AvroSchema<'a>>,

Review Comment:
   This seems like a pretty sweeping change (affects lots of other code outside 
this PR). Given that we're in the avro crate, maybe it's ok (or even better?) 
to let `Schema` be the avro schema, and just use `ArrowSchema[Ref]` for the 
exceptional cases where we're not dealing with avro schemas?
   
   Then again, we're surrounded by other `AvroXxx` typenames here, so maybe the 
change is just making things more uniform?
   
   What are your thoughts?



##########
arrow-avro/src/codec.rs:
##########
@@ -474,7 +477,7 @@ impl<'a> Resolver<'a> {
     }
 }
 
-/// Parses a [`AvroDataType`] from the provided [`Schema`] and the given `name` and `namespace`
+/// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`

Review Comment:
   ```suggestion
   /// Parses a [`AvroDataType`] from the provided `schema` and the given `name` and `namespace`
   ```



##########
arrow-avro/src/schema.rs:
##########
@@ -467,37 +465,50 @@ impl<'a> SchemaStore<'a> {
     }
 }
 
-#[inline]
 fn quote(s: &str) -> Result<String, ArrowError> {
     serde_json::to_string(s)
        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
 }
 
-fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String {
-    match namespace_attr.or(enclosing_ns) {
-        Some(ns) if !name.contains('.') => format!("{ns}.{name}"),
-        _ => name.to_string(),
+// Avro names are defined by a `name` and an optional `namespace`.
+// The full name is composed of the namespace and the name, separated by a dot.
+//
+// Avro specification defines two ways to specify a full name:
+// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
+//    In this case, the `namespace` attribute is ignored.
+// 2. The `name` attribute contains the simple name (e.g., "d") and the
+//    `namespace` attribute contains the namespace (e.g., "a.b.c").
+//
+// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
+// Complex paths with quotes or backticks like `a."hi".b` are not supported.
+//
+// This function constructs the full name and extracts the namespace,
+// handling both ways of specifying the name. It prioritizes a namespace
+// defined within the `name` attribute itself, then the explicit `namespace_attr`,
+// and finally the `enclosing_ns`.

Review Comment:
   Really nice comment!



##########
arrow-avro/src/schema.rs:
##########
@@ -467,37 +465,50 @@ impl<'a> SchemaStore<'a> {
     }
 }
 
-#[inline]
 fn quote(s: &str) -> Result<String, ArrowError> {
     serde_json::to_string(s)
        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
 }
 
-fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String {
-    match namespace_attr.or(enclosing_ns) {
-        Some(ns) if !name.contains('.') => format!("{ns}.{name}"),
-        _ => name.to_string(),
+// Avro names are defined by a `name` and an optional `namespace`.
+// The full name is composed of the namespace and the name, separated by a dot.
+//
+// Avro specification defines two ways to specify a full name:
+// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
+//    In this case, the `namespace` attribute is ignored.
+// 2. The `name` attribute contains the simple name (e.g., "d") and the
+//    `namespace` attribute contains the namespace (e.g., "a.b.c").
+//
+// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
+// Complex paths with quotes or backticks like `a."hi".b` are not supported.
+//
+// This function constructs the full name and extracts the namespace,
+// handling both ways of specifying the name. It prioritizes a namespace
+// defined within the `name` attribute itself, then the explicit `namespace_attr`,
+// and finally the `enclosing_ns`.
+fn make_full_name(
+    name: &str,
+    namespace_attr: Option<&str>,
+    enclosing_ns: Option<&str>,
+) -> Result<(String, Option<String>), ArrowError> {

Review Comment:
   I don't see any code in this function that returns Err?
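   For illustration, a minimal infallible sketch (not the PR's code; the priority logic is paraphrased from the doc comment above) shows how the `Result` wrapper could be dropped if no branch ever builds an `Err`:

   ```rust
// Hypothetical sketch: priority is a namespace embedded in `name`, then
// `namespace_attr`, then `enclosing_ns`. No branch can fail, so the
// function returns the tuple directly instead of a Result.
fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    if let Some(idx) = name.rfind('.') {
        // A namespace embedded in the name itself takes priority.
        return (name.to_string(), Some(name[..idx].to_string()));
    }
    match namespace_attr.or(enclosing_ns) {
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}

fn main() {
    assert_eq!(
        make_full_name("a.b.c.d", Some("ignored"), None),
        ("a.b.c.d".to_string(), Some("a.b.c".to_string()))
    );
    assert_eq!(
        make_full_name("d", Some("a.b.c"), None),
        ("a.b.c.d".to_string(), Some("a.b.c".to_string()))
    );
    assert_eq!(make_full_name("d", None, None), ("d".to_string(), None));
}
   ```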



##########
arrow-avro/src/schema.rs:
##########
@@ -260,13 +276,389 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Supported fingerprint algorithms for Avro schema identification.
+/// Currently only `Rabin` is supported; `SHA256` and `MD5` support will come in a future update.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum FingerprintAlgorithm {
+    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
+    Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is used as the key inside a `SchemaStore`'s `HashMap`. Each `SchemaStore`
+/// instance always stores only one variant, matching its configured
+/// `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+    /// A 64-bit Rabin fingerprint.
+    Rabin(u64),
+}
+
+/// Allow easy extraction of the algorithm used to create a fingerprint.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    fn from(fp: &Fingerprint) -> Self {
+        match fp {
+            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
+        }
+    }
+}
+
+/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`.
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = generate_canonical_form(schema).map_err(|e| {
+        ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
+    })?;
+    match hash_type {
+        FingerprintAlgorithm::Rabin => {
+            Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
+        }
+    }
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> {
+    generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsed Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+    build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
+/// Each schema is associated with a unique [`Fingerprint`], which is generated based
+/// on the schema's canonical form and a specific hashing algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
+/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations.
+/// This ensures consistency when generating fingerprints and looking up schemas.
+/// All schemas registered will have their fingerprint computed with this algorithm, and
+/// lookups must use a matching fingerprint.
+///
+/// The lifetime parameter `'a` corresponds to the lifetime of the string slices
+/// contained within the stored [`Schema`] objects. This means the `SchemaStore`
+/// cannot outlive the data referenced by the schemas it contains.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone)]
+pub struct SchemaStore<'a> {
+    /// The hashing algorithm used for generating fingerprints.
+    fingerprint_algorithm: FingerprintAlgorithm,
+    /// A map from a schema's fingerprint to the schema itself.
+    schemas: HashMap<Fingerprint, Schema<'a>>,
+}
+
+impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> {
+    type Error = ArrowError;
+
+    /// Creates a `SchemaStore` from a slice of schemas.
+    /// Each schema in the slice is registered with the new store.
+    fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> {
+        let mut store = SchemaStore::new();
+        for schema in schemas {
+            store.register(schema.clone())?;
+        }
+        Ok(store)
+    }
+}
+
+impl<'a> Default for SchemaStore<'a> {
+    fn default() -> Self {
+        Self {
+            fingerprint_algorithm: FingerprintAlgorithm::Rabin,

Review Comment:
   I'm pretty sure we can use `#[derive(Default)]` for both this and 
`FingerprintAlgorithm`, by annotating the desired enum variant as the default?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -272,17 +484,92 @@ impl ReaderBuilder {
         self
     }
 
-    /// Sets the Avro schema.
+    /// Sets the Avro reader schema.
     ///
     /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    pub fn with_reader_schema(mut self, reader_schema: AvroSchema<'static>) -> Self {
+        self.reader_schema = Some(reader_schema);
         self
     }
 
+    /// Sets the `SchemaStore` used for resolving writer schemas.
+    ///
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self {
+        self.writer_schema_store = Some(store);
+        self
+    }
+
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fingerprint = Some(fp);
+        self
+    }
+
+    /// Set the maximum number of decoders to cache.
+    ///
+    /// When dealing with Avro files that contain multiple schemas, we may need to switch
+    /// between different decoders. This cache avoids rebuilding them from scratch every time.
+    ///
+    /// Defaults to `20`.
+    pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self {
+        self.decoder_cache_size = n;
+        self
+    }
+
+    // Validate the builder configuration against this truth‑table
+    //
+    // | writer_schema_store | reader_schema | active_fingerprint | Result |
+    // |---------------------|---------------|--------------------|--------|
+    // | None----------------| None----------| None---------------| Err----|
+    // | None----------------| None----------| Some---------------| Err----|
+    // | None----------------| Some----------| None---------------| Ok-----|
+    // | None----------------| Some----------| Some---------------| Err----|
+    // | Some----------------| None----------| None---------------| Err----|
+    // | Some----------------| None----------| Some---------------| Err----|

Review Comment:
   Something seems wrong here.
   The truth table indicates that `reader_schema` is always required?
   (all four `None` rows give an `Err` result)
   



##########
arrow-avro/src/schema.rs:
##########
@@ -260,13 +276,389 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Supported fingerprint algorithms for Avro schema identification.
+/// Currently only `Rabin` is supported; `SHA256` and `MD5` support will come in a future update.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum FingerprintAlgorithm {
+    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
+    Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is used as the key inside a `SchemaStore`'s `HashMap`. Each `SchemaStore`
+/// instance always stores only one variant, matching its configured
+/// `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+    /// A 64-bit Rabin fingerprint.
+    Rabin(u64),
+}
+
+/// Allow easy extraction of the algorithm used to create a fingerprint.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    fn from(fp: &Fingerprint) -> Self {
+        match fp {
+            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
+        }
+    }
+}
+
+/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`.
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = generate_canonical_form(schema).map_err(|e| {
+        ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
+    })?;
+    match hash_type {
+        FingerprintAlgorithm::Rabin => {
+            Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
+        }
+    }
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> {
+    generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsed Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+    build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
+/// Each schema is associated with a unique [`Fingerprint`], which is generated based
+/// on the schema's canonical form and a specific hashing algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
+/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations.
+/// This ensures consistency when generating fingerprints and looking up schemas.
+/// All schemas registered will have their fingerprint computed with this algorithm, and
+/// lookups must use a matching fingerprint.
+///
+/// The lifetime parameter `'a` corresponds to the lifetime of the string slices
+/// contained within the stored [`Schema`] objects. This means the `SchemaStore`
+/// cannot outlive the data referenced by the schemas it contains.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone)]
+pub struct SchemaStore<'a> {
+    /// The hashing algorithm used for generating fingerprints.
+    fingerprint_algorithm: FingerprintAlgorithm,
+    /// A map from a schema's fingerprint to the schema itself.
+    schemas: HashMap<Fingerprint, Schema<'a>>,
+}
+
+impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> {
+    type Error = ArrowError;
+
+    /// Creates a `SchemaStore` from a slice of schemas.
+    /// Each schema in the slice is registered with the new store.
+    fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> {
+        let mut store = SchemaStore::new();
+        for schema in schemas {
+            store.register(schema.clone())?;
+        }
+        Ok(store)
+    }
+}
+
+impl<'a> Default for SchemaStore<'a> {
+    fn default() -> Self {
+        Self {
+            fingerprint_algorithm: FingerprintAlgorithm::Rabin,

Review Comment:
   Google AI overview says:
   > Place the `#[default]` attribute on exactly one unit variant (a variant 
without any associated data) within your enum. This attribute tells the 
compiler which variant should be returned when `Default::default()` is called 
on your enum type.
   

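   A quick standalone sketch of that mechanism (a toy enum mirroring `FingerprintAlgorithm`, not the PR's code):

   ```rust
// Toy demonstration: `#[derive(Default)]` on an enum requires marking
// exactly one unit variant with `#[default]` (stable since Rust 1.62).
#[derive(Debug, Default, PartialEq, Clone, Copy)]
enum FingerprintAlgorithm {
    #[default]
    Rabin,
}

fn main() {
    // `Default::default()` yields the annotated variant.
    assert_eq!(FingerprintAlgorithm::default(), FingerprintAlgorithm::Rabin);
}
   ```

   With that in place, the manual `impl Default for SchemaStore` body could simply use `FingerprintAlgorithm::default()`.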


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -272,17 +484,92 @@ impl ReaderBuilder {
         self
     }
 
-    /// Sets the Avro schema.
+    /// Sets the Avro reader schema.
     ///
     /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    pub fn with_reader_schema(mut self, reader_schema: AvroSchema<'static>) -> Self {
+        self.reader_schema = Some(reader_schema);
         self
     }
 
+    /// Sets the `SchemaStore` used for resolving writer schemas.
+    ///
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self {
+        self.writer_schema_store = Some(store);
+        self
+    }
+
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fingerprint = Some(fp);
+        self
+    }
+
+    /// Set the maximum number of decoders to cache.
+    ///
+    /// When dealing with Avro files that contain multiple schemas, we may need to switch
+    /// between different decoders. This cache avoids rebuilding them from scratch every time.
+    ///
+    /// Defaults to `20`.
+    pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self {
+        self.decoder_cache_size = n;
+        self
+    }
+
+    // Validate the builder configuration against this truth‑table
+    //
+    // | writer_schema_store | reader_schema | active_fingerprint | Result |
+    // |---------------------|---------------|--------------------|--------|
+    // | None----------------| None----------| None---------------| Err----|
+    // | None----------------| None----------| Some---------------| Err----|
+    // | None----------------| Some----------| None---------------| Ok-----|
+    // | None----------------| Some----------| Some---------------| Err----|
+    // | Some----------------| None----------| None---------------| Err----|
+    // | Some----------------| None----------| Some---------------| Err----|
+    // | Some----------------| Some----------| None---------------| Ok-----|
+    // | Some----------------| Some----------| Some---------------| Ok-----|
+    fn validate(&self) -> Result<(), ArrowError> {
+        match (
+            self.writer_schema_store.is_some(),
+            self.reader_schema.is_some(),
+            self.active_fingerprint.is_some(),
+        ) {
+            // Row 3: No store, reader schema present, no fingerprint
+            (false, true, false)
+            // Row 7: Store is present, reader schema is present, no fingerprint
+            | (true, true, false)
+            // Row 8: Store present, reader schema present, fingerprint present
+            | (true, true, true) => Ok(()),
+            // Fingerprint without a store (rows 2 & 4)
+            (false, _, true) => Err(ArrowError::InvalidArgumentError(
+                "Active fingerprint requires a writer schema store".into(),
+            )),
+            // Store present but no reader schema (rows 5 & 6)
+            (true, false, _) => Err(ArrowError::InvalidArgumentError(
+                "Reader schema must be set when writer schema store is provided".into(),
+            )),
+            // No schema store or reader schema provided (row 1)
+            (false, false, _) => Err(ArrowError::InvalidArgumentError(

Review Comment:
   Actually... I think the truth table is incorrect, see other comment.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -182,21 +174,130 @@ impl Decoder {
             FingerprintAlgorithm::Rabin,
             SchemaStore::fingerprint_algorithm,
         );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
         while total_consumed < data.len() && self.remaining_capacity > 0 {
-            if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
-                // A batch is complete when its `remaining_capacity` is 0. It may be completed early if
-                // a schema change is detected or there are insufficient bytes to read the next prefix.
-                // A schema change requires a new batch.
-                total_consumed += prefix_bytes;
-                break;
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }
+                Some(n) => {
+                    // Consumed a complete prefix (n > 0). Stage flush and stop.
+                    total_consumed += n;
+                    break;
+                }
             }
-            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
-            total_consumed += n;
-            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint::<8>(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| {
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: {new_fingerprint:?}"))
+            })?;
+        match self.reader_schema {
+            Some(ref reader_schema) => {
+                let resolved = AvroField::resolve_from_writer_and_reader(
+                    writer_schema,
+                    reader_schema,
+                    self.utf8_view,
+                    self.strict_mode,
+                )?;
+                Ok(RecordDecoder::try_new_with_options(
+                    resolved.data_type(),
+                    self.utf8_view,
+                )?)
+            }
+            None => Err(ArrowError::ParseError(
+                "Reader schema unavailable for resolution".into(),
+            )),
+        }

Review Comment:
   ```suggestion
           let Some(ref reader_schema) = self.reader_schema else {
               return Err(ArrowError::ParseError(
                   "Reader schema unavailable for resolution".into(),
               ));
           };
           let resolved = AvroField::resolve_from_writer_and_reader(
               writer_schema,
               reader_schema,
               self.utf8_view,
               self.strict_mode,
           )?;
           RecordDecoder::try_new_with_options(
               resolved.data_type(),
               self.utf8_view,
           )
   ```
   Redundant `Ok(foo?)` there?
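   For context, a toy illustration of why `Ok(foo?)` is redundant when the error types already match: the `?` unwrap followed by re-wrapping in `Ok` is a no-op.

   ```rust
// Toy example: when a function returns the same Result type it calls into,
// `Ok(inner()?)` and plain `inner()` behave identically.
fn inner() -> Result<u32, String> {
    Ok(41)
}

fn redundant() -> Result<u32, String> {
    Ok(inner()?) // unwraps with `?`, then immediately re-wraps in `Ok`
}

fn direct() -> Result<u32, String> {
    inner() // same behavior, no round-trip
}

fn main() {
    assert_eq!(redundant().unwrap(), direct().unwrap());
}
   ```

   (The round-trip is only needed when `?` must convert between error types via `From`, which is not the case here.)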



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -182,21 +174,130 @@ impl Decoder {
             FingerprintAlgorithm::Rabin,
             SchemaStore::fingerprint_algorithm,
         );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
         while total_consumed < data.len() && self.remaining_capacity > 0 {
-            if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
-                // A batch is complete when its `remaining_capacity` is 0. It may be completed early if
-                // a schema change is detected or there are insufficient bytes to read the next prefix.
-                // A schema change requires a new batch.
-                total_consumed += prefix_bytes;
-                break;
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }
+                Some(n) => {
+                    // Consumed a complete prefix (n > 0). Stage flush and stop.
+                    total_consumed += n;
+                    break;
+                }
             }
-            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
-            total_consumed += n;
-            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint::<8>(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| {

Review Comment:
   I don't think the type annotation is necessary? At least my toy [playground](https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=656aad599b24f29e4cef1702425324f4) example didn't need it?
   ```suggestion
                   .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| {
   ```
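   For reference, here is a self-contained sketch of the inference at play (hypothetical `take_prefix` standing in for `handle_fingerprint`): the const parameter `N` is inferred from the array type the closure accepts, so no turbofish is needed.

   ```rust
   // Toy model: `N` is deduced from the closure's `[u8; N]` parameter.
   fn take_prefix<const N: usize>(buf: &[u8], f: impl FnOnce([u8; N]) -> u64) -> Option<u64> {
       // `try_into` converts &[u8] -> [u8; N] once the length check passes.
       let bytes: [u8; N] = buf.get(..N)?.try_into().ok()?;
       Some(f(bytes))
   }

   fn main() {
       let buf = [1u8, 0, 0, 0, 0, 0, 0, 0, 0xFF];
       // No `::<8>` needed: `u64::from_le_bytes` takes [u8; 8], so N = 8 is inferred.
       assert_eq!(take_prefix(&buf, u64::from_le_bytes), Some(1));
       // A too-short buffer yields None, mirroring the "need more bytes" case.
       assert_eq!(take_prefix(&buf[..4], u64::from_le_bytes), None);
   }
   ```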



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +158,178 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            // A successful call to record_decoder.decode means one row was decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }
+                Some(n) => {
+                    // Consumed a complete prefix (n > 0). Stage flush and stop.
+                    total_consumed += n;
+                    break;
+                }
+            }
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.

Review Comment:
   Do we have unit test coverage for all these corner cases?
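   A sketch of the corner cases such tests could pin down, using a standalone model of the documented `handle_prefix` contract (hypothetical `classify_prefix`, not the real `Decoder`):

   ```rust
   // None = not a prefix, Some(0) = need more bytes,
   // Some(10) = full magic + 8-byte Rabin fingerprint consumed.
   const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

   fn classify_prefix(buf: &[u8]) -> Option<usize> {
       let Some(magic) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
           return Some(0); // too short to decide
       };
       if magic != SINGLE_OBJECT_MAGIC {
           return None; // plain record data
       }
       if buf.len() < SINGLE_OBJECT_MAGIC.len() + 8 {
           return Some(0); // magic seen, fingerprint incomplete
       }
       Some(SINGLE_OBJECT_MAGIC.len() + 8)
   }

   fn main() {
       assert_eq!(classify_prefix(&[]), Some(0));                    // empty buffer
       assert_eq!(classify_prefix(&[0xC3]), Some(0));                // split magic
       assert_eq!(classify_prefix(&[0x02, 0x01]), None);             // no magic
       assert_eq!(classify_prefix(&[0xC3, 0x01, 1, 2, 3]), Some(0)); // partial fingerprint
       let full = [0xC3, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 9];
       assert_eq!(classify_prefix(&full), Some(10));                 // complete prefix
   }
   ```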



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -182,21 +174,130 @@ impl Decoder {
             FingerprintAlgorithm::Rabin,
             SchemaStore::fingerprint_algorithm,
         );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
         while total_consumed < data.len() && self.remaining_capacity > 0 {
-            if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
-                // A batch is complete when its `remaining_capacity` is 0. It may be completed early if
-                // a schema change is detected or there are insufficient bytes to read the next prefix.
-                // A schema change requires a new batch.
-                total_consumed += prefix_bytes;
-                break;
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }

Review Comment:
   I'm not sure we gain much by splitting out the `Some(0)` case separately like this, when the behavior is the same as for `Some(n)` ? I also wonder if an `if let` is better here?
   ```rust
   if let Some(n) = self.handle_prefix(...)? {
       // We either consumed a prefix (n > 0) and need a schema switch, or we need
       // more bytes to make a decision. Either way, this decoding attempt is finished.
       total_consumed += n;
       break;
   }
   
   // No prefix: decode one row and keep going.
   let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
   self.remaining_capacity -= 1;
   total_consumed += n;
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +158,178 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            // A successful call to record_decoder.decode means one row was decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }
+                Some(n) => {
+                    // Consumed a complete prefix (n > 0). Stage flush and stop.
+                    total_consumed += n;
+                    break;
+                }
+            }
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint::<8>(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| {
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: {new_fingerprint:?}"))
+            })?;
+        match self.reader_schema {
+            Some(ref reader_schema) => {
+                let resolved = AvroField::resolve_from_writer_and_reader(
+                    writer_schema,
+                    reader_schema,
+                    self.utf8_view,
+                    self.strict_mode,
+                )?;
+                Ok(RecordDecoder::try_new_with_options(
+                    resolved.data_type(),
+                    self.utf8_view,
+                )?)
+            }
+            None => Err(ArrowError::ParseError(
+                "Reader schema unavailable for resolution".into(),
+            )),
+        }
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
         }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
+            if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);

Review Comment:
   This is the operation that enforces LRU, right? Newer entries at back, oldest at front gets kicked out to make space?
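   A minimal model of that eviction policy (hypothetical `TinyLru` over a plain `Vec` rather than `IndexMap`, for illustration): re-inserting moves an entry to the back, and overflow evicts index 0, i.e. the least recently installed decoder.

   ```rust
   struct TinyLru {
       entries: Vec<(u64, &'static str)>,
       cap: usize,
   }

   impl TinyLru {
       fn insert(&mut self, key: u64, val: &'static str) {
           // Like `shift_remove` + `insert`: drop any old slot, append at back.
           self.entries.retain(|(k, _)| *k != key);
           self.entries.push((key, val));
           if self.entries.len() > self.cap {
               self.entries.remove(0); // oldest entry sits at the front
           }
       }
       fn keys(&self) -> Vec<u64> {
           self.entries.iter().map(|(k, _)| *k).collect()
       }
   }

   fn main() {
       let mut lru = TinyLru { entries: Vec::new(), cap: 2 };
       lru.insert(1, "a");
       lru.insert(2, "b");
       lru.insert(3, "c"); // evicts key 1
       assert_eq!(lru.keys(), vec![2, 3]);
       lru.insert(2, "b"); // refresh: key 2 moves to the back
       lru.insert(4, "d"); // evicts key 3, not key 2
       assert_eq!(lru.keys(), vec![2, 4]);
   }
   ```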



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +365,97 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let field_builder = match reader_schema {
+            Some(rs) if !compare_schemas(writer_schema, rs)? => {
+                AvroFieldBuilder::new(writer_schema).with_reader_schema(rs)
+            }
+            Some(rs) => AvroFieldBuilder::new(rs),
+            None => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode);
+        let root = field_builder.build()?;

Review Comment:
   aside: very odd (= hard to read) indentation... surprised fmt likes it but also have no idea what it should be instead?
   Maybe we can avoid the whole issue by rearranging a bit?
   ```suggestion
           };
           let root = field_builder
               .with_utf8view(self.utf8_view)
               .with_strict_mode(self.strict_mode)
               .build()?;
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +365,97 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let field_builder = match reader_schema {
+            Some(rs) if !compare_schemas(writer_schema, rs)? => {
+                AvroFieldBuilder::new(writer_schema).with_reader_schema(rs)
+            }
+            Some(rs) => AvroFieldBuilder::new(rs),
+            None => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode);
+        let root = field_builder.build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
-                .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file header".to_string())
-            })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            cache: IndexMap::new(),
+            max_cache_size: self.decoder_cache_size,
+            reader_schema,
+            utf8_view: self.utf8_view,
+            writer_schema_store,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
+    }
+
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> {
+        match header {
+            Some(hdr) => {
+                let writer_schema = hdr

Review Comment:
   To reduce nesting depth of the code here (for readability), suggest:
   ```rust
           if let Some(hdr) = header {
               let writer_schema = hdr
                 ...
               return Ok(...);
           }
   
           let reader_schema = ...
             ...
           Ok(self.make_decoder_with_parts(...))
   ```
   ... which reduces the `Some` case indentation level by one here and the `None` case indentation by two levels.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -272,17 +484,92 @@ impl ReaderBuilder {
         self
     }
 
-    /// Sets the Avro schema.
+    /// Sets the Avro reader schema.
     ///
     /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    pub fn with_reader_schema(mut self, reader_schema: AvroSchema<'static>) -> Self {
+        self.reader_schema = Some(reader_schema);
         self
     }
 
+    /// Sets the `SchemaStore` used for resolving writer schemas.
+    ///
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self {
+        self.writer_schema_store = Some(store);
+        self
+    }
+
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fingerprint = Some(fp);
+        self
+    }
+
+    /// Set the maximum number of decoders to cache.
+    ///
+    /// When dealing with Avro files that contain multiple schemas, we may need to switch
+    /// between different decoders. This cache avoids rebuilding them from scratch every time.
+    ///
+    /// Defaults to `20`.
+    pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self {
+        self.decoder_cache_size = n;
+        self
+    }
+
+    // Validate the builder configuration against this truth‑table
+    //
+    // | writer_schema_store | reader_schema | active_fingerprint | Result |
+    // |---------------------|---------------|--------------------|--------|
+    // | None----------------| None----------| None---------------| Err----|
+    // | None----------------| None----------| Some---------------| Err----|
+    // | None----------------| Some----------| None---------------| Ok-----|
+    // | None----------------| Some----------| Some---------------| Err----|
+    // | Some----------------| None----------| None---------------| Err----|
+    // | Some----------------| None----------| Some---------------| Err----|
+    // | Some----------------| Some----------| None---------------| Ok-----|
+    // | Some----------------| Some----------| Some---------------| Ok-----|
+    fn validate(&self) -> Result<(), ArrowError> {
+        match (
+            self.writer_schema_store.is_some(),
+            self.reader_schema.is_some(),
+            self.active_fingerprint.is_some(),
+        ) {
+            // Row 3: No store, reader schema present, no fingerprint
+            (false, true, false)
+            // Row 7: Store is present, reader schema is resent, no fingerprint
+            | (true, true, false)
+            // Row 8: Store present, reader schema present, fingerprint present
+            | (true, true, true) => Ok(()),

Review Comment:
   I liked how the error cases below had some `_` to reduce redundancy. But we have two choices here:
   ```suggestion
            // Store optional, reader schema present, no fingerprint (rows 3 & 7)
               (_, true, false)
            // Row 8: Store present, reader schema present, fingerprint present
               | (true, true, true) => Ok(()),
   ```
   or
   ```suggestion
               // Row 3: No store, reader schema present, no fingerprint
               (false, true, false)
            // Store present, reader schema present, fingerprint optional (rows 7 & 8)
               | (true, true, _) => Ok(()),
   ```
   I'm guessing the second approach is the more logical grouping?
   
   (also, typo `resent` in row 7 comment)



##########
arrow-avro/src/schema.rs:
##########
@@ -15,12 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow_schema::ArrowError;
 use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use strum_macros::AsRefStr;
 
 /// The metadata key used for storing the JSON encoded [`Schema`]
 pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
 
+/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
+pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

Review Comment:
   Out of curiosity, what in the avro spec prevents false positives with magic bytes?
   Is there some reason it's impossible for those two bytes to appear as part of a normal record?
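   For context, a sketch of the single-object frame layout as the Avro spec describes it (the two magic bytes, an 8-byte little-endian fingerprint, then the body); as far as I can tell nothing forbids `0xC3 0x01` from appearing inside a record body, so the framing is only unambiguous because every message is expected to start with this prefix. Hypothetical `frame_message` helper:

   ```rust
   const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

   // Builds one single-object-encoded message: magic + fingerprint + body.
   fn frame_message(fingerprint: u64, body: &[u8]) -> Vec<u8> {
       let mut out = Vec::with_capacity(2 + 8 + body.len());
       out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
       out.extend_from_slice(&fingerprint.to_le_bytes());
       out.extend_from_slice(body);
       out
   }

   fn main() {
       let msg = frame_message(0x1234, &[0x06]);
       assert_eq!(&msg[..2], &SINGLE_OBJECT_MAGIC);
       assert_eq!(u64::from_le_bytes(msg[2..10].try_into().unwrap()), 0x1234);
       assert_eq!(msg.len(), 11); // 2 magic + 8 fingerprint + 1 body byte
   }
   ```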



##########
arrow-avro/src/schema.rs:
##########
@@ -260,13 +276,389 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Supported fingerprint algorithms for Avro schema identification.
+/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum FingerprintAlgorithm {
+    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
+    Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
+/// instance always stores only one variant, matching its configured
+/// `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+    /// A 64-bit Rabin fingerprint.
+    Rabin(u64),
+}
+
+/// Allow easy extraction of the algorithm used to create a fingerprint.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    fn from(fp: &Fingerprint) -> Self {
+        match fp {
+            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
+        }
+    }
+}
+
+/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`.
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = generate_canonical_form(schema).map_err(|e| {
+        ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
+    })?;
+    match hash_type {
+        FingerprintAlgorithm::Rabin => {
+            Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
+        }
+    }
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> {
+    generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsed Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+    build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
+/// Each schema is associated with a unique [`Fingerprint`], which is generated based
+/// on the schema's canonical form and a specific hashing algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
+/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations.
+/// This ensures consistency when generating fingerprints and looking up schemas.
+/// All schemas registered will have their fingerprint computed with this algorithm, and
+/// lookups must use a matching fingerprint.
+///
+/// The lifetime parameter `'a` corresponds to the lifetime of the string slices
+/// contained within the stored [`Schema`] objects. This means the `SchemaStore`
+/// cannot outlive the data referenced by the schemas it contains.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone)]
+pub struct SchemaStore<'a> {
+    /// The hashing algorithm used for generating fingerprints.
+    fingerprint_algorithm: FingerprintAlgorithm,
+    /// A map from a schema's fingerprint to the schema itself.
+    schemas: HashMap<Fingerprint, Schema<'a>>,
+}
+
+impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> {
+    type Error = ArrowError;
+
+    /// Creates a `SchemaStore` from a slice of schemas.
+    /// Each schema in the slice is registered with the new store.
+    fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> {
+        let mut store = SchemaStore::new();
+        for schema in schemas {
+            store.register(schema.clone())?;
+        }
+        Ok(store)
+    }
+}
+
+impl<'a> Default for SchemaStore<'a> {
+    fn default() -> Self {
+        Self {
+            fingerprint_algorithm: FingerprintAlgorithm::Rabin,
+            schemas: HashMap::new(),
+        }
+    }
+}
+
+impl<'a> SchemaStore<'a> {
+    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Registers a schema with the store and returns its fingerprint.
+    ///
+    /// A fingerprint is calculated for the given schema using the store's 
configured
+    /// hash type. If a schema with the same fingerprint does not already 
exist in the
+    /// store, the new schema is inserted. If the fingerprint already exists, 
the
+    /// existing schema is not overwritten.
+    ///
+    /// # Arguments
+    ///
+    /// * `schema` - The schema to register.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing the `Fingerprint` of the schema if successful,
+    /// or an `ArrowError` on failure.
+    pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> {
+        let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?;
+        match self.schemas.entry(fp) {
+            Entry::Occupied(entry) => {
+                if entry.get() != &schema {
+                    return Err(ArrowError::ComputeError(format!(
+                        "Schema fingerprint collision detected for fingerprint {fp:?}"
+                    )));
+                }
+            }
+            Entry::Vacant(entry) => {
+                entry.insert(schema);
+            }
+        }
+        Ok(fp)
+    }
+
+    /// Looks up a schema by its `Fingerprint`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fp` - A reference to the `Fingerprint` of the schema to look up.
+    ///
+    /// # Returns
+    ///
+    /// An `Option` containing a reference to the `Schema` if found, otherwise `None`.
+    pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> {
+        self.schemas.get(fp)
+    }
+
+    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
+    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
+        self.fingerprint_algorithm
+    }
+}
+
+fn quote(s: &str) -> Result<String, ArrowError> {
+    serde_json::to_string(s)
+        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
+}
+
+// Avro names are defined by a `name` and an optional `namespace`.
+// The full name is composed of the namespace and the name, separated by a dot.
+//
+// Avro specification defines two ways to specify a full name:
+// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
+//    In this case, the `namespace` attribute is ignored.
+// 2. The `name` attribute contains the simple name (e.g., "d") and the
+//    `namespace` attribute contains the namespace (e.g., "a.b.c").
+//
+// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
+// Complex paths with quotes or backticks like `a."hi".b` are not supported.
+//
+// This function constructs the full name and extracts the namespace,
+// handling both ways of specifying the name. It prioritizes a namespace
+// defined within the `name` attribute itself, then the explicit `namespace_attr`,
+// and finally the `enclosing_ns`.
+fn make_full_name(
+    name: &str,
+    namespace_attr: Option<&str>,
+    enclosing_ns: Option<&str>,
+) -> Result<(String, Option<String>), ArrowError> {
+    // If `name` already contains a dot, treat it as a full name and ignore the namespace.
+    if let Some((ns, _)) = name.rsplit_once('.') {
+        return Ok((name.to_string(), Some(ns.to_string())));
+    }
+    Ok(match namespace_attr.or(enclosing_ns) {
+        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
+        None => (name.to_string(), None),
+    })
+}
+
+fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
+    Ok(match schema {
+        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
+            TypeName::Primitive(pt) => quote(pt.as_ref())?,
+            TypeName::Ref(name) => {
+                let (full_name, _) = make_full_name(name, None, enclosing_ns)?;
+                quote(&full_name)?
+            }
+        },
+        Schema::Union(branches) => format!(
+            "[{}]",
+            branches
+                .iter()
+                .map(|b| build_canonical(b, enclosing_ns))
+                .collect::<Result<Vec<_>, _>>()?
+                .join(",")
+        ),
+        Schema::Complex(ct) => match ct {
+            ComplexType::Record(r) => {
+                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns)?;
+                let fields = r
+                    .fields
+                    .iter()
+                    .map(|f| {
+                        let field_type =
+                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
+                        Ok(format!(
+                            "{{\"name\":{},\"type\":{}}}",
+                            quote(f.name)?,
+                            field_type
+                        ))
+                    })
+                    .collect::<Result<Vec<_>, ArrowError>>()?
+                    .join(",");
+                format!(
+                    "{{\"name\":{},\"type\":\"record\",\"fields\":[{}]}}",

Review Comment:
   I think a raw string literal would be easier to read (also, can embed `fields` directly)?
   ```suggestion
                       r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
   ```
   (several of these)
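   For what it's worth, a standalone sketch of how that plays out (the `name`/`fields` values here are made up; `{{`/`}}` still render literal braces inside a raw format string, while the embedded JSON quotes no longer need backslash escapes):
   ```rust
   fn main() {
       let fields = r#"{"name":"a","type":"int"}"#;
       let name = r#""rec""#; // an already-JSON-quoted name, as `quote` would produce
       // Inline captured identifiers work in raw format strings too.
       let s = format!(r#"{{"name":{name},"type":"record","fields":[{fields}]}}"#);
       assert_eq!(
           s,
           r#"{"name":"rec","type":"record","fields":[{"name":"a","type":"int"}]}"#
       );
       println!("{s}");
   }
   ```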



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +330,98 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fp` = None
+    /// - `static_store_mode` = false
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let field_builder = match reader_schema {
+            Some(rs) if !compare_schemas(writer_schema, rs)? => {
+                AvroFieldBuilder::new(writer_schema).with_reader_schema(rs)
+            }
+            Some(rs) => AvroFieldBuilder::new(rs),
+            None => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode);
+        let root = field_builder.build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
-                .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file header".to_string())
-            })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            cache: IndexMap::new(),
+            max_cache_size: self.decoder_cache_size,
+            reader_schema,
+            utf8_view: self.utf8_view,
+            writer_schema_store,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
+    }
+
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> {
+        match header {
+            Some(hdr) => {
+                let writer_schema = hdr
+                    .schema()
+                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                    .ok_or_else(|| {
+                        ArrowError::ParseError("No Avro schema present in file header".into())
+                    })?;
+                let record_decoder =
+                    self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?;
+                Ok(self.make_decoder_with_parts(record_decoder, None, None, None))
+            }
+            None => {
+                let reader_schema = self.reader_schema.clone().ok_or_else(|| {
+                    ArrowError::ParseError("Reader schema required for raw Avro".into())
+                })?;
+                let (init_fingerprint, initial_decoder) =
+                    if let (Some(schema_store), Some(fingerprint)) =
+                        (&self.writer_schema_store, self.active_fingerprint)
+                    {
+                        // An initial fingerprint is provided; use it to look up the first schema.
+                        let writer_schema = schema_store.lookup(&fingerprint).ok_or_else(|| {
+                            ArrowError::ParseError(
+                                "Active fingerprint not found in schema store".into(),
+                            )
+                        })?;
+                        let decoder =
+                            self.make_record_decoder(writer_schema, Some(&reader_schema))?;
+                        (Some(fingerprint), decoder)
+                    } else {
+                        // No initial fingerprint; the first record must contain one.
+                        // A decoder is created from the reader schema only.
+                        let decoder = self.make_record_decoder(&reader_schema, None)?;
+                        (None, decoder)
+                    };
+                Ok(self.make_decoder_with_parts(
+                    initial_decoder,
+                    init_fingerprint,

Review Comment:
   bump? we still have both initial_ and init_ here?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -272,17 +484,92 @@ impl ReaderBuilder {
         self
     }
 
-    /// Sets the Avro schema.
+    /// Sets the Avro reader schema.
     ///
     /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    pub fn with_reader_schema(mut self, reader_schema: AvroSchema<'static>) -> Self {
+        self.reader_schema = Some(reader_schema);
         self
     }
 
+    /// Sets the `SchemaStore` used for resolving writer schemas.
+    ///
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self {
+        self.writer_schema_store = Some(store);
+        self
+    }
+
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fingerprint = Some(fp);
+        self
+    }
+
+    /// Set the maximum number of decoders to cache.
+    ///
+    /// When dealing with Avro files that contain multiple schemas, we may need to switch
+    /// between different decoders. This cache avoids rebuilding them from scratch every time.
+    ///
+    /// Defaults to `20`.
+    pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self {
+        self.decoder_cache_size = n;
+        self
+    }
+
+    // Validate the builder configuration against this truth table
+    //
+    // | writer_schema_store | reader_schema | active_fingerprint | Result |
+    // |---------------------|---------------|--------------------|--------|
+    // | None----------------| None----------| None---------------| Err----|
+    // | None----------------| None----------| Some---------------| Err----|
+    // | None----------------| Some----------| None---------------| Ok-----|
+    // | None----------------| Some----------| Some---------------| Err----|
+    // | Some----------------| None----------| None---------------| Err----|
+    // | Some----------------| None----------| Some---------------| Err----|
+    // | Some----------------| Some----------| None---------------| Ok-----|
+    // | Some----------------| Some----------| Some---------------| Ok-----|
+    fn validate(&self) -> Result<(), ArrowError> {
+        match (
+            self.writer_schema_store.is_some(),
+            self.reader_schema.is_some(),
+            self.active_fingerprint.is_some(),
+        ) {
+            // Row 3: No store, reader schema present, no fingerprint
+            (false, true, false)
+            // Row 7: Store present, reader schema present, no fingerprint
+            | (true, true, false)
+            // Row 8: Store present, reader schema present, fingerprint present
+            | (true, true, true) => Ok(()),
+            // Fingerprint without a store (rows 2 & 4)
+            (false, _, true) => Err(ArrowError::InvalidArgumentError(
+                "Active fingerprint requires a writer schema store".into(),
+            )),
+            // Store present but no reader schema (rows 5 & 6)
+            (true, false, _) => Err(ArrowError::InvalidArgumentError(
+                "Reader schema must be set when writer schema store is provided".into(),
+            )),
+            // No schema store or reader schema provided (row 1)
+            (false, false, _) => Err(ArrowError::InvalidArgumentError(

Review Comment:
   These two cases (other than the specific error message) can be combined?
   ```rust
               (_, false, _) => Err(ArrowError::InvalidArgumentError(...)),
   ```
   ... but what error message to give? The logic suggests that reader schema is always required, which shouldn't be true.
   
   I think the real issue is that the "row 1" match arm should be `(false, false, false)`? While `(false, false, true)` (row 2) is _also_ an error, it's already covered by a different match arm. Maybe we need to rework the logical groupings a bit?
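   One possible regrouping along those lines, as a standalone sketch (plain `bool`s stand in for the three `Option` fields, and the error strings are only illustrative):
   ```rust
   fn validate(store: bool, reader: bool, fp: bool) -> Result<(), String> {
       match (store, reader, fp) {
           // Rows 3 and 7: reader schema present, optional store, no fingerprint.
           (_, true, false)
           // Row 8: store, reader schema, and fingerprint all present.
           | (true, true, true) => Ok(()),
           // Rows 2 and 4: fingerprint without a store.
           (false, _, true) => Err("Active fingerprint requires a writer schema store".into()),
           // Rows 5 and 6: store without a reader schema.
           (true, false, _) => Err("Reader schema must be set when writer schema store is provided".into()),
           // Row 1: nothing configured at all.
           (false, false, false) => Err("Either a reader schema or a writer schema store is required".into()),
       }
   }

   fn main() {
       assert!(validate(false, true, false).is_ok());   // row 3
       assert!(validate(true, true, true).is_ok());     // row 8
       assert!(validate(false, false, true).is_err());  // row 2
       assert!(validate(false, false, false).is_err()); // row 1
   }
   ```
   This keeps row 1 as its own arm with its own message, so no arm has to pretend the reader schema is unconditionally required.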


