alamb commented on code in PR #11610:
URL: https://github.com/apache/datafusion/pull/11610#discussion_r1687851341
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -290,26 +277,106 @@ pub fn concat_batches(
     arrow::compute::concat_batches(schema, batches)
 }
 
+/// Concatenate multiple record batches into larger batches
+///
+/// See [`CoalesceBatchesExec`] for more details.
+///
+/// Notes:
+///
+/// 1. The output rows are in the same order as the input rows
+///
+/// 2. The output is a sequence of batches, with all but the last being at least
+///    `target_batch_size` rows.
+///
+/// 3. Eventually this may also be able to handle other optimizations such as a
+///    combined filter/coalesce operation.
+#[derive(Debug)]
+struct BatchCoalescer {
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesced batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl BatchCoalescer {
+    /// Create a new BatchCoalescer that produces batches of at least `target_batch_size` rows
+    fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
+        Self {
+            schema,
+            target_batch_size,
+            buffer: vec![],
+            buffered_rows: 0,
+        }
+    }
+
+    /// Return the schema of the output batches
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    /// Add a batch to the coalescer, returning a batch if the target batch size is reached
+    fn push_batch(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>> {
+        if batch.num_rows() >= self.target_batch_size && self.buffer.is_empty() {
+            return Ok(Some(batch));
+        }
+        // discard empty batches
+        if batch.num_rows() == 0 {
+            return Ok(None);
+        }
+        // add to the buffered batches
+        self.buffered_rows += batch.num_rows();
+        self.buffer.push(batch);
+        // check to see if we have enough batches yet
+        let batch = if self.buffered_rows >= self.target_batch_size {
+            // combine the batches and return
+            let batch = concat_batches(&self.schema, &self.buffer, self.buffered_rows)?;
+            // reset buffer state
+            self.buffer.clear();
+            self.buffered_rows = 0;
+            // return batch
+            Some(batch)
+        } else {
+            None
+        };
+        Ok(batch)
+    }
+
+    /// Finish the coalescing process, returning all buffered data as a final,
+    /// single batch, if any
+    fn finish(&mut self) -> Result<Option<RecordBatch>> {
+        if self.buffer.is_empty() {
+            Ok(None)
+        } else {
+            // combine the batches and return
+            let batch = concat_batches(&self.schema, &self.buffer, self.buffered_rows)?;
+            // reset buffer state
+            self.buffer.clear();
+            self.buffered_rows = 0;
+            // return batch
+            Ok(Some(batch))
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning};
-
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::UInt32Array;
 
     #[tokio::test(flavor = "multi_thread")]
     async fn test_concat_batches() -> Result<()> {
-        let schema = test_schema();
-        let partition = create_vec_batches(&schema, 10);
-        let partitions = vec![partition];
-
-        let output_partitions = coalesce_batches(&schema, partitions, 21).await?;
-        assert_eq!(1, output_partitions.len());
+        let Scenario { schema, batch } = uint32_scenario();

Review Comment:
   The tests can now get simpler too because they don't need to run a plan, they can just test the logic in `BatchCoalescer` directly.
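   As a sketch of what such a plan-free test could look like, written as if it lived
   inside the same `mod tests` (so `BatchCoalescer`, `Result`, `Arc`, and the arrow
   imports above are in scope). The test name and row counts here are illustrative,
   not taken from the PR:

       // Hypothetical direct test: feed batches into BatchCoalescer and
       // check when it emits, with no ExecutionPlan involved.
       #[test]
       fn test_coalescer_emits_at_target_size() -> Result<()> {
           let schema = Arc::new(Schema::new(vec![Field::new(
               "c0",
               DataType::UInt32,
               false,
           )]));
           let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 10);

           // A 4 row batch is below the 10 row target, so it is buffered
           let batch = RecordBatch::try_new(
               Arc::clone(&schema),
               vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4]))],
           )?;
           assert!(coalescer.push_batch(batch.clone())?.is_none());

           // 8 buffered rows is still below the target: nothing emitted yet
           assert!(coalescer.push_batch(batch.clone())?.is_none());

           // The third batch crosses the target: one 12 row batch comes out
           let out = coalescer.push_batch(batch)?.expect("batch should be emitted");
           assert_eq!(out.num_rows(), 12);

           // The buffer was drained above, so finish() has nothing to flush
           assert!(coalescer.finish()?.is_none());
           Ok(())
       }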