jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2246957540
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +167,130 @@ impl Decoder {
///
/// Returns the number of bytes consumed.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+ if self.active_fingerprint.is_none()
+ && self.writer_schema_store.is_some()
+ && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+ {
+ return Err(ArrowError::ParseError(
+ "Expected single‑object encoding fingerprint prefix for first
message \
+ (writer_schema_store is set but active_fingerprint is
None)"
+ .into(),
+ ));
+ }
let mut total_consumed = 0usize;
- while total_consumed < data.len() && self.decoded_rows <
self.batch_size {
- let consumed = self.record_decoder.decode(&data[total_consumed..],
1)?;
- // A successful call to record_decoder.decode means one row was
decoded.
- // If `consumed` is 0 on a non-empty buffer, it implies a valid
zero-byte record.
- // We increment `decoded_rows` to mark progress and avoid an
infinite loop.
- // We add `consumed` (which can be 0) to `total_consumed`.
- total_consumed += consumed;
- self.decoded_rows += 1;
+ let hash_type = self.writer_schema_store.as_ref().map_or(
+ FingerprintAlgorithm::Rabin,
+ SchemaStore::fingerprint_algorithm,
+ );
+ while total_consumed < data.len() && self.remaining_capacity > 0 {
+ if let Some(prefix_bytes) =
self.handle_prefix(&data[total_consumed..], hash_type)? {
+ // A batch is complete when its `remaining_capacity` is 0. It
may be completed early if
+ // a schema change is detected or there are insufficient bytes
to read the next prefix.
+ // A schema change requires a new batch.
+ total_consumed += prefix_bytes;
+ break;
+ }
+ let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+ total_consumed += n;
+ self.remaining_capacity -= 1;
}
Ok(total_consumed)
}
/// Produce a `RecordBatch` if at least one row is fully decoded, returning
/// `Ok(None)` if no new rows are available.
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
- if self.decoded_rows == 0 {
- Ok(None)
- } else {
- let batch = self.record_decoder.flush()?;
- self.decoded_rows = 0;
- Ok(Some(batch))
+ if self.remaining_capacity == self.batch_size {
+ return Ok(None);
+ }
+ let batch = self.active_decoder.flush()?;
+ self.remaining_capacity = self.batch_size;
+ // Apply a pending schema switch if one is staged
+ if let Some((new_fingerprint, new_decoder)) =
self.pending_schema.take() {
+ // Cache the old decoder before replacing it
+ if let Some(old_fingerprint) =
self.active_fingerprint.replace(new_fingerprint) {
+ let old_decoder = std::mem::replace(&mut self.active_decoder,
new_decoder);
+ self.cache.shift_remove(&old_fingerprint);
+ self.cache.insert(old_fingerprint, old_decoder);
+ if self.cache.len() > self.max_cache_size {
+ self.cache.shift_remove_index(0);
+ }
+ } else {
+ self.active_decoder = new_decoder;
+ }
+ }
+ Ok(Some(batch))
+ }
+
+ #[inline]
+ fn handle_prefix(
+ &mut self,
+ buf: &[u8],
+ hash_type: FingerprintAlgorithm,
+ ) -> Result<Option<usize>, ArrowError> {
+ if self.writer_schema_store.is_none() ||
!buf.starts_with(&SINGLE_OBJECT_MAGIC) {
+ return Ok(None);
Review Comment:
@scovich I'm finishing these changes now and getting close to pushing up.
> From what I can tell, this whole operation should actually be infallible
There's actually one scenario this operation is fallible imo, and that's if
the `Fingerprint` isn't found in the `SchemaStore`, i.e.
```rust
let writer_schema = store.lookup(&new_fingerprint).ok_or_else(|| {
ArrowError::ParseError(format!("Unknown fingerprint:
{new_fingerprint:?}"))
})?;
```
The soonest we could get the new `Fingerprint` is in either
`handle_fingerprint` or `handle_prefix` (not as clean imo). A `SchemaStore`
missing an otherwise valid `Fingerprint` should propagate an error back to the
caller. This will allow the caller to then recreate the `Decoder` with an
updated `SchemaStore`, etc.
As a side note, I've been toying with the idea of defining a `SchemaStore`
trait. This would enable users to create their own `SchemaStore`
implementations, including ones which could stay up to date with a Schema
Registry, etc. There maybe some challenges with managing the lifetimes, but I
think it's doable. I'm curious what you think of the idea?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]