This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 8724b7691 Add batch coalescing in BufBatchWriter to reduce IPC schema overhead (#3441)
8724b7691 is described below

commit 8724b769164ac59c3aa908093eed5dc5ca10a7b8
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 9 17:14:10 2026 -0700

    Add batch coalescing in BufBatchWriter to reduce IPC schema overhead (#3441)
---
 .../shuffle/partitioners/multi_partition.rs        |  13 ++-
 .../shuffle/partitioners/single_partition.rs       |  11 ++-
 .../core/src/execution/shuffle/shuffle_writer.rs   | 108 +++++++++++++++++++++
 .../execution/shuffle/writers/buf_batch_writer.rs  |  64 +++++++++++-
 .../execution/shuffle/writers/partition_writer.rs  |   4 +-
 5 files changed, 191 insertions(+), 9 deletions(-)

diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
index 35f754695..9c366ad46 100644
--- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs
+++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
@@ -442,14 +442,19 @@ impl MultiPartitionShuffleRepartitioner {
         encode_time: &Time,
         write_time: &Time,
         write_buffer_size: usize,
+        batch_size: usize,
     ) -> datafusion::common::Result<()> {
-        let mut buf_batch_writer =
-            BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size);
+        let mut buf_batch_writer = BufBatchWriter::new(
+            shuffle_block_writer,
+            output_data,
+            write_buffer_size,
+            batch_size,
+        );
         for batch in partition_iter {
             let batch = batch?;
             buf_batch_writer.write(&batch, encode_time, write_time)?;
         }
-        buf_batch_writer.flush(write_time)?;
+        buf_batch_writer.flush(encode_time, write_time)?;
         Ok(())
     }
 
@@ -508,6 +513,7 @@ impl MultiPartitionShuffleRepartitioner {
                     &self.runtime,
                     &self.metrics,
                     self.write_buffer_size,
+                    self.batch_size,
                 )?;
             }
 
@@ -592,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
                     &self.metrics.encode_time,
                     &self.metrics.write_time,
                     self.write_buffer_size,
+                    self.batch_size,
                 )?;
             }
 
diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs b/native/core/src/execution/shuffle/partitioners/single_partition.rs
index 4ee5bd2bf..eeca4458c 100644
--- a/native/core/src/execution/shuffle/partitioners/single_partition.rs
+++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs
@@ -59,8 +59,12 @@ impl SinglePartitionShufflePartitioner {
             .truncate(true)
             .open(output_data_path)?;
 
-        let output_data_writer =
-            BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
+        let output_data_writer = BufBatchWriter::new(
+            shuffle_block_writer,
+            output_data_file,
+            write_buffer_size,
+            batch_size,
+        );
 
         Ok(Self {
             output_data_writer,
@@ -162,7 +166,8 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
                 &self.metrics.write_time,
             )?;
         }
-        self.output_data_writer.flush(&self.metrics.write_time)?;
+        self.output_data_writer
+            .flush(&self.metrics.encode_time, &self.metrics.write_time)?;
 
        // Write index file. It should only contain 2 entries: 0 and the total number of bytes written
         let index_file = OpenOptions::new()
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index a7e689a69..fe1bf0fcc 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -585,4 +585,112 @@ mod test {
         let _ = fs::remove_file("/tmp/rr_data_1.out");
         let _ = fs::remove_file("/tmp/rr_index_1.out");
     }
+
+    /// Test that batch coalescing in BufBatchWriter reduces output size by
+    /// writing fewer, larger IPC blocks instead of many small ones.
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn test_batch_coalescing_reduces_size() {
+        use crate::execution::shuffle::writers::BufBatchWriter;
+        use arrow::array::Int32Array;
+
+        // Create a wide schema to amplify per-block schema overhead
+        let fields: Vec<Field> = (0..20)
+            .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
+            .collect();
+        let schema = Arc::new(Schema::new(fields));
+
+        // Create many small batches (50 rows each)
+        let small_batches: Vec<RecordBatch> = (0..100)
+            .map(|batch_idx| {
+                let columns: Vec<Arc<dyn Array>> = (0..20)
+                    .map(|col_idx| {
+                        let values: Vec<i32> = (0..50)
+                            .map(|row| batch_idx * 50 + row + col_idx * 1000)
+                            .collect();
+                        Arc::new(Int32Array::from(values)) as Arc<dyn Array>
+                    })
+                    .collect();
+                RecordBatch::try_new(Arc::clone(&schema), columns).unwrap()
+            })
+            .collect();
+
+        let codec = CompressionCodec::Lz4Frame;
+        let encode_time = Time::default();
+        let write_time = Time::default();
+
+        // Write with coalescing (batch_size=8192)
+        let mut coalesced_output = Vec::new();
+        {
+            let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone()).unwrap();
+            let mut buf_writer = BufBatchWriter::new(
+                &mut writer,
+                Cursor::new(&mut coalesced_output),
+                1024 * 1024,
+                8192,
+            );
+            for batch in &small_batches {
+                buf_writer.write(batch, &encode_time, &write_time).unwrap();
+            }
+            buf_writer.flush(&encode_time, &write_time).unwrap();
+        }
+
+        // Write without coalescing (batch_size=1)
+        let mut uncoalesced_output = Vec::new();
+        {
+            let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone()).unwrap();
+            let mut buf_writer = BufBatchWriter::new(
+                &mut writer,
+                Cursor::new(&mut uncoalesced_output),
+                1024 * 1024,
+                1,
+            );
+            for batch in &small_batches {
+                buf_writer.write(batch, &encode_time, &write_time).unwrap();
+            }
+            buf_writer.flush(&encode_time, &write_time).unwrap();
+        }
+
+        // Coalesced output should be smaller due to fewer IPC schema blocks
+        assert!(
+            coalesced_output.len() < uncoalesced_output.len(),
+            "Coalesced output ({} bytes) should be smaller than uncoalesced ({} bytes)",
+            coalesced_output.len(),
+            uncoalesced_output.len()
+        );
+
+        // Verify both roundtrip correctly by reading all IPC blocks
+        let coalesced_rows = read_all_ipc_blocks(&coalesced_output);
+        let uncoalesced_rows = read_all_ipc_blocks(&uncoalesced_output);
+        assert_eq!(
+            coalesced_rows, 5000,
+            "Coalesced should contain all 5000 rows"
+        );
+        assert_eq!(
+            uncoalesced_rows, 5000,
+            "Uncoalesced should contain all 5000 rows"
+        );
+    }
+
+    /// Read all IPC blocks from a byte buffer written by BufBatchWriter/ShuffleBlockWriter,
+    /// returning the total number of rows.
+    fn read_all_ipc_blocks(data: &[u8]) -> usize {
+        let mut offset = 0;
+        let mut total_rows = 0;
+        while offset < data.len() {
+            // First 8 bytes are the IPC length (little-endian u64)
+            let ipc_length =
+                u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
+            // Skip the 8-byte length prefix; the next 8 bytes are field_count + codec header
+            let block_start = offset + 8;
+            let block_end = block_start + ipc_length;
+            // read_ipc_compressed expects data starting after the 16-byte header
+            // (i.e., after length + field_count), at the codec tag
+            let ipc_data = &data[block_start + 8..block_end];
+            let batch = read_ipc_compressed(ipc_data).unwrap();
+            total_rows += batch.num_rows();
+            offset = block_end;
+        }
+        total_rows
+    }
 }
diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
index 8428151dd..8d056d7bb 100644
--- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
+++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
@@ -17,6 +17,7 @@
 
 use crate::execution::shuffle::ShuffleBlockWriter;
 use arrow::array::RecordBatch;
+use arrow::compute::kernels::coalesce::BatchCoalescer;
 use datafusion::physical_plan::metrics::Time;
 use std::borrow::Borrow;
 use std::io::{Cursor, Seek, SeekFrom, Write};
@@ -24,20 +25,36 @@ use std::io::{Cursor, Seek, SeekFrom, Write};
 /// Write batches to writer while using a buffer to avoid frequent system calls.
 /// The record batches were first written by ShuffleBlockWriter into an internal buffer.
 /// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
+///
+/// Small batches are coalesced using Arrow's [`BatchCoalescer`] before serialization,
+/// producing exactly `batch_size`-row output batches to reduce per-block IPC schema overhead.
+/// The coalescer is lazily initialized on the first write.
 pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
     shuffle_block_writer: S,
     writer: W,
     buffer: Vec<u8>,
     buffer_max_size: usize,
+    /// Coalesces small batches into target_batch_size before serialization.
+    /// Lazily initialized on first write to capture the schema.
+    coalescer: Option<BatchCoalescer>,
+    /// Target batch size for coalescing
+    batch_size: usize,
 }
 
 impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
-    pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
+    pub(crate) fn new(
+        shuffle_block_writer: S,
+        writer: W,
+        buffer_max_size: usize,
+        batch_size: usize,
+    ) -> Self {
         Self {
             shuffle_block_writer,
             writer,
             buffer: vec![],
             buffer_max_size,
+            coalescer: None,
+            batch_size,
         }
     }
 
@@ -46,6 +63,32 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
         batch: &RecordBatch,
         encode_time: &Time,
         write_time: &Time,
+    ) -> datafusion::common::Result<usize> {
+        let coalescer = self
+            .coalescer
+            .get_or_insert_with(|| BatchCoalescer::new(batch.schema(), self.batch_size));
+        coalescer.push_batch(batch.clone())?;
+
+        // Drain completed batches into a local vec so the coalescer borrow ends
+        // before we call write_batch_to_buffer (which borrows &mut self).
+        let mut completed = Vec::new();
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed.push(batch);
+        }
+
+        let mut bytes_written = 0;
+        for batch in &completed {
+            bytes_written += self.write_batch_to_buffer(batch, encode_time, write_time)?;
+        }
+        Ok(bytes_written)
+    }
+
+    /// Serialize a single batch into the byte buffer, flushing to the writer if needed.
+    fn write_batch_to_buffer(
+        &mut self,
+        batch: &RecordBatch,
+        encode_time: &Time,
+        write_time: &Time,
     ) -> datafusion::common::Result<usize> {
         let mut cursor = Cursor::new(&mut self.buffer);
         cursor.seek(SeekFrom::End(0))?;
@@ -63,7 +106,24 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
         Ok(bytes_written)
     }
 
-    pub(crate) fn flush(&mut self, write_time: &Time) -> datafusion::common::Result<()> {
+    pub(crate) fn flush(
+        &mut self,
+        encode_time: &Time,
+        write_time: &Time,
+    ) -> datafusion::common::Result<()> {
+        // Finish any remaining buffered rows in the coalescer
+        let mut remaining = Vec::new();
+        if let Some(coalescer) = &mut self.coalescer {
+            coalescer.finish_buffered_batch()?;
+            while let Some(batch) = coalescer.next_completed_batch() {
+                remaining.push(batch);
+            }
+        }
+        for batch in &remaining {
+            self.write_batch_to_buffer(batch, encode_time, write_time)?;
+        }
+
+        // Flush the byte buffer to the underlying writer
         let mut write_timer = write_time.timer();
         if !self.buffer.is_empty() {
             self.writer.write_all(&self.buffer)?;
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs
index 8b5163d44..7c2dbe044 100644
--- a/native/core/src/execution/shuffle/writers/partition_writer.rs
+++ b/native/core/src/execution/shuffle/writers/partition_writer.rs
@@ -79,6 +79,7 @@ impl PartitionWriter {
         runtime: &RuntimeEnv,
         metrics: &ShufflePartitionerMetrics,
         write_buffer_size: usize,
+        batch_size: usize,
     ) -> datafusion::common::Result<usize> {
         if let Some(batch) = iter.next() {
             self.ensure_spill_file_created(runtime)?;
@@ -88,6 +89,7 @@ impl PartitionWriter {
                     &mut self.shuffle_block_writer,
                     &mut self.spill_file.as_mut().unwrap().file,
                     write_buffer_size,
+                    batch_size,
                 );
                 let mut bytes_written =
                    buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
@@ -99,7 +101,7 @@ impl PartitionWriter {
                         &metrics.write_time,
                     )?;
                 }
-                buf_batch_writer.flush(&metrics.write_time)?;
+                buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
                 bytes_written
             };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to