This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c622272e perf: optimize shuffle writer with buffered I/O and fix file size bug (#1386)
7c622272e is described below

commit 7c622272ef660f2135c8a6323373ed9b91001a11
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jan 17 14:43:10 2026 -0700

    perf: optimize shuffle writer with buffered I/O and fix file size bug (#1386)
    
    1. Add BufWriter for buffered file I/O in shuffle writer
    
       Wrapping File with BufWriter reduces syscalls when writing
       multiple small batches to shuffle files. This affects both
       the hash-partitioned shuffle path and the non-partitioned
       write_stream_to_disk utility.
    
    2. Fix file size read before writer finish
    
       Previously, fs::metadata() was called before writer.finish(),
       which could report incorrect file sizes since data may not
       have been fully flushed to disk. This is especially important
       now that BufWriter is used, as buffered data would not be
       reflected in the file size until after finish() flushes it.
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 ballista/core/src/execution_plans/shuffle_writer.rs | 8 +++++---
 ballista/core/src/utils.rs                          | 5 +++--
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index 114fc6643..e5193c2e1 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -29,6 +29,7 @@ use std::fmt::Debug;
 use std::fs;
 use std::fs::File;
 use std::future::Future;
+use std::io::BufWriter;
 use std::iter::Iterator;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -105,7 +106,7 @@ impl std::fmt::Display for ShuffleWriterExec {
 pub struct WriteTracker {
     pub num_batches: usize,
     pub num_rows: usize,
-    pub writer: StreamWriter<File>,
+    pub writer: StreamWriter<BufWriter<File>>,
     pub path: PathBuf,
 }
 
@@ -295,7 +296,8 @@ impl ShuffleWriterExec {
                                                 CompressionType::LZ4_FRAME,
                                             ))?;
 
-                                        let file = File::create(path.clone())?;
+                                        let file =
+                                            BufWriter::new(File::create(path.clone())?);
                                         let mut writer =
                                             StreamWriter::try_new_with_options(
                                                 file,
@@ -323,8 +325,8 @@ impl ShuffleWriterExec {
 
                     for (i, w) in writers.iter_mut().enumerate() {
                         if let Some(w) = w {
-                            let num_bytes = fs::metadata(&w.path)?.len();
                             w.writer.finish()?;
+                            let num_bytes = fs::metadata(&w.path)?.len();
                             debug!(
                                 "Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.",
                                 i, w.path, w.num_batches, w.num_rows, num_bytes
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 9f6ab502c..75698233a 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -31,6 +31,7 @@ use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream, metrics};
 use futures::StreamExt;
 use log::error;
+use std::io::BufWriter;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{fs::File, pin::Pin};
@@ -148,10 +149,10 @@ pub async fn write_stream_to_disk(
     path: &str,
     disk_write_metric: &metrics::Time,
 ) -> Result<PartitionStats> {
-    let file = File::create(path).map_err(|e| {
+    let file = BufWriter::new(File::create(path).map_err(|e| {
         error!("Failed to create partition file at {path}: {e:?}");
         BallistaError::IoError(e)
-    })?;
+    })?);
 
     let mut num_rows = 0;
     let mut num_batches = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to