This is an automated email from the ASF dual-hosted git repository.
kriskras99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d57d3d4 feat: Add support for resetting a Writer backed by a clearable buffer (#469)
d57d3d4 is described below
commit d57d3d47f456c60293f6518f638b060508b1c6e5
Author: Kriskras99 <[email protected]>
AuthorDate: Thu Feb 19 10:58:48 2026 +0100
feat: Add support for resetting a Writer backed by a clearable buffer (#469)
* feat: Add support for resetting a Writer backed by a clearable buffer
* Apply suggestions from code review
Co-authored-by: Martin Grigorov <[email protected]>
* fix: Regenerate sync marker on `Writer` reset
---------
Co-authored-by: Martin Grigorov <[email protected]>
---
avro/src/lib.rs | 4 +-
avro/src/writer.rs | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 118 insertions(+), 8 deletions(-)
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index 739c5b5..d166c51 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -98,8 +98,8 @@ pub use schema::Schema;
pub use serde::{AvroSchema, AvroSchemaComponent, from_value, to_value};
pub use uuid::Uuid;
pub use writer::{
- GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder, to_avro_datum,
- to_avro_datum_schemata, write_avro_datum_ref,
+ Clearable, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder,
+ to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref,
};
#[cfg(feature = "derive")]
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index c8e02c8..61c02de 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -542,6 +542,66 @@ impl<'a, W: Write> Writer<'a, W> {
}
}
+/// A buffer that can be cleared so it can be reused.
+///
+/// [`Writer::reset`] is only available when the underlying writer implements
+/// this trait; `Vec<u8>` implements it out of the box.
+pub trait Clearable {
+ /// Clear the buffer, discarding everything written to it so far.
+ ///
+ /// Implementations should leave the buffer empty, so that data written
+ /// afterwards starts at the beginning; [`Writer::reset`] relies on this to
+ /// start a new file.
+ fn clear(&mut self);
+}
+
+/// Clearing a `Vec<u8>` empties it in place, so a reset [`Writer`] keeps
+/// writing into the same vector.
+impl Clearable for Vec<u8> {
+ fn clear(&mut self) {
+ // Fully qualified to make it explicit that this delegates to the inherent
+ // `Vec::clear` (which keeps the allocated capacity for reuse) rather than
+ // recursing into `Clearable::clear`.
+ Vec::clear(self);
+ }
+}
+
+impl<'a, W: Clearable + Write> Writer<'a, W> {
+ /// Reset the writer.
+ ///
+ /// This will clear the underlying writer, the internal buffer, and the user metadata.
+ /// It will also generate a new sync marker.
+ ///
+ /// After a reset, the next flush writes a complete new file again, header
+ /// included. User metadata that is still needed must be set again, and the
+ /// previous sync marker is replaced, so every file produced after a reset
+ /// gets its own marker.
+ ///
+ /// # Example
+ /// ```
+ /// # use apache_avro::{Writer, Schema, Error};
+ /// # let schema = Schema::Boolean;
+ /// # let values = [true, false];
+ /// # fn send(_: &Vec<u8>) {}
+ /// let mut writer = Writer::new(&schema, Vec::new())?;
+ ///
+ /// // Write some values
+ /// for value in values {
+ /// writer.append_value(value)?;
+ /// }
+ ///
+ /// // Flush first, and only then hand the written bytes off
+ /// writer.flush()?;
+ /// send(writer.get_ref());
+ ///
+ /// // Reset the writer
+ /// writer.reset();
+ ///
+ /// // Write some values again
+ /// for value in values {
+ /// writer.append_value(value)?;
+ /// }
+ ///
+ /// # Ok::<(), Error>(())
+ /// ```
+ ///
+ /// # Warning
+ /// Any data that has been appended but not yet flushed will be silently
+ /// discarded. Call [`flush`](Writer::flush) before `reset()` if you need
+ /// to preserve in-flight records.
+ pub fn reset(&mut self) {
+ // Drop records that were appended but not yet flushed (see the warning above).
+ self.buffer.clear();
+ // Empty the destination so the new file starts at the beginning of it.
+ self.writer.clear();
+ // Make the header get written again for the new file.
+ self.has_header = false;
+ self.num_values = 0;
+ self.user_metadata.clear();
+ // Give the new file its own sync marker instead of reusing the old one.
+ self.marker = generate_sync_marker();
+ }
+}
+
impl<W: Write> Drop for Writer<'_, W> {
/// Drop the writer, will try to flush ignoring any errors.
fn drop(&mut self) {
@@ -809,12 +869,7 @@ pub fn to_avro_datum_schemata<T: Into<Value>>(
+/// Generate a fresh random 16-byte sync marker (non-wasm32 targets).
+/// Also called by `Writer::reset` to give each new file its own marker.
#[cfg(not(target_arch = "wasm32"))]
fn generate_sync_marker() -> [u8; 16] {
- let mut marker = [0_u8; 16];
- std::iter::repeat_with(rand::random)
- .take(16)
- .enumerate()
- .for_each(|(i, n)| marker[i] = n);
- marker
+ // The `[u8; 16]` return type selects rand's array sampling, so all 16
+ // bytes are filled by this single call.
+ rand::random()
}
#[cfg(target_arch = "wasm32")]
@@ -1858,4 +1913,59 @@ mod tests {
Ok(())
}
+
+ /// Regression test for #469: `Writer::reset` must empty the underlying
+ /// `Vec<u8>`, and writing the same values afterwards must produce a file
+ /// that differs from the first one only in its sync marker.
+ #[test]
+ fn avro_rs_469_reset_writer() -> TestResult {
+ let schema = Schema::Boolean;
+ let values = [true, false, true, false];
+ let mut writer = Writer::new(&schema, Vec::new())?;
+
+ for value in values {
+ writer.append_value(value)?;
+ }
+
+ writer.flush()?;
+ let first_buffer = writer.get_ref().clone();
+
+ // The reset must leave the underlying buffer completely empty.
+ writer.reset();
+ assert_eq!(writer.get_ref().len(), 0);
+
+ // Write the same values again after the reset.
+ for value in values {
+ writer.append_value(value)?;
+ }
+
+ writer.flush()?;
+ let second_buffer = writer.get_ref().clone();
+ assert_eq!(first_buffer.len(), second_buffer.len());
+ // File structure:
+ // Header (excluding its sync marker): ? bytes
+ // Sync marker: 16 bytes
+ // Data: 6 bytes (presumably a 1-byte block count, a 1-byte block size
+ // and one byte per boolean, for an uncompressed block — confirm)
+ // Sync marker: 16 bytes, at the very end
+ let len = first_buffer.len();
+ // Offset where the header's sync marker starts, i.e. the length of the
+ // header without that marker.
+ let header = len - 16 - 6 - 16;
+ // Offset where the 6-byte data block starts.
+ let data = header + 16;
+ assert_eq!(
+ first_buffer[..header],
+ second_buffer[..header],
+ "Written header must be the same, excluding sync marker"
+ );
+ // Markers are 16 random bytes, so an accidental match is negligibly
+ // unlikely and these `assert_ne!`s are not flaky in practice.
+ assert_ne!(
+ first_buffer[header..data],
+ second_buffer[header..data],
+ "Sync markers should be different"
+ );
+ assert_eq!(
+ first_buffer[data..data + 6],
+ second_buffer[data..data + 6],
+ "Written data must be the same"
+ );
+ assert_ne!(
+ first_buffer[len - 16..],
+ second_buffer[len - 16..],
+ "Sync markers should be different"
+ );
+
+ Ok(())
+ }
}