This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 8724b7691 Add batch coalescing in BufBatchWriter to reduce IPC schema
overhead (#3441)
8724b7691 is described below
commit 8724b769164ac59c3aa908093eed5dc5ca10a7b8
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 9 17:14:10 2026 -0700
Add batch coalescing in BufBatchWriter to reduce IPC schema overhead (#3441)
---
.../shuffle/partitioners/multi_partition.rs | 13 ++-
.../shuffle/partitioners/single_partition.rs | 11 ++-
.../core/src/execution/shuffle/shuffle_writer.rs | 108 +++++++++++++++++++++
.../execution/shuffle/writers/buf_batch_writer.rs | 64 +++++++++++-
.../execution/shuffle/writers/partition_writer.rs | 4 +-
5 files changed, 191 insertions(+), 9 deletions(-)
diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs
b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
index 35f754695..9c366ad46 100644
--- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs
+++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs
@@ -442,14 +442,19 @@ impl MultiPartitionShuffleRepartitioner {
encode_time: &Time,
write_time: &Time,
write_buffer_size: usize,
+ batch_size: usize,
) -> datafusion::common::Result<()> {
- let mut buf_batch_writer =
- BufBatchWriter::new(shuffle_block_writer, output_data,
write_buffer_size);
+ let mut buf_batch_writer = BufBatchWriter::new(
+ shuffle_block_writer,
+ output_data,
+ write_buffer_size,
+ batch_size,
+ );
for batch in partition_iter {
let batch = batch?;
buf_batch_writer.write(&batch, encode_time, write_time)?;
}
- buf_batch_writer.flush(write_time)?;
+ buf_batch_writer.flush(encode_time, write_time)?;
Ok(())
}
@@ -508,6 +513,7 @@ impl MultiPartitionShuffleRepartitioner {
&self.runtime,
&self.metrics,
self.write_buffer_size,
+ self.batch_size,
)?;
}
@@ -592,6 +598,7 @@ impl ShufflePartitioner for
MultiPartitionShuffleRepartitioner {
&self.metrics.encode_time,
&self.metrics.write_time,
self.write_buffer_size,
+ self.batch_size,
)?;
}
diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs
b/native/core/src/execution/shuffle/partitioners/single_partition.rs
index 4ee5bd2bf..eeca4458c 100644
--- a/native/core/src/execution/shuffle/partitioners/single_partition.rs
+++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs
@@ -59,8 +59,12 @@ impl SinglePartitionShufflePartitioner {
.truncate(true)
.open(output_data_path)?;
- let output_data_writer =
- BufBatchWriter::new(shuffle_block_writer, output_data_file,
write_buffer_size);
+ let output_data_writer = BufBatchWriter::new(
+ shuffle_block_writer,
+ output_data_file,
+ write_buffer_size,
+ batch_size,
+ );
Ok(Self {
output_data_writer,
@@ -162,7 +166,8 @@ impl ShufflePartitioner for
SinglePartitionShufflePartitioner {
&self.metrics.write_time,
)?;
}
- self.output_data_writer.flush(&self.metrics.write_time)?;
+ self.output_data_writer
+ .flush(&self.metrics.encode_time, &self.metrics.write_time)?;
// Write index file. It should only contain 2 entries: 0 and the total
number of bytes written
let index_file = OpenOptions::new()
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs
b/native/core/src/execution/shuffle/shuffle_writer.rs
index a7e689a69..fe1bf0fcc 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -585,4 +585,112 @@ mod test {
let _ = fs::remove_file("/tmp/rr_data_1.out");
let _ = fs::remove_file("/tmp/rr_index_1.out");
}
+
+ /// Test that batch coalescing in BufBatchWriter reduces output size by
+ /// writing fewer, larger IPC blocks instead of many small ones.
+ ///
+ /// The same 100 x 50-row batches are written twice: once with a large
+ /// `batch_size` (everything lands in a single block) and once with
+ /// `batch_size=1` (every row becomes its own block). Both outputs are then
+ /// decoded to confirm no rows were lost.
+ #[test]
+ #[cfg_attr(miri, ignore)]
+ fn test_batch_coalescing_reduces_size() {
+ use crate::execution::shuffle::writers::BufBatchWriter;
+ use arrow::array::Int32Array;
+
+ // Create a wide schema to amplify per-block schema overhead
+ let fields: Vec<Field> = (0..20)
+ .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
+ .collect();
+ let schema = Arc::new(Schema::new(fields));
+
+ // Create 100 small batches of 50 rows each (5000 rows in total)
+ let small_batches: Vec<RecordBatch> = (0..100)
+ .map(|batch_idx| {
+ let columns: Vec<Arc<dyn Array>> = (0..20)
+ .map(|col_idx| {
+ let values: Vec<i32> = (0..50)
+ .map(|row| batch_idx * 50 + row + col_idx * 1000)
+ .collect();
+ Arc::new(Int32Array::from(values)) as Arc<dyn Array>
+ })
+ .collect();
+ RecordBatch::try_new(Arc::clone(&schema), columns).unwrap()
+ })
+ .collect();
+
+ let codec = CompressionCodec::Lz4Frame;
+ let encode_time = Time::default();
+ let write_time = Time::default();
+
+ // Coalesce with batch_size=8192: all 5000 rows stay buffered until flush,
+ // which emits them as a single IPC block.
+ let mut coalesced_output = Vec::new();
+ {
+ let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(),
codec.clone()).unwrap();
+ let mut buf_writer = BufBatchWriter::new(
+ &mut writer,
+ Cursor::new(&mut coalesced_output),
+ 1024 * 1024,
+ 8192,
+ );
+ for batch in &small_batches {
+ buf_writer.write(batch, &encode_time, &write_time).unwrap();
+ }
+ buf_writer.flush(&encode_time, &write_time).unwrap();
+ }
+
+ // Baseline with batch_size=1. The coalescer still runs, but it emits
+ // single-row batches, so every row becomes its own IPC block (5000 blocks).
+ // This is the worst case for per-block overhead, not "no coalescing".
+ let mut uncoalesced_output = Vec::new();
+ {
+ let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(),
codec.clone()).unwrap();
+ let mut buf_writer = BufBatchWriter::new(
+ &mut writer,
+ Cursor::new(&mut uncoalesced_output),
+ 1024 * 1024,
+ 1,
+ );
+ for batch in &small_batches {
+ buf_writer.write(batch, &encode_time, &write_time).unwrap();
+ }
+ buf_writer.flush(&encode_time, &write_time).unwrap();
+ }
+
+ // Fewer blocks means the per-block IPC header/schema overhead is repeated
+ // less often, so the coalesced output must be smaller.
+ assert!(
+ coalesced_output.len() < uncoalesced_output.len(),
+ "Coalesced output ({} bytes) should be smaller than uncoalesced
({} bytes)",
+ coalesced_output.len(),
+ uncoalesced_output.len()
+ );
+
+ // Decode every IPC block of both outputs and check that all 5000 rows are
+ // present (only row counts are compared, not cell values).
+ let coalesced_rows = read_all_ipc_blocks(&coalesced_output);
+ let uncoalesced_rows = read_all_ipc_blocks(&uncoalesced_output);
+ assert_eq!(
+ coalesced_rows, 5000,
+ "Coalesced should contain all 5000 rows"
+ );
+ assert_eq!(
+ uncoalesced_rows, 5000,
+ "Uncoalesced should contain all 5000 rows"
+ );
+ }
+
+ /// Decode every block in `data` (as written by BufBatchWriter/ShuffleBlockWriter)
+ /// and return the total number of rows across all blocks.
+ ///
+ /// Each block is laid out as an 8-byte little-endian length, an 8-byte field
+ /// count, then the codec tag and compressed IPC payload. The length counts every
+ /// byte after the length prefix itself. Panics on truncated or malformed input,
+ /// which is acceptable for a test helper.
+ fn read_all_ipc_blocks(data: &[u8]) -> usize {
+ let mut offset = 0;
+ let mut total_rows = 0;
+ while offset < data.len() {
+ // The first 8 bytes of each block are its length (little-endian u64)
+ let ipc_length =
+ u64::from_le_bytes(data[offset..offset +
8].try_into().unwrap()) as usize;
+ // The length covers everything after the 8-byte prefix: the 8-byte
+ // field count, the codec tag, and the compressed payload.
+ let block_start = offset + 8;
+ let block_end = block_start + ipc_length;
+ // read_ipc_compressed expects data starting at the codec tag, i.e. after
+ // the 16-byte header (length prefix + field count).
+ let ipc_data = &data[block_start + 8..block_end];
+ let batch = read_ipc_compressed(ipc_data).unwrap();
+ total_rows += batch.num_rows();
+ offset = block_end;
+ }
+ total_rows
+ }
}
diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
index 8428151dd..8d056d7bb 100644
--- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
+++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs
@@ -17,6 +17,7 @@
use crate::execution::shuffle::ShuffleBlockWriter;
use arrow::array::RecordBatch;
+use arrow::compute::kernels::coalesce::BatchCoalescer;
use datafusion::physical_plan::metrics::Time;
use std::borrow::Borrow;
use std::io::{Cursor, Seek, SeekFrom, Write};
@@ -24,20 +25,36 @@ use std::io::{Cursor, Seek, SeekFrom, Write};
/// Write batches to writer while using a buffer to avoid frequent system
calls.
/// The record batches were first written by ShuffleBlockWriter into an
internal buffer.
/// Once the buffer exceeds the max size, the buffer will be flushed to the
writer.
+///
+/// Batches passed to `write` are first coalesced with Arrow's [`BatchCoalescer`]
+/// into `batch_size`-row batches before serialization. Many small inputs therefore
+/// become a few larger IPC blocks, and the per-block IPC schema overhead is paid
+/// less often. Rows that have not yet filled a batch stay in the coalescer until
+/// `flush`, which emits them as a final, possibly smaller, batch.
+/// The coalescer is lazily initialized on the first write, from that batch's schema.
+///
+/// NOTE(review): because rows can remain buffered in the coalescer, `write` may
+/// return 0 for a batch it accepted. The bytes serialized during `flush` are not
+/// returned to the caller. Callers that total `write`'s return values will
+/// undercount the bytes actually written; consider having `flush` return its
+/// byte count.
pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
shuffle_block_writer: S,
writer: W,
buffer: Vec<u8>,
buffer_max_size: usize,
+ /// Coalesces incoming batches into `batch_size`-row batches before serialization.
+ /// Lazily initialized on first write to capture the schema.
+ coalescer: Option<BatchCoalescer>,
+ /// Target number of rows per coalesced batch (passed to `BatchCoalescer::new`).
+ batch_size: usize,
}
impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
- pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size:
usize) -> Self {
+ /// Creates a writer that serializes batches with `shuffle_block_writer` into an
+ /// internal buffer, flushing that buffer to `writer` once it exceeds
+ /// `buffer_max_size`. Incoming batches are first coalesced into
+ /// `batch_size`-row batches. The coalescer is not built here but on the first
+ /// call to `write`.
+ pub(crate) fn new(
+ shuffle_block_writer: S,
+ writer: W,
+ buffer_max_size: usize,
+ batch_size: usize,
+ ) -> Self {
Self {
shuffle_block_writer,
writer,
buffer: vec![],
buffer_max_size,
+ // Created lazily in `write`, using the first batch's schema.
+ coalescer: None,
+ batch_size,
}
}
@@ -46,6 +63,32 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write>
BufBatchWriter<S, W> {
batch: &RecordBatch,
encode_time: &Time,
write_time: &Time,
+ ) -> datafusion::common::Result<usize> {
+ let coalescer = self
+ .coalescer
+ .get_or_insert_with(|| BatchCoalescer::new(batch.schema(),
self.batch_size));
+ coalescer.push_batch(batch.clone())?;
+
+ // Drain completed batches into a local vec so the coalescer borrow
ends
+ // before we call write_batch_to_buffer (which borrows &mut self).
+ let mut completed = Vec::new();
+ while let Some(batch) = coalescer.next_completed_batch() {
+ completed.push(batch);
+ }
+
+ let mut bytes_written = 0;
+ for batch in &completed {
+ bytes_written += self.write_batch_to_buffer(batch, encode_time,
write_time)?;
+ }
+ Ok(bytes_written)
+ }
+
+ /// Serialize a single batch into the byte buffer, flushing to the writer
if needed.
+ fn write_batch_to_buffer(
+ &mut self,
+ batch: &RecordBatch,
+ encode_time: &Time,
+ write_time: &Time,
) -> datafusion::common::Result<usize> {
let mut cursor = Cursor::new(&mut self.buffer);
cursor.seek(SeekFrom::End(0))?;
@@ -63,7 +106,24 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write>
BufBatchWriter<S, W> {
Ok(bytes_written)
}
- pub(crate) fn flush(&mut self, write_time: &Time) ->
datafusion::common::Result<()> {
+ pub(crate) fn flush(
+ &mut self,
+ encode_time: &Time,
+ write_time: &Time,
+ ) -> datafusion::common::Result<()> {
+ // Finish any remaining buffered rows in the coalescer
+ let mut remaining = Vec::new();
+ if let Some(coalescer) = &mut self.coalescer {
+ coalescer.finish_buffered_batch()?;
+ while let Some(batch) = coalescer.next_completed_batch() {
+ remaining.push(batch);
+ }
+ }
+ for batch in &remaining {
+ self.write_batch_to_buffer(batch, encode_time, write_time)?;
+ }
+
+ // Flush the byte buffer to the underlying writer
let mut write_timer = write_time.timer();
if !self.buffer.is_empty() {
self.writer.write_all(&self.buffer)?;
diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs
b/native/core/src/execution/shuffle/writers/partition_writer.rs
index 8b5163d44..7c2dbe044 100644
--- a/native/core/src/execution/shuffle/writers/partition_writer.rs
+++ b/native/core/src/execution/shuffle/writers/partition_writer.rs
@@ -79,6 +79,7 @@ impl PartitionWriter {
runtime: &RuntimeEnv,
metrics: &ShufflePartitionerMetrics,
write_buffer_size: usize,
+ batch_size: usize,
) -> datafusion::common::Result<usize> {
if let Some(batch) = iter.next() {
self.ensure_spill_file_created(runtime)?;
@@ -88,6 +89,7 @@ impl PartitionWriter {
&mut self.shuffle_block_writer,
&mut self.spill_file.as_mut().unwrap().file,
write_buffer_size,
+ batch_size,
);
let mut bytes_written =
buf_batch_writer.write(&batch?, &metrics.encode_time,
&metrics.write_time)?;
@@ -99,7 +101,7 @@ impl PartitionWriter {
&metrics.write_time,
)?;
}
- buf_batch_writer.flush(&metrics.write_time)?;
+ buf_batch_writer.flush(&metrics.encode_time,
&metrics.write_time)?;
bytes_written
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]