scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2244379215


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,130 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first message \
+                     (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            // A successful call to record_decoder.decode means one row was decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
+                // A batch is complete when its `remaining_capacity` is 0. It may be completed early if
+                // a schema change is detected or there are insufficient bytes to read the next prefix.
+                // A schema change requires a new batch.
+                total_consumed += prefix_bytes;
+                break;
+            }
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            total_consumed += n;
+            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+            return Ok(None);

Review Comment:
   From what I can tell, this whole operation should actually be infallible,
   because the only "fallible" action it performs is a slice-to-array
   conversion, which generic helper code can make provably infallible in
   practice.
   
   Maybe something like this?
   ```rust
   // Attempts to read and install a new fingerprint, checking the magic bytes 
prefix first.
   // Returns the number of bytes consumed; None indicates there was no 
fingerprint and 
   // Some(0) indicates there were insufficient bytes available.
   fn handle_prefix(
       &mut self,
       mut buf: &[u8],
       hash_type: FingerprintAlgorithm,
   ) -> Option<usize> {
       if self.writer_schema_store.is_none() {
           return None; // no schema changes allowed
       }
   
       let Some(magic_bytes) = buf.split_off(SINGLE_OBJECT_MAGIC.len()) else {
           return Some(0); // not enough bytes available to even check magic
       };
       
       // If the magic bytes match, then we know a fingerprint is coming and we 
must return 
       // Some bytes consumed (possibly zero, if insufficient bytes were 
available).
       (magic_bytes == &SINGLE_OBJECT_MAGIC).then(|| {
           // SHA-256, md5, ID support coming in a future PR
           let fp_size = match hash_type {
               FingerprintAlgorithm::Rabin => {
                   self.handle_fingerprint(&buf, |bytes| 
Fingerprint::Rabin(u64::from_le_bytes(bytes)))
               }
           };
           
           // Report 0 bytes consumed if insufficient bytes were available.
           fp_size.map_or(0, |n| n + magic_bytes.len())
       })
   }
   
   // Attempts to read and install a new fingerprint.
   // Returns the number of bytes consumed -- `Some(N)` -- or `None` if 
insufficient bytes were available.
   fn handle_fingerprint<const N: usize>(
       &mut self,
       buf: &[u8],
       fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
   ) -> Option<usize> {
       // NOTE: Converting a slice of length N to an array of length N always 
succeeds
       let fp_bytes = buf.get(..N)?;
       let new_fp = fingerprint_from(fp_bytes.try_into().unwrap());
   
       if self.active_fingerprint != Some(new_fp) {
           // Fingerprint change => schema change => prepare to switch decoders.
           ...
       }
       
       Some(N)
   }
   ```
   As a nice side effect, the `prefix_len` is replaced by a generic function 
that lets the compiler capture the fingerprint's size for us -- no more magic 
constants or split logic -- and which _also_ makes a nice hook point for 
installing the new fingerprint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to