scovich commented on code in PR #7966:
URL: https://github.com/apache/arrow-rs/pull/7966#discussion_r2217441671
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -365,11 +371,7 @@ impl<R: BufRead> Reader<R> {
}
// Try to decode more rows from the current block.
let consumed =
self.decoder.decode(&self.block_data[self.block_cursor..])?;
- if consumed == 0 && self.block_cursor < self.block_data.len() {
- self.block_cursor = self.block_data.len();
Review Comment:
I think this check was never needed? Because eventually the outer loop's
`self.decoder.batch_is_full()` check would fail and break the loop?
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -155,11 +155,17 @@ impl Decoder {
/// Returns the number of bytes consumed.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
let mut total_consumed = 0usize;
- while total_consumed < data.len() && self.decoded_rows < self.batch_size {
- let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
- if consumed == 0 {
+ while self.decoded_rows < self.batch_size {
+ let buffer = &data[total_consumed..];
+ if buffer.is_empty() {
+ // No more data to process.
break;
}
+ let consumed = self.record_decoder.decode(buffer, 1)?;
+ // A successful call to record_decoder.decode means one row was decoded.
+ // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
+ // We increment `decoded_rows` to mark progress and avoid an infinite loop.
+ // We add `consumed` (which can be 0) to `total_consumed`.
Review Comment:
AFAICT, the old and new versions of this code are logically equivalent? And
the original `consumed == 0` check was actually unnecessary because the row
count increased after every `record_decoder.decode` call, which would cause the
loop to eventually terminate? Seems like we just want a simplified version of
the original code?
```rust
let mut total_consumed = 0usize;
while total_consumed < data.len() && self.decoded_rows < self.batch_size {
    let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
    // A successful call to record_decoder.decode means one row was decoded.
    // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
    // We increment `decoded_rows` to mark progress and avoid an infinite loop.
    // We add `consumed` (which can be 0) to `total_consumed`.
    total_consumed += consumed;
    self.decoded_rows += 1;
}
```
or maybe (with `mut data: &[u8]`):
```rust
while !data.is_empty() && self.decoded_rows < self.batch_size {
    // A successful call to record_decoder.decode means one row was decoded.
    // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
    // We advance the data slice by `consumed` (which can be 0).
    // We increment `decoded_rows` to mark progress and avoid an infinite loop.
    let consumed = self.record_decoder.decode(data, 1)?;
    data = &data[consumed..];
    self.decoded_rows += 1;
}
```
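
To make the termination argument concrete, here is a minimal, self-contained sketch of the slice-advancing variant. Everything outside the loop body is an assumption made up for illustration and is not the arrow-avro API: `MockRecordDecoder`, its `bytes_per_row` field, the `String` error type, and the `run` helper are all hypothetical. The point it tries to show is that the row count alone bounds the loop, even when every record consumes zero bytes.

```rust
/// Hypothetical stand-in for the real record decoder (an assumption, not
/// the arrow-avro type). Each successful call "decodes" exactly one row
/// and reports how many bytes that row occupied.
struct MockRecordDecoder {
    /// Bytes per row; 0 models a schema whose records encode to zero bytes.
    bytes_per_row: usize,
}

impl MockRecordDecoder {
    fn decode(&mut self, buf: &[u8], _count: usize) -> Result<usize, String> {
        if buf.len() < self.bytes_per_row {
            return Err("unexpected end of input".to_string());
        }
        Ok(self.bytes_per_row)
    }
}

/// Simplified decoder holding only the state the loop needs.
struct Decoder {
    record_decoder: MockRecordDecoder,
    batch_size: usize,
    decoded_rows: usize,
}

impl Decoder {
    /// Returns the number of bytes consumed from `data`.
    fn decode(&mut self, mut data: &[u8]) -> Result<usize, String> {
        let original_len = data.len();
        while !data.is_empty() && self.decoded_rows < self.batch_size {
            // One successful call means one row, even if `consumed` is 0.
            let consumed = self.record_decoder.decode(data, 1)?;
            data = &data[consumed..];
            // Incrementing the row count guarantees the loop terminates.
            self.decoded_rows += 1;
        }
        Ok(original_len - data.len())
    }
}

/// Hypothetical helper: runs one `decode` call and returns
/// `(bytes_consumed, decoded_rows)`.
fn run(bytes_per_row: usize, batch_size: usize, input: &[u8]) -> Result<(usize, usize), String> {
    let mut decoder = Decoder {
        record_decoder: MockRecordDecoder { bytes_per_row },
        batch_size,
        decoded_rows: 0,
    };
    let consumed = decoder.decode(input)?;
    Ok((consumed, decoder.decoded_rows))
}
```

With this mock, `run(0, 4, &[1, 2, 3])` stops once the batch holds four rows without consuming any bytes, and `run(2, 10, &[0u8; 6])` stops after three rows when the slice is exhausted. Neither case needs an explicit `consumed == 0` check.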