This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new d4f1cfad79 Implement Improved arrow-avro Reader Zero-Byte Record Handling (#7966)
d4f1cfad79 is described below

commit d4f1cfad79ee38e65d8c92982616e5facd463c52
Author: Connor Sanders <con...@elastiflow.com>
AuthorDate: Tue Jul 22 16:42:23 2025 -0500

Implement Improved arrow-avro Reader Zero-Byte Record Handling (#7966)

# Which issue does this PR close?

- Part of https://github.com/apache/arrow-rs/issues/4886
- Follow-up to https://github.com/apache/arrow-rs/pull/7834

# Rationale for this change

The initial Avro reader implementation contained an under-developed, temporary safeguard to prevent infinite loops when processing records that consumed zero bytes from the input buffer. When the `Decoder` reported that zero bytes were consumed, the `Reader` would advance its cursor to the end of the current data block. While this successfully prevented an infinite loop, it had the critical side effect of silently discarding any remaining data in that block, leading to potential data loss.

This change enhances the decoding logic to handle zero-byte values correctly, ensuring that the `Reader` makes proper progress without dropping data and without risking an infinite loop.

# What changes are included in this PR?

- **Refined Decoder Logic**: The `Decoder` has been updated to accurately track and report the number of bytes consumed for all values, including valid zero-length records such as `null` or empty `bytes`. This ensures the decoder always makes forward progress.
- **Removal of Data-Skipping Safeguard**: The logic in the `Reader` that previously advanced to the end of a block on a zero-byte read has been removed. The reader now relies on the decoder to report accurate consumption and advances its cursor incrementally and safely.
- **New Integration Test**: Added a test using a temporary `zero_byte.avro` file created via this Python script: https://gist.github.com/jecsand838/e57647d0d12853f3cf07c350a6a40395

# Are these changes tested?

Yes, a new `test_read_zero_byte_avro_file` test was added that reads the new `zero_byte.avro` file and confirms that the null row, the valid-but-empty row, and the non-empty row are all decoded correctly.

# Are there any user-facing changes?

N/A

# Follow-Up PRs

1. PR to update `test_read_zero_byte_avro_file` once https://github.com/apache/arrow-testing/pull/109 is merged in.
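The essence of the fix is that progress is now measured in rows rather than bytes: every successful call to `record_decoder.decode` yields exactly one row, so the loop is bounded by `batch_size` even when a row consumes zero bytes. Below is a minimal, self-contained sketch of that termination argument, not the actual arrow-avro implementation; `MockRecordDecoder` and its `row_widths` field are hypothetical stand-ins for the real `RecordDecoder`, and only the loop shape mirrors the change in the diff that follows.

```rust
/// Hypothetical stand-in for the real `RecordDecoder`: it reports how many
/// bytes each decoded row consumed (0 models a valid zero-byte record).
struct MockRecordDecoder {
    row_widths: Vec<usize>,
    next: usize,
}

impl MockRecordDecoder {
    /// Decodes exactly one row and returns the number of bytes it consumed.
    fn decode(&mut self, _data: &[u8]) -> usize {
        let consumed = self.row_widths[self.next];
        self.next += 1;
        consumed
    }
}

/// Mirrors the fixed loop: each iteration decodes one row, so `decoded_rows`
/// strictly increases and the loop terminates after at most `batch_size`
/// iterations, even when `consumed` is 0.
fn decode_block(decoder: &mut MockRecordDecoder, data: &[u8], batch_size: usize) -> (usize, usize) {
    let mut total_consumed = 0;
    let mut decoded_rows = 0;
    while total_consumed < data.len() && decoded_rows < batch_size {
        let consumed = decoder.decode(&data[total_consumed..]);
        total_consumed += consumed; // may add 0; progress is tracked via rows
        decoded_rows += 1;
    }
    (total_consumed, decoded_rows)
}

fn main() {
    // Illustrative widths only: a zero-byte row followed by two non-empty
    // rows. The old safeguard would have stopped at the zero-byte row and
    // skipped the remaining 14 bytes of the block.
    let mut decoder = MockRecordDecoder {
        row_widths: vec![0, 2, 12],
        next: 0,
    };
    let data = vec![0u8; 14];
    assert_eq!(decode_block(&mut decoder, &data, 1024), (14, 3));
}
```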
---
 arrow-avro/src/reader/mod.rs        | 36 ++++++++++++++++++++++++++++--------
 arrow-avro/test/data/zero_byte.avro | Bin 0 -> 210 bytes
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index b98777d3d7..02d3f49aa1 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -157,9 +157,10 @@ impl Decoder {
         let mut total_consumed = 0usize;
         while total_consumed < data.len() && self.decoded_rows < self.batch_size {
             let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            if consumed == 0 {
-                break;
-            }
+            // A successful call to record_decoder.decode means one row was decoded.
+            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
+            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
+            // We add `consumed` (which can be 0) to `total_consumed`.
             total_consumed += consumed;
             self.decoded_rows += 1;
         }
@@ -364,11 +365,7 @@ impl<R: BufRead> Reader<R> {
             }
             // Try to decode more rows from the current block.
             let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
-            if consumed == 0 && self.block_cursor < self.block_data.len() {
-                self.block_cursor = self.block_data.len();
-            } else {
-                self.block_cursor += consumed;
-            }
+            self.block_cursor += consumed;
         }
         self.decoder.flush()
     }
@@ -499,6 +496,29 @@ mod test {
         assert!(batch.column(0).as_any().is::<StringViewArray>());
     }
 
+    #[test]
+    fn test_read_zero_byte_avro_file() {
+        let batch = read_file("test/data/zero_byte.avro", 3, false);
+        let schema = batch.schema();
+        assert_eq!(schema.fields().len(), 1);
+        let field = schema.field(0);
+        assert_eq!(field.name(), "data");
+        assert_eq!(field.data_type(), &DataType::Binary);
+        assert!(field.is_nullable());
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 1);
+        let binary_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        assert!(binary_array.is_null(0));
+        assert!(binary_array.is_valid(1));
+        assert_eq!(binary_array.value(1), b"");
+        assert!(binary_array.is_valid(2));
+        assert_eq!(binary_array.value(2), b"some bytes");
+    }
+
     #[test]
     fn test_alltypes() {
         let files = [
diff --git a/arrow-avro/test/data/zero_byte.avro b/arrow-avro/test/data/zero_byte.avro
new file mode 100644
index 0000000000..f7ffd29b68
Binary files /dev/null and b/arrow-avro/test/data/zero_byte.avro differ
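For reference, here is a minimal sketch of the column contents that `test_read_zero_byte_avro_file` asserts, built directly with the `arrow_array` crate rather than by reading the file. The row values mirror the assertions in the diff above; `from_opt_vec` is assumed to be the usual `BinaryArray` constructor for optional byte slices.

```rust
use arrow_array::{Array, BinaryArray};

fn main() {
    // Row 0 is null, row 1 is a *valid* empty byte string, and row 2 holds
    // a 10-byte payload. Keeping rows 0 and 1 distinct is exactly what the
    // old "advance to end of block" safeguard could not guarantee.
    let expected = BinaryArray::from_opt_vec(vec![
        None,
        Some(b"".as_slice()),
        Some(b"some bytes".as_slice()),
    ]);
    assert!(expected.is_null(0));
    assert!(expected.is_valid(1) && expected.value(1).is_empty());
    assert!(expected.is_valid(2));
    assert_eq!(expected.value(2), b"some bytes");
}
```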