Copilot commented on code in PR #18572:
URL: https://github.com/apache/datafusion/pull/18572#discussion_r2508318004
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1194,9 +1194,18 @@ impl RepartitionExec {
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
     ) -> Result<()> {
+        let is_hash_partitioning = matches!(&partitioning, Partitioning::Hash(_, _));
         let mut partitioner =
             BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
+        let mut coalesce_batches = vec![];
+
+        if is_hash_partitioning {
+            for _ in 0..partitioner.num_partitions() {
+                coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096));
Review Comment:
The hardcoded batch size of 4096 should use the configured `batch_size` from the session config. Other uses of `BatchCoalescer::new` in the codebase use `context.session_config().batch_size()` or `config.execution.batch_size`. Consider passing the batch size as a parameter to `pull_from_input` from its caller (`consume_input_streams`), which has access to the `context: Arc<TaskContext>`.
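A minimal sketch of the suggested change, assuming arrow's `BatchCoalescer` and DataFusion's `SessionConfig::batch_size()`; the helper name `make_coalescers` is hypothetical and only illustrates reading the configured batch size from the `TaskContext`, which the reviewer suggests threading into `pull_from_input` from `consume_input_streams`:

```rust
use std::sync::Arc;

use arrow::compute::BatchCoalescer;
use arrow_schema::SchemaRef;
use datafusion_execution::TaskContext;

/// Hypothetical helper: build one coalescer per output partition using the
/// session-configured batch size instead of the hardcoded 4096.
fn make_coalescers(
    schema: &SchemaRef,
    num_partitions: usize,
    context: &Arc<TaskContext>,
) -> Vec<BatchCoalescer> {
    let batch_size = context.session_config().batch_size();
    (0..num_partitions)
        .map(|_| BatchCoalescer::new(Arc::clone(schema), batch_size))
        .collect()
}
```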
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1296,41 @@ impl RepartitionExec {
             }
         }
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
+                while let Some(batch) = coalesce_batch.next_completed_batch() {
+                    let size = batch.get_array_memory_size();
+                    // Check if channel still exists (may have been removed if receiver hung up)
+                    if let Some(channel) = output_channels.get_mut(&partition) {
+                        let (batch_to_send, is_memory_batch) =
+                            match channel.reservation.lock().try_grow(size) {
+                                Ok(_) => {
+                                    // Memory available - send in-memory batch
+                                    (RepartitionBatch::Memory(batch), true)
+                                }
+                                Err(_) => {
+                                    // We're memory limited - spill to SpillPool
+                                    // SpillPool handles file handle reuse and rotation
+                                    channel.spill_writer.push_batch(&batch)?;
+                                    // Send marker indicating batch was spilled
+                                    (RepartitionBatch::Spilled, false)
+                                }
+                            };
+
+                        if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
Review Comment:
The send timer metric (`metrics.send_time[partition]`) is not being tracked for these final flush batches, unlike the main sending logic at line 1245. This will result in inaccurate metrics for hash partitioning operations, as the time spent sending flushed batches won't be recorded.
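A sketch of one way to record the metric during the flush, assuming DataFusion's `metrics::Time::timer()` guard; `timed_send` is a hypothetical helper, and the PR's flush path sends through its own channel type rather than a plain tokio `Sender`:

```rust
use datafusion_physical_plan::metrics::Time;
use tokio::sync::mpsc::Sender;

/// Hypothetical helper: send a value while recording the elapsed time in the
/// partition's `send_time` metric, mirroring the timed send in the main loop.
/// Returns false if the receiver has hung up.
async fn timed_send<T>(send_time: &Time, sender: &Sender<T>, value: T) -> bool {
    let timer = send_time.timer();
    let ok = sender.send(value).await.is_ok();
    // Stop timing and record the elapsed time
    timer.done();
    ok
}
```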
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1296,41 @@ impl RepartitionExec {
             }
         }
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
Review Comment:
The flush logic should call `coalesce_batch.finish()` before retrieving remaining batches with `next_completed_batch()`. According to the `BatchCoalescer` API pattern used elsewhere in the codebase, `finish()` must be called to signal end-of-stream and ensure all buffered data is made available. Without this, any batches that haven't reached the target size threshold will be silently dropped.
```suggestion
            for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
                coalesce_batch.finish();
```
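For context, a rough sketch of the finish-then-drain ordering this asks for (method names are taken from the diff and the suggestion above and are not re-verified here; `send_completed_batch` is a hypothetical stand-in for the PR's send-or-spill logic):

```rust
for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
    // Signal end-of-stream first so a partially filled buffer becomes a
    // completed batch instead of being silently dropped
    coalesce_batch.finish();
    // Then drain everything that is now available
    while let Some(batch) = coalesce_batch.next_completed_batch() {
        send_completed_batch(partition, batch).await?;
    }
}
```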
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.