This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f951c8f6e3 Fix AsyncArrowWriter flush for large buffer sizes (#4526) (#4527)
f951c8f6e3 is described below
commit f951c8f6e39aa36e2be43532b895a0b75b231b7a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Jul 14 13:00:55 2023 -0400
Fix AsyncArrowWriter flush for large buffer sizes (#4526) (#4527)
---
parquet/src/arrow/async_writer/mod.rs | 32 ++++++++++++++++++++++++++++++--
1 file changed, 30 insertions(+), 2 deletions(-)
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 4d8cf1b906..0957b58697 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -158,7 +158,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
}
async_writer
- .write(buffer.as_slice())
+ .write_all(buffer.as_slice())
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
@@ -207,7 +207,7 @@ impl Write for SharedBuffer {
#[cfg(test)]
mod tests {
- use arrow_array::{ArrayRef, Int64Array, RecordBatchReader};
+ use arrow_array::{ArrayRef, BinaryArray, Int64Array, RecordBatchReader};
use bytes::Bytes;
use tokio::pin;
@@ -374,4 +374,32 @@ mod tests {
async_writer.close().await.unwrap();
}
}
+
+ #[tokio::test]
+ async fn test_async_writer_file() {
+ let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+ let col2 = Arc::new(BinaryArray::from_iter_values(vec![
+ vec![0; 500000],
+ vec![0; 500000],
+ vec![0; 500000],
+ ])) as ArrayRef;
+ let to_write =
+ RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();
+
+ let temp = tempfile::tempfile().unwrap();
+
+ let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
+ let mut writer =
+ AsyncArrowWriter::try_new(file, to_write.schema(), 0, None).unwrap();
+ writer.write(&to_write).await.unwrap();
+ writer.close().await.unwrap();
+
+ let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
+ .unwrap()
+ .build()
+ .unwrap();
+ let read = reader.next().unwrap().unwrap();
+
+ assert_eq!(to_write, read);
+ }
}