This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f951c8f6e3 Fix AsyncArrowWriter flush for large buffer sizes (#4526) (#4527)
f951c8f6e3 is described below

commit f951c8f6e39aa36e2be43532b895a0b75b231b7a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Jul 14 13:00:55 2023 -0400

    Fix AsyncArrowWriter flush for large buffer sizes (#4526) (#4527)
---
 parquet/src/arrow/async_writer/mod.rs | 32 ++++++++++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 4d8cf1b906..0957b58697 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -158,7 +158,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         }
 
         async_writer
-            .write(buffer.as_slice())
+            .write_all(buffer.as_slice())
             .await
             .map_err(|e| ParquetError::External(Box::new(e)))?;
 
@@ -207,7 +207,7 @@ impl Write for SharedBuffer {
 
 #[cfg(test)]
 mod tests {
-    use arrow_array::{ArrayRef, Int64Array, RecordBatchReader};
+    use arrow_array::{ArrayRef, BinaryArray, Int64Array, RecordBatchReader};
     use bytes::Bytes;
     use tokio::pin;
 
@@ -374,4 +374,32 @@ mod tests {
             async_writer.close().await.unwrap();
         }
     }
+
+    #[tokio::test]
+    async fn test_async_writer_file() {
+        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+        let col2 = Arc::new(BinaryArray::from_iter_values(vec![
+            vec![0; 500000],
+            vec![0; 500000],
+            vec![0; 500000],
+        ])) as ArrayRef;
+        let to_write =
+            RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();
+
+        let temp = tempfile::tempfile().unwrap();
+
+        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
+        let mut writer =
+            AsyncArrowWriter::try_new(file, to_write.schema(), 0, None).unwrap();
+        writer.write(&to_write).await.unwrap();
+        writer.close().await.unwrap();
+
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
+            .unwrap()
+            .build()
+            .unwrap();
+        let read = reader.next().unwrap().unwrap();
+
+        assert_eq!(to_write, read);
+    }
 }

Reply via email to