This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fa2fbfdc8 added a flush method to IPC writers (#6108)
fa2fbfdc8 is described below
commit fa2fbfdc8bcd043966cbfb1ddda55a7e76e8082c
Author: V0ldek <[email protected]>
AuthorDate: Thu Jul 25 19:06:30 2024 +0200
added a flush method to IPC writers (#6108)
While the writers expose `get_ref` and `get_mut` to access the underlying
`io::Write` writer, there is an internal layer of a `BufWriter` that is not
accessible.
Because of that, there is no way to ensure that all messages written so far to the
`StreamWriter` or `FileWriter` have actually been passed to the underlying writer.
Here we expose a `flush` method that flushes the internal buffer and the
underlying writer.
See #6099 for the discussion.
---
arrow-ipc/src/writer.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 63 insertions(+)
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index d0a78ca27..5a8adb31b 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -982,6 +982,14 @@ impl<W: Write> FileWriter<W> {
self.writer.get_mut()
}
+ /// Flush the underlying writer.
+ ///
+ /// Both the internal `BufWriter` and the underlying writer are flushed, so every
+ /// message written so far has been passed on to the underlying writer. Flushing
+ /// writes no IPC data of its own; it only pushes out bytes already written.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `ArrowError` if flushing fails with an I/O error.
+ pub fn flush(&mut self) -> Result<(), ArrowError> {
+ self.writer.flush()?;
+ Ok(())
+ }
+
/// Unwraps the BufWriter housed in FileWriter.writer, returning the
underlying
/// writer
///
@@ -1097,6 +1105,14 @@ impl<W: Write> StreamWriter<W> {
self.writer.get_mut()
}
+ /// Flush the underlying writer.
+ ///
+ /// Both the internal `BufWriter` and the underlying writer are flushed, so every
+ /// message written so far has been passed on to the underlying writer. Flushing
+ /// writes no IPC data of its own; it only pushes out bytes already written.
+ ///
+ /// # Errors
+ ///
+ /// Returns an `ArrowError` if flushing fails with an I/O error.
+ pub fn flush(&mut self) -> Result<(), ArrowError> {
+ self.writer.flush()?;
+ Ok(())
+ }
+
/// Unwraps the BufWriter housed in StreamWriter.writer, returning the
underlying
/// writer
///
@@ -2615,4 +2631,51 @@ mod tests {
offset from expected alignment of 16 by 8"
);
}
+
+ #[test]
+ fn test_flush() {
+ // We write a schema which is small enough to fit into a buffer and not get flushed,
+ // and then force the write with .flush().
+ let num_cols = 2;
+ let mut fields = Vec::new();
+ let options = IpcWriteOptions::try_new(8, false,
MetadataVersion::V5).unwrap();
+ for i in 0..num_cols {
+ let field = Field::new(&format!("col_{}", i),
DataType::Decimal128(38, 10), true);
+ fields.push(field);
+ }
+ let schema = Schema::new(fields);
+ // Each sink gets its own extra `BufWriter`: the byte counts below are read from the
+ // innermost `Vec`, so they only grow if `flush` reaches through both buffer layers.
+ let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
+ let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
+ let mut stream_writer =
+ StreamWriter::try_new_with_options(inner_stream_writer, &schema,
options.clone())
+ .unwrap();
+ let mut file_writer =
+ FileWriter::try_new_with_options(inner_file_writer, &schema,
options).unwrap();
+
+ let stream_bytes_written_on_new =
stream_writer.get_ref().get_ref().len();
+ let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
+ stream_writer.flush().unwrap();
+ file_writer.flush().unwrap();
+ let stream_bytes_written_on_flush =
stream_writer.get_ref().get_ref().len();
+ let file_bytes_written_on_flush =
file_writer.get_ref().get_ref().len();
+ let stream_out =
stream_writer.into_inner().unwrap().into_inner().unwrap();
+ // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
+ // and then a length of 0 (4 bytes) for a total of 8 bytes.
+ // Everything before that should have been flushed in the .flush() call.
+ let expected_stream_flushed_bytes = stream_out.len() - 8;
+ // A file write is the same as the stream write except for the leading magic string
+ // ARROW1 plus padding, which is 8 bytes.
+ let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
+
+ assert!(
+ stream_bytes_written_on_new < stream_bytes_written_on_flush,
+ "this test makes no sense if flush is not actually required"
+ );
+ assert!(
+ file_bytes_written_on_new < file_bytes_written_on_flush,
+ "this test makes no sense if flush is not actually required"
+ );
+ assert_eq!(stream_bytes_written_on_flush,
expected_stream_flushed_bytes);
+ assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
+ }
}