EmilyMatt opened a new issue, #22063:
URL: https://github.com/apache/datafusion/issues/22063

   ### Is your feature request related to a problem or challenge?
   
   When an operator's child stream is exhausted, we should drop it immediately, 
instead of keeping it alive until the owning stream itself is dropped.
   Dropping it early frees the resources the child holds, reduces memory 
pressure, and drops its MemoryConsumers, which lets the pools make 
better-informed decisions. It probably has more benefits down the line.
   
   ### Describe the solution you'd like
   
   Whenever a stream polls its child and that child returns 
`Ready(None)`, drop that child stream.
   Take the sort as an example:
   ```
   let mut sorter = ExternalSorter::new(
       partition,
       input.schema(),
       self.expr.clone(),
       context.session_config().batch_size(),
       execution_options.sort_spill_reservation_bytes,
       execution_options.sort_in_place_threshold_bytes,
       context.session_config().spill_compression(),
       &self.metrics_set,
       context.runtime_env(),
   )?;
   Ok(Box::pin(RecordBatchStreamAdapter::new(
       self.schema(),
       futures::stream::once(async move {
           while let Some(batch) = input.next().await {
               let batch = batch?;
               sorter.insert_batch(batch).await?;
           }
           sorter.sort().await
       })
       .try_flatten(),
   )))
   ```
   
   This stream is memory intensive in general, but the final `sorter.sort().await` 
can start an expensive merge whose performance is severely limited by the 
available memory.
   With a naive fair pool, an aggregate feeding that sort keeps half of the 
memory reserved for itself. That half is unavailable to the sort, even though 
the aggregate has already finished its work.
   Now imagine we added `drop(input);` right after the while loop.
   The `sorter.sort().await` call would then be given a 2x allocation from the pool!
   The same logic can be applied pretty much everywhere.
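
To make the effect concrete, here is a minimal std-only sketch of the idea. It is a toy model, not DataFusion code: `ToyFairPool`, `Registration`, `ChildStream`, and `sort_share_after_input` are hypothetical names invented for illustration, a plain `Iterator` stands in for the async stream, and the "fair" policy is assumed to be an even split of capacity across registered consumers.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical naive fair pool: capacity is split evenly among registered consumers.
#[derive(Clone)]
struct ToyFairPool {
    capacity: usize,
    consumers: Rc<Cell<usize>>,
}

impl ToyFairPool {
    fn new(capacity: usize) -> Self {
        Self { capacity, consumers: Rc::new(Cell::new(0)) }
    }

    /// Bytes each currently registered consumer may use.
    fn fair_share(&self) -> usize {
        self.capacity / self.consumers.get().max(1)
    }

    /// Registers a consumer; it stays counted until the returned guard is dropped.
    fn register(&self) -> Registration {
        self.consumers.set(self.consumers.get() + 1);
        Registration { pool: self.clone() }
    }
}

/// Guard that removes its consumer from the pool on drop (assumed behaviour).
struct Registration {
    pool: ToyFairPool,
}

impl Drop for Registration {
    fn drop(&mut self) {
        self.pool.consumers.set(self.pool.consumers.get() - 1);
    }
}

/// Toy child stream (think: the aggregate) that stays registered until dropped.
struct ChildStream {
    _registration: Registration,
    remaining: u32,
}

impl Iterator for ChildStream {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None; // plays the role of Ready(None)
        }
        self.remaining -= 1;
        Some(self.remaining)
    }
}

/// Drains the input, then returns the share the sort gets for its final merge.
fn sort_share_after_input(drop_input: bool) -> usize {
    let pool = ToyFairPool::new(1024);
    let _sort_registration = pool.register();
    let mut input = ChildStream { _registration: pool.register(), remaining: 3 };

    while let Some(_batch) = input.next() {}

    if drop_input {
        drop(input); // the proposed change: release the finished child right away
    }
    // This is the point where the expensive merge would start.
    pool.fair_share()
}
```

In this toy model, `sort_share_after_input(false)` leaves the finished child registered, so the sort sees half of the capacity, while `sort_share_after_input(true)` gives it the whole pool. Real pools may use different policies, so the exact factor is an assumption of the sketch.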
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

