This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fa2fbfdc8 added a flush method to IPC writers (#6108)
fa2fbfdc8 is described below

commit fa2fbfdc8bcd043966cbfb1ddda55a7e76e8082c
Author: V0ldek <[email protected]>
AuthorDate: Thu Jul 25 19:06:30 2024 +0200

    added a flush method to IPC writers (#6108)
    
    While the writers expose `get_ref` and `get_mut` to access the underlying
    `io::Write` writer, there is an internal layer of a `BufWriter` that is not accessible.
    Because of that, there is no way to ensure that all messages written thus far to the
    `StreamWriter` or `FileWriter` have actually been passed to the underlying writer.
    
    Here we expose a `flush` method that flushes the internal buffer and the underlying writer.
    
    See #6099 for the discussion.
---
 arrow-ipc/src/writer.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index d0a78ca27..5a8adb31b 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -982,6 +982,14 @@ impl<W: Write> FileWriter<W> {
         self.writer.get_mut()
     }
 
+    /// Flush the underlying writer.
+    ///
+    /// Both the BufWriter and the underlying writer are flushed.
+    pub fn flush(&mut self) -> Result<(), ArrowError> {
+        self.writer.flush()?;
+        Ok(())
+    }
+
     /// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
     /// writer
     ///
@@ -1097,6 +1105,14 @@ impl<W: Write> StreamWriter<W> {
         self.writer.get_mut()
     }
 
+    /// Flush the underlying writer.
+    ///
+    /// Both the BufWriter and the underlying writer are flushed.
+    pub fn flush(&mut self) -> Result<(), ArrowError> {
+        self.writer.flush()?;
+        Ok(())
+    }
+
     /// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
     /// writer
     ///
@@ -2615,4 +2631,51 @@ mod tests {
              offset from expected alignment of 16 by 8"
         );
     }
+
+    #[test]
+    fn test_flush() {
+        // We write a schema which is small enough to fit into a buffer and not get flushed,
+        // and then force the write with .flush().
+        let num_cols = 2;
+        let mut fields = Vec::new();
+        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
+        for i in 0..num_cols {
+            let field = Field::new(&format!("col_{}", i), DataType::Decimal128(38, 10), true);
+            fields.push(field);
+        }
+        let schema = Schema::new(fields);
+        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
+        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
+        let mut stream_writer =
+            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
+                .unwrap();
+        let mut file_writer =
+            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
+
+        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
+        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
+        stream_writer.flush().unwrap();
+        file_writer.flush().unwrap();
+        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
+        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
+        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
+        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
+        // and then a length of 0 (4 bytes) for a total of 8 bytes.
+        // Everything before that should have been flushed in the .flush() call.
+        let expected_stream_flushed_bytes = stream_out.len() - 8;
+        // A file write is the same as the stream write except for the leading magic string
+        // ARROW1 plus padding, which is 8 bytes.
+        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
+
+        assert!(
+            stream_bytes_written_on_new < stream_bytes_written_on_flush,
+            "this test makes no sense if flush is not actually required"
+        );
+        assert!(
+            file_bytes_written_on_new < file_bytes_written_on_flush,
+            "this test makes no sense if flush is not actually required"
+        );
+        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
+        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
+    }
 }

Reply via email to