jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2244531096
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,130 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first message \
+                (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            // A successful call to record_decoder.decode means one row was decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
+                // A batch is complete when its `remaining_capacity` is 0. It may be completed early if
+                // a schema change is detected or there are insufficient bytes to read the next prefix.
+                // A schema change requires a new batch.
+                total_consumed += prefix_bytes;
+                break;
+            }
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            total_consumed += n;
+            self.remaining_capacity -= 1;
         }
         Ok(total_consumed)
     }

     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+                if self.cache.len() > self.max_cache_size {
+                    self.cache.shift_remove_index(0);
+                }
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+            return Ok(None);
+        }
+        let fp_bytes = &buf[2..]; // safe thanks to the `starts_with` check above
+        let new_fp = match hash_type {
+            FingerprintAlgorithm::Rabin => {
+                let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else {

Review Comment:
   This is extremely useful information. I'm going through this now.

   > but I don't know if that "extract an array from a slice at a given offset" pattern arises often enough in this crate to be worth creating a similar helper here?

   I haven't come across that yet in this crate, but if I do again I may as well make a helper.
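   For illustration only (not code in this PR), such a helper could look roughly like the sketch below; the name `read_array_at`, its location, and its error wording are all invented for the example. It just wraps the same `<[u8; N]>::try_from` pattern used for the fingerprint bytes, returning a `ParseError` instead of panicking when the slice is too short.

   ```rust
   use arrow_schema::ArrowError;

   /// Hypothetical helper: copy `N` bytes out of `buf` starting at `offset`,
   /// failing with a ParseError when the bytes are not available.
   fn read_array_at<const N: usize>(buf: &[u8], offset: usize) -> Result<[u8; N], ArrowError> {
       let end = offset.checked_add(N).ok_or_else(|| {
           ArrowError::ParseError(format!("offset {} + length {} overflows usize", offset, N))
       })?;
       buf.get(offset..end)
           // The range is exactly `N` bytes long, so the array conversion cannot fail.
           .map(|s| <[u8; N]>::try_from(s).expect("slice length checked above"))
           .ok_or_else(|| {
               ArrowError::ParseError(format!(
                   "need {} bytes at offset {}, but only {} remain",
                   N,
                   offset,
                   buf.len().saturating_sub(offset)
               ))
           })
   }

   // Example call site for the 8-byte Rabin fingerprint that follows the
   // 2-byte single-object magic:
   // let fp: [u8; 8] = read_array_at(buf, SINGLE_OBJECT_MAGIC.len())?;
   ```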
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org