This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 7c622272e perf: optimize shuffle writer with buffered I/O and fix file
size bug (#1386)
7c622272e is described below
commit 7c622272ef660f2135c8a6323373ed9b91001a11
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jan 17 14:43:10 2026 -0700
perf: optimize shuffle writer with buffered I/O and fix file size bug
(#1386)
1. Add BufWriter for buffered file I/O in shuffle writer
Wrapping File with BufWriter reduces syscalls when writing
multiple small batches to shuffle files. This affects both
the hash-partitioned shuffle path and the non-partitioned
write_stream_to_disk utility.
2. Fix file size being read before the writer is finished
Previously, fs::metadata() was called before writer.finish(),
which could report incorrect file sizes since data may not
have been fully flushed to disk. This is especially important
now that BufWriter is used, as buffered data would not be
reflected in the file size until after finish() flushes it.
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
ballista/core/src/execution_plans/shuffle_writer.rs | 8 +++++---
ballista/core/src/utils.rs | 5 +++--
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index 114fc6643..e5193c2e1 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -29,6 +29,7 @@ use std::fmt::Debug;
use std::fs;
use std::fs::File;
use std::future::Future;
+use std::io::BufWriter;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::Arc;
@@ -105,7 +106,7 @@ impl std::fmt::Display for ShuffleWriterExec {
pub struct WriteTracker {
pub num_batches: usize,
pub num_rows: usize,
- pub writer: StreamWriter<File>,
+ pub writer: StreamWriter<BufWriter<File>>,
pub path: PathBuf,
}
@@ -295,7 +296,8 @@ impl ShuffleWriterExec {
CompressionType::LZ4_FRAME,
))?;
- let file = File::create(path.clone())?;
+ let file =
+ BufWriter::new(File::create(path.clone())?);
let mut writer =
StreamWriter::try_new_with_options(
file,
@@ -323,8 +325,8 @@ impl ShuffleWriterExec {
for (i, w) in writers.iter_mut().enumerate() {
if let Some(w) = w {
- let num_bytes = fs::metadata(&w.path)?.len();
w.writer.finish()?;
+ let num_bytes = fs::metadata(&w.path)?.len();
debug!(
"Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.",
i, w.path, w.num_batches, w.num_rows, num_bytes
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 9f6ab502c..75698233a 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -31,6 +31,7 @@ use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream, metrics};
use futures::StreamExt;
use log::error;
+use std::io::BufWriter;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs::File, pin::Pin};
@@ -148,10 +149,10 @@ pub async fn write_stream_to_disk(
path: &str,
disk_write_metric: &metrics::Time,
) -> Result<PartitionStats> {
- let file = File::create(path).map_err(|e| {
+ let file = BufWriter::new(File::create(path).map_err(|e| {
error!("Failed to create partition file at {path}: {e:?}");
BallistaError::IoError(e)
- })?;
+ })?);
let mut num_rows = 0;
let mut num_batches = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]