Copilot commented on code in PR #18207:
URL: https://github.com/apache/datafusion/pull/18207#discussion_r2494726186
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1228,65 +1419,74 @@ impl Stream for RepartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ use futures::StreamExt;
+
loop {
- match &mut self.state {
- RepartitionStreamState::ReceivingFromChannel => {
- let value =
futures::ready!(self.input.recv().poll_unpin(cx));
+ match self.state {
+ StreamState::ReadingMemory => {
+ // Poll the memory channel for next message
+ let value = match self.input.recv().poll_unpin(cx) {
+ Poll::Ready(v) => v,
+ Poll::Pending => {
+ // Nothing from channel, wait
+ return Poll::Pending;
+ }
+ };
+
match value {
Some(Some(v)) => match v {
Ok(RepartitionBatch::Memory(batch)) => {
- // Release memory and return
+ // Release memory and return batch
self.reservation
.lock()
.shrink(batch.get_array_memory_size());
return Poll::Ready(Some(Ok(batch)));
}
- Ok(RepartitionBatch::Spilled { spill_file, size })
=> {
- // Read from disk - SpillReaderStream uses
tokio::fs internally
- // Pass the original size for validation
- let stream = self
- .spill_manager
- .read_spill_as_stream(spill_file,
Some(size))?;
- self.state =
-
RepartitionStreamState::ReadingSpilledBatch(stream);
- // Continue loop to poll the stream immediately
+ Ok(RepartitionBatch::Spilled) => {
+ // Batch was spilled, transition to reading
from spill stream
+ // We must block on spill stream until we get
the batch
+ // to preserve ordering
+ self.state = StreamState::ReadingSpilled;
+ continue;
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
},
Some(None) => {
- self.num_input_partitions_processed += 1;
-
- if self.num_input_partitions
- == self.num_input_partitions_processed
- {
- // all input partitions have finished sending
batches
+ // One input partition finished
+ self.remaining_partitions -= 1;
+ if self.remaining_partitions == 0 {
+ // All input partitions finished
return Poll::Ready(None);
- } else {
- // other partitions still have data to send
- continue;
}
+ // Continue to poll for more data from other
partitions
+ continue;
}
None => {
+ // Channel closed unexpectedly
return Poll::Ready(None);
}
}
}
- RepartitionStreamState::ReadingSpilledBatch(stream) => {
- match futures::ready!(stream.poll_next_unpin(cx)) {
- Some(Ok(batch)) => {
- // Return batch and stay in ReadingSpilledBatch
state to read more batches
+ StreamState::ReadingSpilled => {
+ // Poll spill stream for the spilled batch
+ match self.spill_stream.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(batch))) => {
+ self.state = StreamState::ReadingMemory;
return Poll::Ready(Some(Ok(batch)));
}
- Some(Err(e)) => {
- self.state =
RepartitionStreamState::ReceivingFromChannel;
+ Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
- None => {
- // Spill stream ended - go back to receiving from
channel
- self.state =
RepartitionStreamState::ReceivingFromChannel;
- continue;
+ Poll::Ready(None) => {
+ // Spill stream ended keep draining the memory
channel
Review Comment:
Missing punctuation. The comment needs a comma after 'ended' to separate its
two clauses: 'Spill stream ended, keep draining the memory channel'.
```suggestion
// Spill stream ended, keep draining the memory
channel
```
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1331,55 +1550,67 @@ impl Stream for PerPartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ use futures::StreamExt;
+
loop {
- match &mut self.state {
- RepartitionStreamState::ReceivingFromChannel => {
- let value =
futures::ready!(self.receiver.recv().poll_unpin(cx));
+ match self.state {
+ StreamState::ReadingMemory => {
+ // Poll the memory channel for next message
+ let value = match self.receiver.recv().poll_unpin(cx) {
+ Poll::Ready(v) => v,
+ Poll::Pending => {
+ // Nothing from channel, wait
+ return Poll::Pending;
+ }
+ };
+
match value {
Some(Some(v)) => match v {
Ok(RepartitionBatch::Memory(batch)) => {
- // Release memory and return
+ // Release memory and return batch
self.reservation
.lock()
.shrink(batch.get_array_memory_size());
return Poll::Ready(Some(Ok(batch)));
}
- Ok(RepartitionBatch::Spilled { spill_file, size })
=> {
- // Read from disk - SpillReaderStream uses
tokio::fs internally
- // Pass the original size for validation
- let stream = self
- .spill_manager
- .read_spill_as_stream(spill_file,
Some(size))?;
- self.state =
-
RepartitionStreamState::ReadingSpilledBatch(stream);
- // Continue loop to poll the stream immediately
+ Ok(RepartitionBatch::Spilled) => {
+ // Batch was spilled, transition to reading
from spill stream
+ // We must block on spill stream until we get
the batch
+ // to preserve ordering
+ self.state = StreamState::ReadingSpilled;
+ continue;
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
},
Some(None) => {
- // Input partition has finished sending batches
return Poll::Ready(None);
}
- None => return Poll::Ready(None),
+ None => {
+ // Channel closed unexpectedly
+ return Poll::Ready(None);
+ }
}
}
-
- RepartitionStreamState::ReadingSpilledBatch(stream) => {
- match futures::ready!(stream.poll_next_unpin(cx)) {
- Some(Ok(batch)) => {
- // Return batch and stay in ReadingSpilledBatch
state to read more batches
+ StreamState::ReadingSpilled => {
+ // Poll spill stream for the spilled batch
+ match self.spill_stream.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(batch))) => {
+ self.state = StreamState::ReadingMemory;
return Poll::Ready(Some(Ok(batch)));
}
- Some(Err(e)) => {
- self.state =
RepartitionStreamState::ReceivingFromChannel;
+ Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
- None => {
- // Spill stream ended - go back to receiving from
channel
- self.state =
RepartitionStreamState::ReceivingFromChannel;
- continue;
+ Poll::Ready(None) => {
+ // Spill stream ended keep draining the memory
channel
Review Comment:
Missing punctuation. The comment needs a comma after 'ended' to separate its
two clauses: 'Spill stream ended, keep draining the memory channel'.
```suggestion
// Spill stream ended, keep draining the memory
channel
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]