This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0868a8b AVRO-4063: Call `flush` on the inner `writer` during `Writer::flush` (#14)
0868a8b is described below
commit 0868a8b2d6aa90070bcb6a3388555746d3e44546
Author: Jane Lewis <[email protected]>
AuthorDate: Wed Oct 2 02:35:26 2024 -0700
AVRO-4063: Call `flush` on the inner `writer` during `Writer::flush` (#14)
---
avro/src/error.rs | 3 +++
avro/src/writer.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 61 insertions(+)
diff --git a/avro/src/error.rs b/avro/src/error.rs
index d92daa4..09d458c 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -446,6 +446,9 @@ pub enum Error {
#[error("Failed to write buffer bytes during flush: {0}")]
WriteBytes(#[source] std::io::Error),
+ #[error("Failed to flush inner writer during flush: {0}")]
+ FlushWriter(#[source] std::io::Error),
+
#[error("Failed to write marker: {0}")]
WriteMarker(#[source] std::io::Error),
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 5010bff..f1fdae4 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -315,6 +315,8 @@ impl<'a, W: Write> Writer<'a, W> {
self.buffer.clear();
self.num_values = 0;
+ self.writer.flush().map_err(Error::FlushWriter)?;
+
Ok(num_bytes)
}
@@ -657,6 +659,8 @@ fn generate_sync_marker() -> [u8; 16] {
#[cfg(test)]
mod tests {
+ use std::{cell::RefCell, rc::Rc};
+
use super::*;
use crate::{
decimal::Decimal,
@@ -1444,4 +1448,58 @@ mod tests {
}
Ok(())
}
+
+ #[test]
+ fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
+ const SCHEMA: &str = r#"
+ {
+ "type": "record",
+ "name": "ExampleSchema",
+ "fields": [
+ {"name": "exampleField", "type": "string"}
+ ]
+ }
+ "#;
+
+ #[derive(Clone, Default)]
+ struct TestBuffer(Rc<RefCell<Vec<u8>>>);
+
+ impl TestBuffer {
+ fn len(&self) -> usize {
+ self.0.borrow().len()
+ }
+ }
+
+ impl Write for TestBuffer {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ self.0.borrow_mut().write(buf)
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ Ok(())
+ }
+ }
+
+ let shared_buffer = TestBuffer::default();
+
+ let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
+
+ let schema = Schema::parse_str(SCHEMA)?;
+
+ let mut writer = Writer::new(&schema, buffered_writer);
+
+ let mut record = Record::new(writer.schema()).unwrap();
+ record.put("exampleField", "value");
+
+ writer.append(record)?;
+ writer.flush()?;
+
+ assert_eq!(
+ shared_buffer.len(),
+ 167,
+ "the test buffer was not fully written to after Writer::flush was called"
+ );
+
+ Ok(())
+ }
}