scovich commented on code in PR #8100: URL: https://github.com/apache/arrow-rs/pull/8100#discussion_r2266533676
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -173,36 +173,29 @@ impl Decoder {
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
         let mut total_consumed = 0usize;
         while total_consumed < data.len() && self.remaining_capacity > 0 {
-            if !self.awaiting_body {
-                if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
-                    if n == 0 {
-                        break;
-                    }
-                    total_consumed += n;
-                    self.awaiting_body = true;
-                    self.apply_pending_schema_if_batch_empty();
-                    if self.remaining_capacity == 0 {
-                        break;
+            if self.awaiting_body {
+                match self.active_decoder.decode(&data[total_consumed..], 1) {
+                    Ok(n) => {
+                        self.remaining_capacity -= 1;
+                        total_consumed += n;
+                        self.awaiting_body = false;
+                        continue;
                     }
-                }
+                    Err(ref e) if is_incomplete_data(e) => return Ok(total_consumed),

Review Comment:
   ```suggestion
                       Err(ref e) if is_incomplete_data(e) => break,
   ```

##########
arrow-avro/src/reader/mod.rs:
##########
@@ -130,6 +129,16 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
     })
 }

+fn is_incomplete_data(err: &ArrowError) -> bool {
+    matches!(
+        err,
+        ArrowError::ParseError(msg)
+            if msg.contains("Unexpected EOF")
+                || msg.contains("bad varint")
+                || msg.contains("offset overflow")

Review Comment:
   Hmm, after thinking more about this over the weekend -- Trying to interpret/suppress these errors will almost certainly make the decoder brittle in the face of malformed input bytes that legitimately trigger these errors. For example, we could put the decoder in an infinite loop where it keeps trying to fetch more and more bytes in hopes of eliminating the error, when the error is fully contained in the existing buffer.
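One way to avoid classifying errors by message text is to make "need more bytes" a distinct, typed outcome at the point where the bytes are read. The sketch below is illustrative only: `read_varint`, `VarintError`, and `MAX_VARINT_LEN` are hypothetical names, not arrow-avro APIs, and it decodes a plain unsigned varint without Avro's zigzag step. `Ok(None)` means the buffer ended mid-value, while `Err` means the bytes already in the buffer are malformed, so a caller never waits for more input to fix an error that is fully contained in what it already has.

```rust
/// Hypothetical error type for this sketch; not part of arrow-avro.
#[derive(Debug, PartialEq)]
enum VarintError {
    /// Ten bytes were read and none of them terminated the value.
    /// No amount of additional input can make this valid.
    Overlong,
}

/// A 64-bit value needs at most ten 7-bit groups.
const MAX_VARINT_LEN: usize = 10;

/// Try to read one unsigned varint from the start of `buf`.
///
/// * `Ok(Some((value, consumed)))`: a complete value was decoded.
/// * `Ok(None)`: `buf` ended before the value terminated; supply more bytes.
/// * `Err(_)`: the bytes already present are malformed.
fn read_varint(buf: &[u8]) -> Result<Option<(u64, usize)>, VarintError> {
    let mut value = 0u64;
    for (i, &byte) in buf.iter().enumerate().take(MAX_VARINT_LEN) {
        // The low 7 bits carry data; the high bit marks "more bytes follow".
        value |= u64::from(byte & 0x7f) << (7 * i as u32);
        if byte & 0x80 == 0 {
            return Ok(Some((value, i + 1)));
        }
    }
    if buf.len() >= MAX_VARINT_LEN {
        // Malformed regardless of what arrives later.
        Err(VarintError::Overlong)
    } else {
        // Genuinely incomplete: the terminator may still arrive.
        Ok(None)
    }
}
```

A loop in the style of `decode` could then `break` on `Ok(None)` and propagate `Err` immediately, without inspecting error strings.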
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.remaining_capacity != self.batch_size {
+            return;
+        }
+        self.apply_pending_schema();
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        self.apply_pending_schema();

Review Comment:
   `flush` and `flush_block` are identical except this call to `self.apply_pending_schema`? Is there a way to deduplicate the code? Maybe a `flush_internal` that takes a boolean argument (which the compiler would aggressively inline away as if it were a generic parameter)? Or just call `self.apply_pending_schema` unconditionally, knowing it should be a no-op during block decoding because `self.pending_schema` is always None?

##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.remaining_capacity != self.batch_size {
+            return;
+        }
+        self.apply_pending_schema();

Review Comment:
   Also, we have quite a few places that could benefit from a small helper method:
   ```rust
   fn batch_is_empty(&self) -> bool {
       self.remaining_capacity == self.batch_size
   }
   ```
   ... which could improve readability, e.g.
   ```suggestion
           if self.batch_is_empty() {
               self.apply_pending_schema();
           }
   ```

##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.remaining_capacity != self.batch_size {
+            return;
+        }
+        self.apply_pending_schema();

Review Comment:
   ```suggestion
           if self.remaining_capacity == self.batch_size {
               self.apply_pending_schema();
           }
   ```
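The `batch_is_empty` helper and the `flush_internal` idea from the review comments can be sketched together as follows. This is a simplified stand-in, not the PR's code: the field and method names follow the diff, but the types are assumptions made so the example compiles on its own (rows are plain `u32` values, a schema is a `String`, and `flush` returns `Option<Vec<u32>>` rather than `Result<Option<RecordBatch>, ArrowError>`).

```rust
/// Simplified stand-in for the decoder state discussed in the review.
struct Decoder {
    batch_size: usize,
    remaining_capacity: usize,
    rows: Vec<u32>,                 // assumption: stands in for buffered rows
    schema: String,                 // assumption: stands in for the active schema
    pending_schema: Option<String>, // schema to switch to at a batch boundary
}

impl Decoder {
    /// True when no rows have been decoded into the current batch.
    fn batch_is_empty(&self) -> bool {
        self.remaining_capacity == self.batch_size
    }

    /// Switch to the pending schema, if any. A no-op when none is pending.
    fn apply_pending_schema(&mut self) {
        if let Some(schema) = self.pending_schema.take() {
            self.schema = schema;
        }
    }

    /// Shared flush body; `apply_schema` is the only difference between
    /// the two public entry points.
    fn flush_internal(&mut self, apply_schema: bool) -> Option<Vec<u32>> {
        if self.batch_is_empty() {
            return None;
        }
        let batch = std::mem::take(&mut self.rows);
        self.remaining_capacity = self.batch_size;
        if apply_schema {
            self.apply_pending_schema();
        }
        Some(batch)
    }

    pub fn flush(&mut self) -> Option<Vec<u32>> {
        self.flush_internal(true)
    }

    pub fn flush_block(&mut self) -> Option<Vec<u32>> {
        self.flush_internal(false)
    }
}
```

If `pending_schema` really is always `None` during block decoding, as the comment suggests, the boolean could be dropped and `apply_pending_schema` called unconditionally; the flag version only makes the difference explicit.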