scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2245920424


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,134 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                     (writer_schema_store is set but active_fingerprint is 
None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(prefix_bytes) = 
self.handle_prefix(&data[total_consumed..], hash_type)? {
+                // A batch is complete when its `remaining_capacity` is 0. It 
may be completed early if
+                // a schema change is detected or there are insufficient bytes 
to read the next prefix.
+                // A schema change requires a new batch.
+                total_consumed += prefix_bytes;
+                break;
+            }
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            total_consumed += n;
+            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }
 
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.writer_schema_store.is_none() || 
!buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+            return Ok(None);
+        }
+        let full_len = prefix_len(hash_type);
+        if buf.len() < full_len {
+            return Ok(Some(0)); // Not enough data to read the full fingerprint
+        }
+        let fp_bytes = &buf[2..full_len];
+        let new_fp = match hash_type {
+            FingerprintAlgorithm::Rabin => {
+                let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else {
+                    return Err(ArrowError::ParseError(format!(
+                        "Invalid Rabin fingerprint length, expected 8, got {}",
+                        fp_bytes.len()
+                    )));
+                };
+                Fingerprint::Rabin(u64::from_le_bytes(bytes))
+            }
+        };

Review Comment:
   My main concern is the combination of split logic and magic constants 
produces a large bug surface. Hard to get the code correct, hard for others to 
convince themselves it's correct, and easy for future changes to break things 
if one part of the code ever gets out of sync.
   
   It's less clear to me whether that needs to be fixed in this PR or just 
"soon." And we wouldn't want to follow the specific approach I suggested, if it 
gets in the way of future changes you have in your head.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to