splix commented on code in PR #530:
URL: https://github.com/apache/avro-rs/pull/530#discussion_r3293698579
##########
avro/src/reader/mod.rs:
##########
@@ -366,4 +406,111 @@ mod tests {
panic!("Expected an error in the reading of the codec!");
}
}
+
+    /// Write an Avro file with multiple blocks and verify we can seek between them.
+    #[test]
+    fn test_seek_to_block() -> TestResult {
+        use crate::writer::Writer;
+
+        let schema = Schema::parse_str(SCHEMA)?;
+        let mut writer = Writer::new(&schema, Vec::new())?;
+
+        // Each inner slice is appended and then flushed, so it becomes exactly one
+        // Avro block: block 0 = {10, 20}, block 1 = {30, 40}, block 2 = {50}.
+        let blocks: [&[(i64, &str)]; 3] = [
+            &[(10, "b0_r0"), (20, "b0_r1")],
+            &[(30, "b1_r0"), (40, "b1_r1")],
+            &[(50, "b2_r0")],
+        ];
+        for block in blocks {
+            for &(a, b) in block {
+                let mut r = Record::new(&schema).expect("SCHEMA is a record schema");
+                r.put("a", a);
+                r.put("b", b);
+                writer.append_value(r)?;
+            }
+            writer.flush()?;
+        }
+
+        let data = writer.into_inner()?;
+
+        // Read forward, remembering the position of every distinct block passed through.
+        let mut reader = Reader::new(Cursor::new(&data))?;
+        let mut block_offsets: Vec<BlockPosition> = Vec::new();
+        let mut all_values: Vec<Value> = Vec::new();
+
+        // Nothing has been read yet, so there is no current block.
+        assert!(reader.current_block().is_none());
+
+        while let Some(value) = reader.next() {
+            all_values.push(value?);
+            let pos = reader.current_block().expect("block info after read");
+            // Records from the same block report the same offset; keep only the first.
+            if block_offsets
+                .last()
+                .is_none_or(|last| last.offset != pos.offset)
+            {
+                block_offsets.push(pos);
+            }
+        }
+
+        assert_eq!(all_values.len(), 5);
+        assert_eq!(block_offsets.len(), 3);
+        assert_eq!(block_offsets[0].message_count, 2);
+        assert_eq!(block_offsets[1].message_count, 2);
+        assert_eq!(block_offsets[2].message_count, 1);
+        assert!(block_offsets.windows(2).all(|w| w[0].offset < w[1].offset));
+        assert_eq!(reader.data_start(), block_offsets[0].offset);
+
+        // Seek back to block 1 and read both of its records.
+        reader.seek_to_block(block_offsets[1].offset)?;
+        assert_eq!(reader.next().expect("block 1, record 0")?, all_values[2]);
+        assert_eq!(reader.next().expect("block 1, record 1")?, all_values[3]);
+
+        // Seek back to block 0 and read both of its records.
+        reader.seek_to_block(block_offsets[0].offset)?;
+        assert_eq!(reader.next().expect("block 0, record 0")?, all_values[0]);
+        assert_eq!(reader.next().expect("block 0, record 1")?, all_values[1]);
+
+        // Seek forward to the last block; its single record ends the stream.
+        reader.seek_to_block(block_offsets[2].offset)?;
+        assert_eq!(reader.next().expect("block 2, record 0")?, all_values[4]);
+        assert!(reader.next().is_none());
+
+        Ok(())
+    }
+
+ /// Seeking to an invalid offset should fail with a sync marker error.
+ #[test]
+ fn test_seek_to_invalid_offset() -> TestResult {
+ use crate::writer::Writer;
+
+ let schema = Schema::parse_str(SCHEMA)?;
+ let mut writer = Writer::new(&schema, Vec::new())?;
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 1i64);
+ r.put("b", "x");
+ writer.append_value(r)?;
+ writer.flush()?;
+ let data = writer.into_inner()?;
+
+ let mut reader = Reader::new(Cursor::new(&data))?;
+ let result = reader.seek_to_block(7);
+ assert!(result.is_err());
+
Review Comment:
Both `seek_to_block` and `read_block_next` accept a position past EOF. They
don't return an error; they simply yield no messages.
I've added a test for this, but it checks for `None` instead of `Err`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]