This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 597db4f fix: Also (maybe) write header on `Writer::flush`. (#222)
597db4f is described below
commit 597db4f434a2f1d32eaf7f7a01bd23e24a523683
Author: Kriskras99 <[email protected]>
AuthorDate: Wed Jul 2 09:33:10 2025 +0200
fix: Also (maybe) write header on `Writer::flush`. (#222)
---
avro/src/writer.rs | 52 +++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 41 insertions(+), 11 deletions(-)
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 413e60a..e043425 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -53,6 +53,9 @@ pub struct Writer<'a, W: Write> {
num_values: usize,
#[builder(default = generate_sync_marker())]
marker: [u8; 16],
+ /// Has the header already been written.
+ ///
+ /// To disable writing the header, this can be set to `true`.
#[builder(default = false)]
has_header: bool,
#[builder(default)]
@@ -152,7 +155,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
/// schema validation.
///
- /// Return the number of bytes written (it might be 0, see below).
+ /// Returns the number of bytes written (it might be 0, see below).
///
/// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
/// internal buffering for performance reasons. If you want to be sure the value has been
@@ -166,7 +169,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// Append a compatible value to a `Writer`, also performing schema validation.
///
- /// Return the number of bytes written (it might be 0, see below).
+ /// Returns the number of bytes written (it might be 0, see below).
///
/// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
/// internal buffering for performance reasons. If you want to be sure the value has been
@@ -198,7 +201,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
/// validation.
///
- /// Return the number of bytes written.
+ /// Returns the number of bytes written.
///
/// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
/// internal buffering for performance reasons. If you want to be sure the value has been
@@ -234,7 +237,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
/// trait), also performing schema validation.
///
- /// Return the number of bytes written.
+ /// Returns the number of bytes written.
///
/// **NOTE**: This function forces the written data to be flushed (an implicit
/// call to [`flush`](Writer::flush) is performed).
@@ -269,7 +272,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
/// validation.
///
- /// Return the number of bytes written.
+ /// Returns the number of bytes written.
///
/// **NOTE**: This function forces the written data to be flushed (an implicit
/// call to [`flush`](Writer::flush) is performed).
@@ -303,7 +306,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
/// validation on each value appended.
///
- /// Return the number of bytes written.
+ /// Returns the number of bytes written.
///
/// **NOTE**: This function forces the written data to be flushed (an implicit
/// call to [`flush`](Writer::flush) is performed).
@@ -317,13 +320,17 @@ impl<'a, W: Write> Writer<'a, W> {
Ok(num_bytes)
}
- /// Flush the content appended to a `Writer`. Call this function to make sure all the content
- /// has been written before releasing the `Writer`.
+ /// Flush the content to the inner `Writer`.
+ ///
+ /// Call this function to make sure all the content has been written before releasing the `Writer`.
+ /// This will also write the header if it wasn't written yet and hasn't been disabled using
+ /// [`WriterBuilder::has_header`].
///
- /// Return the number of bytes written.
+ /// Returns the number of bytes written.
pub fn flush(&mut self) -> AvroResult<usize> {
+ let mut num_bytes = self.maybe_write_header()?;
if self.num_values == 0 {
- return Ok(0);
+ return Ok(num_bytes);
}
self.codec.compress(&mut self.buffer)?;
@@ -331,7 +338,7 @@ impl<'a, W: Write> Writer<'a, W> {
let num_values = self.num_values;
let stream_len = self.buffer.len();
- let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
+ num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
+ self.append_raw(&stream_len.into(), &Schema::Long)?
+ self
.writer
@@ -840,6 +847,29 @@ mod tests {
Ok(())
}
+ #[test]
+ fn avro_rs_220_flush_write_header() -> TestResult {
+ let schema = Schema::parse_str(SCHEMA)?;
+
+ // By default flush should write the header even if nothing was added yet
+ let mut writer = Writer::new(&schema, Vec::new());
+ writer.flush()?;
+ let result = writer.into_inner()?;
+ assert_eq!(result.len(), 163);
+
+ // Unless the user indicates via the builder that the header has already been written
+ let mut writer = Writer::builder()
+ .has_header(true)
+ .schema(&schema)
+ .writer(Vec::new())
+ .build();
+ writer.flush()?;
+ let result = writer.into_inner()?;
+ assert_eq!(result.len(), 0);
+
+ Ok(())
+ }
+
#[test]
fn test_union_not_null() -> TestResult {
let schema = Schema::parse_str(UNION_SCHEMA)?;