pepijnve commented on code in PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#discussion_r2136240231
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
- while let Some(batch) = input.next().await {
- let batch = batch?;
- topk.insert_batch(batch)?;
- if topk.finished {
- break;
+ // Spawn a task the first time the stream is polled for the sort phase.
+ // This ensures the consumer of the sort does not poll unnecessarily
+ // while the sort is ongoing
Review Comment:
I've added a test case that attempts to demonstrate that processing is
deferred. If this looks ok to you I can add the same thing for the other
touched code as well.
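For readers less familiar with the futures crate, the deferral rests on one property: an `async move` block is lazy. Constructing it, which is what happens when it is handed to `futures::stream::once`, runs none of its body; the body only starts on the first `poll`. Below is a minimal std-only sketch of that property. It is not the DataFusion test from this PR, and the helper names `noop_waker` and `body_runs_only_when_polled` are hypothetical.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

/// Hypothetical helper: a waker whose callbacks do nothing, enough to poll a
/// future by hand without an async runtime.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: no vtable function dereferences the data pointer, so null is fine.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Hypothetical helper: returns (ran_before_first_poll, ran_after_first_poll).
fn body_runs_only_when_polled() -> (bool, bool) {
    let started = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&started);

    // Stand-in for the `async move` block handed to `futures::stream::once`.
    // Creating (and pinning) it executes none of its body.
    let mut work = pin!(async move {
        flag.store(true, Ordering::SeqCst);
    });
    let before = started.load(Ordering::SeqCst);

    // The first poll runs the body; with no `.await` inside, it completes at once.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert!(work.as_mut().poll(&mut cx).is_ready());
    let after = started.load(Ordering::SeqCst);

    (before, after)
}

fn main() {
    let (before, after) = body_runs_only_when_polled();
    println!("ran before first poll: {before}, ran after first poll: {after}");
}
```

A test against the real operator could follow the same shape, e.g. checking that no input batch has been consumed before the first `poll_next`, though how to observe that in DataFusion is left as an assumption here.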
I'm not sure how I can demonstrate the absence of multi-threading in a test
case.
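One possible angle on the threading question, offered only as a sketch: when deferred work is driven inline by whoever polls the stream, it runs on the polling thread, and a test can observe this by recording `std::thread::current().id()` inside the async block. The helper names below are hypothetical. Note that this says nothing about work handed to a spawned task, which a multi-threaded runtime such as Tokio's may run on a different worker thread.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, RawWaker, RawWakerVTable, Waker};
use std::thread::{self, ThreadId};

/// Hypothetical helper: a do-nothing waker for polling a future by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: no vtable function dereferences the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Hypothetical helper: polls the deferred work once on the calling thread and
/// returns (polling thread, thread the work body actually ran on).
fn poller_and_worker_threads() -> (ThreadId, ThreadId) {
    let seen: Arc<Mutex<Option<ThreadId>>> = Arc::new(Mutex::new(None));
    let slot = Arc::clone(&seen);

    let mut work = pin!(async move {
        // Record which thread executes the body.
        *slot.lock().unwrap() = Some(thread::current().id());
    });

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert!(work.as_mut().poll(&mut cx).is_ready());

    let recorded: Option<ThreadId> = *seen.lock().unwrap();
    (thread::current().id(), recorded.expect("body should have run"))
}

fn main() {
    let (poller, worker) = poller_and_worker_threads();
    assert_eq!(poller, worker);

    // Polling from another OS thread moves the work there too: it follows the poller.
    let (poller2, worker2) = thread::spawn(poller_and_worker_threads).join().unwrap();
    assert_eq!(poller2, worker2);
    assert_ne!(worker2, thread::current().id());
    println!("deferred work ran on the polling thread in both cases");
}
```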
Wrt comprehensibility, I have to admit I am still very much in the
learning-as-I-go phase of using the futures crate. There might be a more
elegant or straightforward way to express this construct.
--
This is an automated message from the Apache Git Service.