alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513652359



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I have spent entirely too long on this today -- lol, sorry it is taking so long.
   
   I am now investigating whether this is related to how HashAggregate is implemented: it creates a stream and then calls `poll_next_unpin` on it (https://github.com/apache/arrow/blob/master/rust/datafusion/src/physical_plan/hash_aggregate.rs#L399). If the input is not all available immediately, I believe this stream is simply dropped after `poll_next_unpin` returns `Pending`, and nothing ever drives its futures forward again.
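   To make the suspected failure mode concrete, here is a minimal std-only sketch, not DataFusion's actual code. `SlowInput`, `poll_recreated`, `Aggregator`, and `NoopWake` are hypothetical names, and a plain `Future` stands in for the stream to keep it short. A future that is rebuilt inside every poll and then dropped loses its progress and never completes, while one stored across polls does:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for input that is not all available immediately:
/// it returns `Pending` `remaining` times before completing with a value.
struct SlowInput {
    remaining: u32,
}

impl Future for SlowInput {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        if self.remaining == 0 {
            return Poll::Ready(42);
        }
        self.remaining -= 1;
        // Request another poll; a real source would wake when data arrives.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Suspected anti-pattern: build a fresh future inside each poll, poll it
/// once, then drop it. Any progress it made is thrown away.
fn poll_recreated(cx: &mut Context<'_>) -> Poll<u64> {
    let mut fut = SlowInput { remaining: 2 };
    Pin::new(&mut fut).poll(cx)
}

/// Fix: create the future once and keep it in the struct between polls.
struct Aggregator {
    fut: SlowInput,
}

impl Aggregator {
    fn poll_stored(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        Pin::new(&mut self.fut).poll(cx)
    }
}

/// Waker that does nothing; the sketch polls by hand instead of using an executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Recreated on every call: stuck at Pending no matter how often it is polled.
    let recreated: Vec<bool> = (0..5).map(|_| poll_recreated(&mut cx).is_ready()).collect();

    // Stored across calls: completes on the third poll.
    let mut agg = Aggregator { fut: SlowInput { remaining: 2 } };
    let stored: Vec<bool> = (0..3).map(|_| agg.poll_stored(&mut cx).is_ready()).collect();

    // prints "[false, false, false, false, false] [false, false, true]"
    println!("{:?} {:?}", recreated, stored);
}
```

   In a real executor the dropped future would also take its waker registration with it, which is one way "nothing ever drives the futures forward again" could happen.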
   
   However, I had a different theory the other day, so this one may also turn out to be a dead end.
   
   I have been trying (locally) to rewrite HashAggregate so that it uses a future that is not created during the call to `poll_next` itself. However, I am currently stymied by getting the pinning right -- my branch refuses to compile. I'll keep posting updates...
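   One common way around the pinning problem, sketched here under assumptions rather than as the eventual fix: create the future once, store it as `Pin<Box<dyn Future + Send>>`, and resume it from `poll_next` via `as_mut()`. `AggregateStream`, `YieldOnce`, `NoopWake`, and the summing logic are hypothetical, and `poll_next` is an inherent method that only mirrors the shape of `futures::Stream::poll_next` so the example needs nothing beyond `std`:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for "next batch not ready yet": Pending once, then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical aggregate stream. The aggregation future is created once, in
/// `new`, and stored pinned on the heap, so each `poll_next` resumes it
/// rather than starting over.
struct AggregateStream {
    fut: Option<Pin<Box<dyn Future<Output = i64> + Send>>>,
}

impl AggregateStream {
    fn new(batches: Vec<Vec<i64>>) -> Self {
        let fut: Pin<Box<dyn Future<Output = i64> + Send>> = Box::pin(async move {
            let mut total = 0i64;
            for batch in batches {
                YieldOnce(false).await; // simulate waiting for the next batch
                total += batch.iter().sum::<i64>();
            }
            total
        });
        AggregateStream { fut: Some(fut) }
    }

    /// Same shape as `Stream::poll_next`: yields the total once, then `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i64>> {
        // `AggregateStream` is `Unpin` (a `Pin<Box<_>>` is `Unpin`), so plain
        // mutable access through the pin is allowed.
        let this = &mut *self;
        let poll = match this.fut.as_mut() {
            None => return Poll::Ready(None), // already finished
            Some(fut) => fut.as_mut().poll(cx),
        };
        match poll {
            Poll::Pending => Poll::Pending,
            Poll::Ready(total) => {
                this.fut = None; // drop the finished future
                Poll::Ready(Some(total))
            }
        }
    }
}

/// Waker that does nothing; the sketch polls by hand instead of using an executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = AggregateStream::new(vec![vec![1, 2], vec![3, 4]]);

    // Poll by hand until the stream yields its item; the boxed future keeps
    // its place between calls, so each poll makes progress.
    let mut polls = 0;
    let item = loop {
        polls += 1;
        if let Poll::Ready(item) = Pin::new(&mut stream).poll_next(&mut cx) {
            break item;
        }
    };
    println!("{:?} after {} polls", item, polls); // prints "Some(10) after 3 polls"
}
```

   Because a `Pin<Box<_>>` is itself `Unpin`, the containing struct stays `Unpin` and needs no unsafe pin projection; the trade-off is one heap allocation per stream.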
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
