alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513652359
##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
self.input.execute(0).await
}
_ => {
- let tasks = (0..input_partitions).map(|part_i| {
+ let (sender, receiver) =
mpsc::unbounded::<ArrowResult<RecordBatch>>();
Review comment:
I have spent entirely too long on this today, lol. Sorry it is taking
so long.
I am now looking into the possibility that this is related to how `HashAggregate` is
implemented. Specifically, it creates a stream from within its `poll_next` and then
calls `poll_next_unpin` on it:
https://github.com/apache/arrow/blob/master/rust/datafusion/src/physical_plan/hash_aggregate.rs#L399
If the input is not all available immediately (so that poll returns `Poll::Pending`),
I believe this stream is simply dropped and nothing ever drives its futures forward again.
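A minimal std-only model of the suspected failure, assuming the problem really is that a future (or stream) is built inside the poll call and dropped when that call returns `Pending`. `NotReadyYet`, `RebuildEachPoll`, `KeepFuture` and `attempts_until_ready` are hypothetical names for illustration; this is not DataFusion's `HashAggregate` code. Rebuilding the future on every poll throws away progress, so it never completes, while a future stored across polls does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for input that is not all available immediately:
/// returns `Pending` `remaining` times, then `Ready`.
struct NotReadyYet {
    remaining: u32,
}

impl Future for NotReadyYet {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Suspected bug pattern: a fresh future is built inside every poll call and
/// dropped when the call returns, so progress from earlier polls is lost.
struct RebuildEachPoll;

impl RebuildEachPoll {
    fn poll_once(&mut self, cx: &mut Context<'_>) -> Poll<&'static str> {
        let mut fut = NotReadyYet { remaining: 1 };
        Pin::new(&mut fut).poll(cx) // `fut` is dropped at the end of this call
    }
}

/// Alternative: the future is created once and kept in the struct,
/// so each later poll resumes where the previous one stopped.
struct KeepFuture {
    fut: NotReadyYet,
}

impl KeepFuture {
    fn poll_once(&mut self, cx: &mut Context<'_>) -> Poll<&'static str> {
        Pin::new(&mut self.fut).poll(cx) // fine: NotReadyYet is Unpin
    }
}

/// Waker that does nothing; the driver below simply polls again in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls up to `max` times; returns the 1-based attempt that was Ready, if any.
fn attempts_until_ready<F>(mut poll_fn: F, max: u32) -> Option<u32>
where
    F: FnMut(&mut Context<'_>) -> Poll<&'static str>,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (1..=max).find(|_| poll_fn(&mut cx).is_ready())
}

fn main() {
    let mut buggy = RebuildEachPoll;
    let mut fixed = KeepFuture { fut: NotReadyYet { remaining: 1 } };
    println!("rebuild each poll: {:?}", attempts_until_ready(|cx| buggy.poll_once(cx), 10));
    println!("keep future:       {:?}", attempts_until_ready(|cx| fixed.poll_once(cx), 10));
}
```

Running it should print `None` for the rebuild-each-poll pattern and `Some(2)` for the stored future.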
However, I had a different idea the other day that did not pan out, so this one
might turn out to be a dead end too.
I have been trying (locally) to rewrite the hash aggregate so that it uses a future
that is not created during the actual call to `poll_next`. However, I am
currently stymied by the need to get pinning right; my branch
refuses to compile. I'll keep posting updates...
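Since the sticking point is pinning, here is a std-only sketch of one common way around it, assuming the aggregation can be written as a single `async` block: build the future once (in a constructor, not inside `poll_next`) and store it as `Pin<Box<dyn Future>>`. The box makes the field `Unpin`, so a `poll_next`-style method can poll it through plain `&mut self` without pin projection. `AggregateStream`, `YieldOnce`, `collect_all` and the summing logic are hypothetical; this is not DataFusion's `HashAggregate`, and `poll_next` here is an inherent method that only mirrors the shape of `futures::Stream::poll_next`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an input batch that is not ready on the first poll.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

/// Hypothetical aggregate stream. The summing future is created once in `new`
/// and pinned on the heap, so `poll_next` only needs `&mut self`.
struct AggregateStream {
    fut: Option<Pin<Box<dyn Future<Output = i64>>>>,
}

impl AggregateStream {
    fn new(batches: Vec<Vec<i64>>) -> Self {
        let fut: Pin<Box<dyn Future<Output = i64>>> = Box::pin(async move {
            let mut sum = 0;
            for batch in batches {
                YieldOnce(false).await; // simulate input arriving later
                sum += batch.iter().sum::<i64>();
            }
            sum
        });
        AggregateStream { fut: Some(fut) }
    }

    /// Yields `Ready(Some(total))` once, then `Ready(None)`.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<i64>> {
        let Some(fut) = self.fut.as_mut() else {
            return Poll::Ready(None); // result already produced
        };
        let polled = fut.as_mut().poll(cx); // resumes the same stored future
        match polled {
            Poll::Ready(sum) => {
                self.fut = None; // drop the finished future; never poll it again
                Poll::Ready(Some(sum))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; `collect_all` just polls again in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains the stream, returning its items and how many polls were Pending.
fn collect_all(stream: &mut AggregateStream) -> (Vec<i64>, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    let mut pendings = 0;
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(v)) => out.push(v),
            Poll::Ready(None) => return (out, pendings),
            Poll::Pending => pendings += 1, // a real executor would wait for a wake
        }
    }
}

fn main() {
    let mut stream = AggregateStream::new(vec![vec![1, 2], vec![3], vec![4, 5, 6]]);
    let (results, pendings) = collect_all(&mut stream);
    println!("results={results:?} after {pendings} Pending polls");
}
```

Boxing costs one heap allocation per stream; keeping the future inline instead requires pin projection (for example with the `pin-project` crate). The `futures` crate also offers `BoxFuture` as a shorthand for the `Send` version of this boxed type.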