martin-g commented on code in PR #530:
URL: https://github.com/apache/avro-rs/pull/530#discussion_r3038210217
##########
avro/src/reader/block.rs:
##########
@@ -35,10 +35,57 @@ use crate::{
util,
};
+/// Byte offset and record count of a single Avro data block.
+///
+/// Captured automatically as blocks are read during forward iteration.
+/// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BlockPosition {
+ /// Byte offset in the stream where this block starts (before the object-count varint).
+ pub offset: u64,
+ /// Total number of records in this block.
+ pub message_count: usize,
+}
+
+/// Wraps an inner reader and tracks the current byte position.
+///
+/// Avoids requiring `Seek` just to know how many bytes have been consumed.
+/// When the inner reader also implements `Seek`, seeking updates the tracked position.
+#[derive(Debug, Clone)]
+struct PositionTracker<R> {
+ inner: R,
+ pos: u64,
+}
+
+impl<R> PositionTracker<R> {
+ fn new(inner: R) -> Self {
+ Self { inner, pos: 0 }
+ }
+
+ fn position(&self) -> u64 {
+ self.pos
+ }
+}
+
+impl<R: Read> Read for PositionTracker<R> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let n = self.inner.read(buf)?;
+ self.pos += n as u64;
+ Ok(n)
+ }
Review Comment:
The implementation assumes that `read_buf()`, `read_exact()`,
`read_vectored()`, ... keep their default impls and delegate to `read()`. If any
of them is overridden without updating `pos`, the tracking breaks.
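A minimal sketch of one way to make the tracker robust to this, assuming the goal is a correct count whichever `Read` method the caller uses: every method the tracker itself overrides adds the bytes it reports to `pos`, and the remaining methods keep their default impls, which go through the tracker's own `read()`. The `read_vectored` override and the `Seek` impl are illustrative assumptions, not code from the PR. The `Seek` impl takes the absolute offset returned by the inner `seek()`, so `SeekFrom::Current` and `SeekFrom::End` stay correct.

```rust
use std::io::{self, IoSliceMut, Read, Seek, SeekFrom};

/// Hypothetical variant of the PR's `PositionTracker`.
#[derive(Debug, Clone)]
struct PositionTracker<R> {
    inner: R,
    pos: u64,
}

impl<R> PositionTracker<R> {
    fn new(inner: R) -> Self {
        Self { inner, pos: 0 }
    }

    fn position(&self) -> u64 {
        self.pos
    }
}

impl<R: Read> Read for PositionTracker<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.pos += n as u64;
        Ok(n)
    }

    // Overridden to reach the inner reader's (possibly specialized) vectored
    // read, so it must count the bytes itself.
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        let n = self.inner.read_vectored(bufs)?;
        self.pos += n as u64;
        Ok(n)
    }

    // read_exact, read_to_end, etc. keep their default impls, which call
    // `self.read` above and are therefore counted.
}

impl<R: Seek> Seek for PositionTracker<R> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        // Use the absolute offset reported by the inner stream.
        let new_pos = self.inner.seek(pos)?;
        self.pos = new_pos;
        Ok(new_pos)
    }
}
```

With this shape, how the inner reader implements its own methods does not matter: the tracker either calls `inner.read()` or counts the value returned by the one inner method it forwards to.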
##########
avro/src/reader/mod.rs:
##########
@@ -159,6 +164,41 @@ impl<R: Read> Iterator for Reader<'_, R> {
}
}
+impl<R: Read> Reader<'_, R> {
+ /// The currently loaded block's position and record count.
+ ///
+ /// Returns `None` only before the first block is loaded (via iteration or
+ /// [`seek_to_block`](Self::seek_to_block)). Always `Some` afterward.
+ pub fn current_block(&self) -> Option<BlockPosition> {
+ self.block.current_block_info
+ }
+
+ /// Byte offset where data blocks begin (right after the file header).
+ ///
+ /// This is the offset of the first data block — equivalent to the position
+ /// that would be returned by `current_block().offset` for block 0.
+ pub fn data_start(&self) -> u64 {
+ self.block.data_start
+ }
+}
+
+impl<R: Read + Seek> Reader<'_, R> {
+ /// Seek to the data block at the given byte offset and load it.
+ ///
+ /// The offset must point to the start of a valid data block (before its
+ /// object-count varint). The block is read, decompressed, and its sync
+ /// marker is validated against the file header. After this call, [`Iterator::next`]
+ /// yields the first record in that block.
+ ///
+ /// Typically the caller saves offsets from [`current_block`](Self::current_block)
+ /// during forward iteration and later passes them here to jump back.
+ pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
+ self.block.seek_to_block(offset)?;
Review Comment:
If this fails, shouldn't `self.errored` be set to `true`?
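For illustration, a reduced sketch of that suggestion. `FakeBlock` and this cut-down `Reader` are hypothetical stand-ins for the PR's types, and the sketch assumes, as the comment implies, that `next()` returns `None` once `errored` is set:

```rust
/// Hypothetical stand-in for the block reader: "seeking" fails past `len`.
struct FakeBlock {
    len: u64,
    records_left: u32,
}

impl FakeBlock {
    fn seek_to_block(&mut self, offset: u64) -> Result<(), String> {
        if offset >= self.len {
            return Err(format!("no block at offset {offset}"));
        }
        self.records_left = 2; // pretend every block holds two records
        Ok(())
    }
}

/// Hypothetical reduced `Reader` that keeps only the `errored` latch.
struct Reader {
    block: FakeBlock,
    errored: bool,
}

impl Reader {
    fn seek_to_block(&mut self, offset: u64) -> Result<(), String> {
        if let Err(e) = self.block.seek_to_block(offset) {
            // Latch the failure, as `next()` does on a read error, so that
            // iteration does not continue from whatever state the failed
            // seek left behind.
            self.errored = true;
            return Err(e);
        }
        Ok(())
    }
}

impl Iterator for Reader {
    type Item = Result<u32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.errored || self.block.records_left == 0 {
            return None;
        }
        self.block.records_left -= 1;
        Some(Ok(self.block.records_left))
    }
}
```

Whether a later successful `seek_to_block` should clear `errored` again is a separate design choice; this sketch leaves the latch set.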
##########
avro/src/reader/block.rs:
##########
@@ -156,6 +211,11 @@ impl<'r, R: Read> Block<'r, R> {
return Err(Details::GetBlockMarker.into());
}
+ self.current_block_info = Some(BlockPosition {
+ offset: block_start,
+ message_count: block_len as usize,
Review Comment:
```suggestion
message_count: self.message_count,
```
##########
avro/src/reader/block.rs:
##########
@@ -295,6 +355,36 @@ impl<'r, R: Read> Block<'r, R> {
}
}
+impl<R: Read + Seek> Block<'_, R> {
+ /// Seek the underlying stream to `offset` and read the block there.
+ /// Validates the sync marker to confirm it's a real block boundary.
+ /// Returns an error if no valid block can be read at the offset
+ /// (e.g., the offset is at or past EOF).
+ pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
+ self.reader
+ .seek(SeekFrom::Start(offset))
+ .map_err(Details::SeekToBlock)?;
+
+ self.buf.clear();
+ self.buf_idx = 0;
+ self.message_count = 0;
+ self.current_block_info = None;
+
+ // read_block_next treats UnexpectedEof as a clean end-of-stream
+ // (returns Ok with message_count=0). That's correct for forward
+ // iteration but wrong here — the caller asked for a specific block.
+ self.read_block_next()?;
+ if self.is_empty() {
Review Comment:
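The Avro spec does not disallow empty blocks, so `is_empty()` cannot tell
"no block at this offset" apart from a valid block with zero records.
IIRC an empty block is written when a `types::Value::Null` is flushed.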
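A minimal sketch of one way to tell the two cases apart, assuming the block header is read with a hypothetical `read_long_opt` helper instead of the crate's existing varint helper: EOF before the first byte of the object count means there is no block, while a header whose count decodes to 0 is an empty but valid block. `BlockHeader` and both functions are illustrative, not existing avro-rs API.

```rust
use std::io::{self, ErrorKind, Read};

/// What was found at the current stream position (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum BlockHeader {
    /// The stream ended before the first header byte: no block here.
    EndOfStream,
    /// A block header was read. `count` may legitimately be 0.
    Block { count: i64, size: i64 },
}

/// Reads one zig-zag varint (Avro `long`). Returns `Ok(None)` only if the
/// stream is at EOF before the first byte; EOF mid-varint is an error.
fn read_long_opt<R: Read>(r: &mut R) -> io::Result<Option<i64>> {
    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    let mut byte = [0u8; 1];
    loop {
        match r.read(&mut byte) {
            Ok(0) if shift == 0 => return Ok(None),
            Ok(0) => return Err(io::Error::new(ErrorKind::UnexpectedEof, "truncated varint")),
            Ok(_) => {}
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
        value |= u64::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            break;
        }
        shift += 7;
        if shift > 63 {
            return Err(io::Error::new(ErrorKind::InvalidData, "varint too long"));
        }
    }
    // Undo the zig-zag encoding.
    Ok(Some((value >> 1) as i64 ^ -((value & 1) as i64)))
}

/// Reads the object count and byte size that start an Avro data block.
fn read_block_header<R: Read>(r: &mut R) -> io::Result<BlockHeader> {
    let count = match read_long_opt(r)? {
        None => return Ok(BlockHeader::EndOfStream),
        Some(c) => c,
    };
    // Once the count has been read, the size must follow: EOF here is corruption.
    let size = read_long_opt(r)?
        .ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "missing block size"))?;
    Ok(BlockHeader::Block { count, size })
}
```

With this split, `seek_to_block` could reject only `EndOfStream` and accept `Block { count: 0, .. }`.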
--
This is an automated message from the Apache Git Service.