Dandandan commented on code in PR #1431:
URL:
https://github.com/apache/datafusion-ballista/pull/1431#discussion_r2749531369
##########
ballista/core/src/execution_plans/sort_shuffle/buffer.rs:
##########
@@ -110,6 +111,77 @@ impl PartitionBuffer {
pub fn take_batches(&mut self) -> Vec<RecordBatch> {
std::mem::take(&mut self.batches)
}
+
+ /// Drains batches from the buffer, coalescing small batches into
+ /// larger ones up to `target_batch_size` rows each.
+ pub fn drain_coalesced(&mut self, target_batch_size: usize) ->
Vec<RecordBatch> {
+ self.memory_used = 0;
+ self.num_rows = 0;
+ let batches = std::mem::take(&mut self.batches);
+ coalesce_batches(batches, &self.schema, target_batch_size)
+ }
+
+ /// Takes all batches, coalescing small batches into larger ones
+ /// up to `target_batch_size` rows each.
+ pub fn take_batches_coalesced(
+ &mut self,
+ target_batch_size: usize,
+ ) -> Vec<RecordBatch> {
+ let batches = std::mem::take(&mut self.batches);
+ coalesce_batches(batches, &self.schema, target_batch_size)
+ }
+}
+
+/// Coalesces small batches into larger ones up to `target_batch_size`
+/// rows each using `concat_batches`.
+fn coalesce_batches(
+ batches: Vec<RecordBatch>,
+ schema: &SchemaRef,
+ target_batch_size: usize,
+) -> Vec<RecordBatch> {
+ if batches.len() <= 1 {
+ return batches;
+ }
+
+ let mut result = Vec::new();
+ let mut pending: Vec<RecordBatch> = Vec::new();
+ let mut pending_rows: usize = 0;
+
+ for batch in batches {
+ let rows = batch.num_rows();
+ if rows >= target_batch_size {
+ // Flush pending small batches first
+ if !pending.is_empty() {
+ if let Ok(merged) = concat_batches(schema, pending.iter()) {
+ result.push(merged);
+ }
+ pending.clear();
+ pending_rows = 0;
+ }
+ result.push(batch);
+ } else {
+ pending_rows += rows;
+ pending.push(batch);
+ if pending_rows >= target_batch_size {
+ if let Ok(merged) = concat_batches(schema, pending.iter()) {
Review Comment:
Can this use `LimitedBatchCoalescer`? It uses the `coalesce` API rather
than `concat`, which is a bit more efficient/precise and future-proof.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]