This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 9f37816 fix: flush Writer on drop (#199)
9f37816 is described below
commit 9f37816e667b63f9dd4a16d9c01cbad3b65505f2
Author: Kriskras99 <[email protected]>
AuthorDate: Fri May 30 23:10:23 2025 +0200
fix: flush Writer on drop (#199)
* fix: flush Writer on drop
This brings the writer in line with standard library types like `BufWriter`
and `LineWriter`.
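This is a breaking change when the inner writer is `&mut W` and is used again
later in the same scope without first dropping the writer or calling
`into_inner` (as seen in the fixed tests): because `Writer` now implements
`Drop`, its mutable borrow of `W` stays alive until the writer is dropped.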
* Drop the Avro `writer` so that its inner `Write` impl can be used again
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
* Update the comments to explain why the writer is dropped manually
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---------
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: kriskras99 <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
---
avro/src/writer.rs | 29 +++++++++++++++++++++++++++--
avro/tests/to_from_avro_datum_schemata.rs | 3 ++-
avro/tests/union_schema.rs | 1 +
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index a1e87dc..2f3b235 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -25,12 +25,18 @@ use crate::{
AvroResult, Codec, Error,
};
use serde::Serialize;
-use std::{collections::HashMap, io::Write, marker::PhantomData,
ops::RangeInclusive};
+use std::{
+ collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop,
ops::RangeInclusive,
+};
const DEFAULT_BLOCK_SIZE: usize = 16000;
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
/// Main interface for writing Avro formatted values.
+///
+/// It is critical to call `flush` before `Writer<W>` is dropped. Although dropping will attempt to flush
+/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
+/// Calling `flush` ensures that the buffer is empty, so dropping will not even attempt file operations.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
schema: &'a Schema,
@@ -348,7 +354,18 @@ impl<'a, W: Write> Writer<'a, W> {
pub fn into_inner(mut self) -> AvroResult<W> {
self.maybe_write_header()?;
self.flush()?;
- Ok(self.writer)
+
+ let mut this = ManuallyDrop::new(self);
+
+ // Move these fields out of `this` so their destructors still run when the `_`-prefixed locals go out of scope; `this` is never dropped, so anything left inside it is never dropped either. NOTE(review): any non-Copy field added to `Writer` later must be taken here too.
+ let _resolved_schema = std::mem::take(&mut this.resolved_schema);
+ let _buffer = std::mem::take(&mut this.buffer);
+ let _user_metadata = std::mem::take(&mut this.user_metadata);
+
+ // SAFETY: `this` is wrapped in ManuallyDrop and never dropped, so neither `Writer::drop` nor any field destructor runs on it; the bitwise copy of `writer` read here is therefore its only owner and cannot be dropped twice.
+ let writer = unsafe { std::ptr::read(&this.writer) };
+
+ Ok(writer)
}
/// Gets a reference to the underlying writer.
@@ -459,6 +476,14 @@ impl<'a, W: Write> Writer<'a, W> {
}
}
+impl<'a, W: Write> Drop for Writer<'a, W> {
+ /// Best-effort cleanup: calls `maybe_write_header` and `flush`, discarding any errors because `drop` cannot report them; call `flush` explicitly beforehand to observe failures.
+ fn drop(&mut self) {
+ let _ = self.maybe_write_header();
+ let _ = self.flush();
+ }
+}
+
/// Encode a compatible value (implementing the `ToAvro` trait) into Avro
format, also performing
/// schema validation.
///
diff --git a/avro/tests/to_from_avro_datum_schemata.rs
b/avro/tests/to_from_avro_datum_schemata.rs
index 6f2d595..9986cbb 100644
--- a/avro/tests/to_from_avro_datum_schemata.rs
+++ b/avro/tests/to_from_avro_datum_schemata.rs
@@ -110,9 +110,10 @@ fn test_avro_3683_multiple_schemata_writer_reader() ->
TestResult {
let mut writer = Writer::with_schemata(schema_b, schemata.clone(), &mut
output, Codec::Null);
writer.append(record.clone())?;
writer.flush()?;
+ drop(writer); //drop the writer so that `output` is no more referenced
mutably
let reader = Reader::with_schemata(schema_b, schemata, output.as_slice())?;
- let value = reader.into_iter().next().unwrap().unwrap();
+ let value = reader.into_iter().next().unwrap()?;
assert_eq!(value, record);
Ok(())
diff --git a/avro/tests/union_schema.rs b/avro/tests/union_schema.rs
index 106b062..db0f356 100644
--- a/avro/tests/union_schema.rs
+++ b/avro/tests/union_schema.rs
@@ -74,6 +74,7 @@ where
Writer::with_schemata(schema, schemata.iter().collect(), &mut encoded,
Codec::Null);
writer.append_ser(input)?;
writer.flush()?;
+ drop(writer); //drop the writer so that `encoded` is no more referenced
mutably
let mut reader = Reader::with_schemata(schema, schemata.iter().collect(),
encoded.as_slice())?;
from_value::<T>(&reader.next().expect("")?)