scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249229633


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;
+        let Some(ref reader_schema) = self.reader_schema else {
+            return Err(ArrowError::ParseError(
+                "Reader schema unavailable for resolution".into(),
+            ));
+        };
+        let resolved = AvroField::resolve_from_writer_and_reader(
+            writer_schema,
+            reader_schema,
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        RecordDecoder::try_new_with_options(resolved.data_type(), 
self.utf8_view)
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
         }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                #[cfg(feature = "lru")]
+                self.cache.put(old_fingerprint, old_decoder);
+                #[cfg(not(feature = "lru"))]
+                self.cache.shift_remove(&old_fingerprint);
+                #[cfg(not(feature = "lru"))]
+                self.cache.insert(old_fingerprint, old_decoder);
+                #[cfg(not(feature = "lru"))]
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }

Review Comment:
   ```suggestion
                   #[cfg(not(feature = "lru"))]
                   {
                       self.cache.shift_remove(&old_fingerprint);
                       self.cache.insert(old_fingerprint, old_decoder);
                       if self.cache.len() > self.max_cache_size {
                           self.cache.shift_remove_index(0);
                       }
                   }
   ```



##########
arrow-avro/Cargo.toml:
##########
@@ -57,7 +57,8 @@ xz = { version = "0.1", default-features = false, optional = 
true }
 crc = { version = "3.0", optional = true }
 strum_macros = "0.27"
 uuid = "1.17"
-indexmap = "2.10"
+lru      = { version = "0.12", optional = true }
+indexmap = "2.10.0"

Review Comment:
   Out of curiosity, why is it important to have 2.10.0 as opposed to just 2.10 
generally?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;

Review Comment:
   ```suggestion
           let Some(ref writer_schema_store) = self.writer_schema_store else {
               return Err(ArrowError::ParseError("Schema store 
unavailable".into())));
           };
   ```
   aside: We can safely `unwrap` because the caller already checked. But since 
we anyway have to return a `Result` we may as well leave this code in its 
robust current form?
   



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -294,20 +540,14 @@ impl ReaderBuilder {
         })
     }
 
-    /// Create a [`Decoder`] from this builder and a `BufRead` by
-    /// reading and parsing the Avro file's header. This will
-    /// not create a full [`Reader`].
-    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
-        match self.schema {
-            Some(ref schema) => {
-                let record_decoder = self.make_record_decoder(schema)?;
-                Ok(Decoder::new(record_decoder, self.batch_size))
-            }
-            None => {
-                let (_, decoder) = self.build_impl(&mut reader)?;
-                Ok(decoder)
-            }
+    /// Create a [`Decoder`] from this builder.
+    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
+        if self.writer_schema_store.is_none() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Cannot build a decoder without a writer schema 
store".to_string(),

Review Comment:
   Double negatives are hard to read
   ```suggestion
                   "Building a decoder requires a writer schema 
store".to_string(),
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -124,23 +132,26 @@ fn read_header<R: BufRead>(mut reader: R) -> 
Result<Header, ArrowError> {
 /// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
 #[derive(Debug)]
 pub struct Decoder {
-    record_decoder: RecordDecoder,
+    active_decoder: RecordDecoder,
+    active_fingerprint: Option<Fingerprint>,
     batch_size: usize,
-    decoded_rows: usize,
+    remaining_capacity: usize,
+    #[cfg(feature = "lru")]
+    cache: LruCache<Fingerprint, RecordDecoder>,
+    #[cfg(not(feature = "lru"))]
+    cache: IndexMap<Fingerprint, RecordDecoder>,
+    max_cache_size: usize,
+    reader_schema: Option<AvroSchema<'static>>,
+    writer_schema_store: Option<SchemaStore<'static>>,

Review Comment:
   I just realized -- these are forcing static lifetimes. Does that mean memory 
they reference must leak for all practical purposes? 
   
   (I don't actually understand why avro schemas have lifetimes in the first 
place? Especially if they anyway require static lifetime in practice?)



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +369,91 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let root = match reader_schema {
+            Some(reader_schema) if !compare_schemas(writer_schema, 
reader_schema)? => {
+                
AvroFieldBuilder::new(writer_schema).with_reader_schema(reader_schema)
+            }
+            _ => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode)
+        .build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
+    }
+
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        #[cfg(feature = "lru")]
+        let capacity = 
NonZeroUsize::new(self.decoder_cache_size).unwrap_or(NonZeroUsize::MIN); // 
NonZeroUsize::MIN is 1

Review Comment:
   Surprised fmt allowed such a long line? Is the CI fmt job not aware of this 
crate yet or something?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes

Review Comment:
   ```suggestion
               return Ok(None); // Insufficient bytes
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{

Review Comment:
   Eventually, we'll want to factor out this buf slicing because it's the same 
for every fingerprint algo. 
   Not a big deal for this PR tho.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +369,91 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let root = match reader_schema {
+            Some(reader_schema) if !compare_schemas(writer_schema, 
reader_schema)? => {
+                
AvroFieldBuilder::new(writer_schema).with_reader_schema(reader_schema)
+            }
+            _ => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode)
+        .build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
+    }
+
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        #[cfg(feature = "lru")]
+        let capacity = 
NonZeroUsize::new(self.decoder_cache_size).unwrap_or(NonZeroUsize::MIN); // 
NonZeroUsize::MIN is 1
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            #[cfg(feature = "lru")]
+            cache: LruCache::new(capacity),
+            #[cfg(not(feature = "lru"))]
+            cache: IndexMap::new(),
+            max_cache_size: self.decoder_cache_size,
+            reader_schema,
+            utf8_view: self.utf8_view,
+            writer_schema_store,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, 
Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, 
ArrowError> {
+        if let Some(hdr) = header {
+            let writer_schema = hdr
                 .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".into())
+                })?;
+            let record_decoder =
+                self.make_record_decoder(&writer_schema, 
self.reader_schema.as_ref())?;
+            return Ok(self.make_decoder_with_parts(record_decoder, None, None, 
None));
+        }
+        let writer_schema_store = 
self.writer_schema_store.as_ref().ok_or_else(|| {
+            ArrowError::ParseError("Writer schema store required for raw 
Avro".into())
+        })?;
+        let fingerprint = self
+            .active_fingerprint
+            .or_else(|| writer_schema_store.fingerprints().into_iter().next())
+            .ok_or_else(|| {
+                ArrowError::ParseError(
+                    "Writer schema store must contain at least one 
schema".into(),
+                )
             })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+        let writer_schema = 
writer_schema_store.lookup(&fingerprint).ok_or_else(|| {
+            ArrowError::ParseError("Active fingerprint not found in schema 
store".into())
+        })?;
+        let record_decoder =
+            self.make_record_decoder(writer_schema, 
self.reader_schema.as_ref())?;
+        Ok(self.make_decoder_with_parts(
+            record_decoder,
+            Some(fingerprint),
+            self.reader_schema.clone(),
+            self.writer_schema_store.clone(),

Review Comment:
   This is a weird code smell. Some of the state comes from `self` internally, 
while other state has to be copied manually by caller even tho it's _also_ 
coming from `self`?
   
   We know that the `writer_schema_store` should only be set for this raw 
decoder path (see other comment), so it shouldn't be a member of `self` at all 
and instead passed directly as an argument to `build_decoder`. That takes care 
of the one. But what about the reader schema? What's the story behind the smell 
there?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };

Review Comment:
   ```suggestion
           // NOTE: Incomplete fingerprint consumes no bytes.
           let consumed = fingerprint_size.map_or(0, |n| n + 
SINGLE_OBJECT_MAGIC.len());
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;

Review Comment:
   Another possibility, since we know the writer store was already checked:
   ```suggestion
           let writer_schema = self
               .writer_schema_store
               .as_ref()
               .flat_map(|store| store.lookup(&new_fingerprint))
               .ok_or_else(|| {
                   ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
               })?;
   ```
   (technically, the error message is correct even if the real cause was a 
missing store -- no store, no known fingerprints)



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };

Review Comment:
   Instead of duplicating the whole thing, suggest factoring out the 
cfg-dependent bit:
   ```suggestion
               #[cfg(feature = "lru")]
               let new_decoder = self.cache.pop(&new_fingerprint);
               #[cfg(not(feature = "lru"))]
               let new_decoder = self.cache.shift_remove(&new_fingerprint);
               
               let new_decoder = match new_decoder {
                   Some(decoder) => decoder,
                   None => self.create_decoder_for(new_fingerprint)?,
               };
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.

Review Comment:
   I just realized there might be a reason to split out this logic after all:
   
   Consuming a prefix doesn't intrinsically force the batch to end. 
   The batch only ends if the new fingerprint is different than the active one.
   And if that happens, the remaining capacity is already forced to zero.
   
   Which would produce something like:
   ```rust
   match self.handle_prefix(...)? {
       // prefix detected, but we don't have all the fingerprint bytes yet
       Some(0) => break,
       // A fingerprint was consumed; if it triggered a schema change, the
       // remaining capacity was set to zero and the loop will break naturally
       Some(n) => total_consumed += n,
       // No prefix: decode the next row
       None => {
             ...
       }
   }
   ```
   
   But maybe no-op fingerprints are a corner case not worth optimizing for? 



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -294,20 +540,14 @@ impl ReaderBuilder {
         })
     }
 
-    /// Create a [`Decoder`] from this builder and a `BufRead` by
-    /// reading and parsing the Avro file's header. This will
-    /// not create a full [`Reader`].
-    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
-        match self.schema {
-            Some(ref schema) => {
-                let record_decoder = self.make_record_decoder(schema)?;
-                Ok(Decoder::new(record_decoder, self.batch_size))
-            }
-            None => {
-                let (_, decoder) = self.build_impl(&mut reader)?;
-                Ok(decoder)
-            }
+    /// Create a [`Decoder`] from this builder.
+    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
+        if self.writer_schema_store.is_none() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Cannot build a decoder without a writer schema 
store".to_string(),

Review Comment:
   However, if it's required, we shouldn't leave the footgun of an opt-in 
method call. 
   Just make it a parameter of the `build_decoder` method instead?
   
   Or does a normal `build` also (optionally) use a schema store?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;
+        let Some(ref reader_schema) = self.reader_schema else {

Review Comment:
   Under what circumstances is the reader schema optional, that it's even 
possible to hit `None` here?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;
+        let Some(ref reader_schema) = self.reader_schema else {
+            return Err(ArrowError::ParseError(
+                "Reader schema unavailable for resolution".into(),
+            ));
+        };
+        let resolved = AvroField::resolve_from_writer_and_reader(
+            writer_schema,
+            reader_schema,
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        RecordDecoder::try_new_with_options(resolved.data_type(), 
self.utf8_view)
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
         }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                #[cfg(feature = "lru")]
+                self.cache.put(old_fingerprint, old_decoder);
+                #[cfg(not(feature = "lru"))]
+                self.cache.shift_remove(&old_fingerprint);
+                #[cfg(not(feature = "lru"))]
+                self.cache.insert(old_fingerprint, old_decoder);
+                #[cfg(not(feature = "lru"))]
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }

Review Comment:
   You could also factor out the LRU logic as a conditionally defined method, 
but that's probably overkill



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;

Review Comment:
   It's interesting that `ok_or_else` doesn't usually help make the code more 
compact; at best it's the same length:
   ```suggestion
           let Some(writer_schema) = 
writer_schema_store.lookup(&new_fingerprint) else {
               return Err(ArrowError::ParseError(format!(
                   "Unknown fingerprint: {new_fingerprint:?}"
               )));
           };
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -294,20 +540,14 @@ impl ReaderBuilder {
         })
     }
 
-    /// Create a [`Decoder`] from this builder and a `BufRead` by
-    /// reading and parsing the Avro file's header. This will
-    /// not create a full [`Reader`].
-    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
-        match self.schema {
-            Some(ref schema) => {
-                let record_decoder = self.make_record_decoder(schema)?;
-                Ok(Decoder::new(record_decoder, self.batch_size))
-            }
-            None => {
-                let (_, decoder) = self.build_impl(&mut reader)?;
-                Ok(decoder)
-            }
+    /// Create a [`Decoder`] from this builder.
+    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
+        if self.writer_schema_store.is_none() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Cannot build a decoder without a writer schema 
store".to_string(),

Review Comment:
   Digging deeper, we seem to have another split logic issue here -- write 
schema store is only used by `build_decoder`, but both methods call 
`make_decoder`, which has to infer intent by whether Some header was passed or 
not. Suggest to get rid of `make_decoder` by moving each "half" of the logic in 
the corresponding caller. Conveniently, you already factored out 
`make_decoder_with_parts` that does the heavy lifting.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -124,23 +132,26 @@ fn read_header<R: BufRead>(mut reader: R) -> 
Result<Header, ArrowError> {
 /// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
 #[derive(Debug)]
 pub struct Decoder {
-    record_decoder: RecordDecoder,
+    active_decoder: RecordDecoder,
+    active_fingerprint: Option<Fingerprint>,
     batch_size: usize,
-    decoded_rows: usize,
+    remaining_capacity: usize,
+    #[cfg(feature = "lru")]
+    cache: LruCache<Fingerprint, RecordDecoder>,
+    #[cfg(not(feature = "lru"))]
+    cache: IndexMap<Fingerprint, RecordDecoder>,
+    max_cache_size: usize,
+    reader_schema: Option<AvroSchema<'static>>,
+    writer_schema_store: Option<SchemaStore<'static>>,

Review Comment:
   Looking at the code, it's because both `TypeName<'a>` and `Attributes<'a>` 
have `&'a str` members. But is it really so expensive to just copy the string 
that we're willing to deal with a proliferation of lifetimes? Especially when 
practical usage anyway requires the `'static` lifetime? Or am I missing some 
obvious/easy place where the lifetimes make a big difference that justifies the 
complexity?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +158,178 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = 
self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }
+                Some(n) => {
+                    // Consumed a complete prefix (n > 0). Stage flush and 
stop.
+                    total_consumed += n;
+                    break;
+                }
+            }
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint::<8>(&buf[SINGLE_OBJECT_MAGIC.len()..], 
|bytes| {
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;
+        match self.reader_schema {
+            Some(ref reader_schema) => {
+                let resolved = AvroField::resolve_from_writer_and_reader(
+                    writer_schema,
+                    reader_schema,
+                    self.utf8_view,
+                    self.strict_mode,
+                )?;
+                Ok(RecordDecoder::try_new_with_options(
+                    resolved.data_type(),
+                    self.utf8_view,
+                )?)
+            }
+            None => Err(ArrowError::ParseError(
+                "Reader schema unavailable for resolution".into(),
+            )),
+        }
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
         }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);

Review Comment:
   I'll defer to @alamb on the dependency management question.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..], 
hash_type)? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match hash_type {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        let consumed = match fingerprint_size {
+            Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+            None => 0,                                // incomplete fingerprint
+        };
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Get more bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            #[cfg(feature = "lru")]
+            let new_decoder = match self.cache.pop(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            #[cfg(not(feature = "lru"))]
+            let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+                Some(decoder) => decoder,
+                None => self.create_decoder_for(new_fingerprint)?,
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
+    fn create_decoder_for(
+        &mut self,
+        new_fingerprint: Fingerprint,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let writer_schema_store = self
+            .writer_schema_store
+            .as_ref()
+            .ok_or_else(|| ArrowError::ParseError("Schema store 
unavailable".into()))?;
+        let writer_schema = writer_schema_store
+            .lookup(&new_fingerprint)
+            .ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: 
{new_fingerprint:?}"))
+            })?;
+        let Some(ref reader_schema) = self.reader_schema else {

Review Comment:
   Looking at the code, I _suspect_ that reader schema is required for the raw 
read path, and forbidden for the file read path (since the file read path in 
`make_decoder` seems to ignore it entirely)? But I can't quite untangle the 
spaghetti to be confident.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to