alamb commented on a change in pull request #1596:
URL: https://github.com/apache/arrow-datafusion/pull/1596#discussion_r788120755



##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -227,115 +618,56 @@ pub(crate) fn sort_batch(
     )
 }
 
-pin_project! {
-    /// stream for sort plan
-    struct SortStream {
-        #[pin]
-        output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
-        finished: bool,
-        schema: SchemaRef,
-        drop_helper: AbortOnDropSingle<()>,
-    }
-}
-
-impl SortStream {
-    fn new(
-        input: SendableRecordBatchStream,
-        expr: Vec<PhysicalSortExpr>,
-        baseline_metrics: BaselineMetrics,
-    ) -> Self {
-        let (tx, rx) = futures::channel::oneshot::channel();
-        let schema = input.schema();
-        let join_handle = tokio::spawn(async move {
-            let schema = input.schema();
-            let sorted_batch = common::collect(input)
-                .await
-                .map_err(DataFusionError::into_arrow_external_error)
-                .and_then(move |batches| {
-                    let timer = baseline_metrics.elapsed_compute().timer();
-                    // combine all record batches into one for each column
-                    let combined = common::combine_batches(&batches, schema.clone())?;
-                    // sort combined record batch
-                    let result = combined
-                        .map(|batch| sort_batch(batch, schema, &expr))
-                        .transpose()?
-                        .record_output(&baseline_metrics);
-                    timer.done();
-                    Ok(result)
-                });
-
-            // failing here is OK, the receiver is gone and does not care about the result
-            tx.send(sorted_batch).ok();
-        });
-
-        Self {
-            output: rx,
-            finished: false,
-            schema,
-            drop_helper: AbortOnDropSingle::new(join_handle),
-        }
-    }
-}
-
-impl Stream for SortStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        if self.finished {
-            return Poll::Ready(None);
-        }
-
-        // is the output ready?
-        let this = self.project();
-        let output_poll = this.output.poll(cx);
-
-        match output_poll {
-            Poll::Ready(result) => {
-                *this.finished = true;
-
-                // check for error in receiving channel and unwrap actual result
-                let result = match result {
-                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
-                    Ok(result) => result.transpose(),
-                };
-
-                Poll::Ready(result)
-            }
-            Poll::Pending => Poll::Pending,
-        }
+async fn do_sort(
+    mut input: SendableRecordBatchStream,
+    partition_id: usize,
+    expr: Vec<PhysicalSortExpr>,
+    metrics: AggregatedMetricsSet,
+    runtime: Arc<RuntimeEnv>,
+) -> Result<SendableRecordBatchStream> {
+    let schema = input.schema();
+    let sorter = Arc::new(ExternalSorter::new(
+        partition_id,
+        schema.clone(),
+        expr,
+        metrics,
+        runtime.clone(),
+    ));
+    runtime.register_consumer(&(sorter.clone() as Arc<dyn MemoryConsumer>));

Review comment:
       yeah I like to think of it as "if the memory manager says to a consumer it can't have more memory, then the consumer is responsible for spilling and staying within its allotted budget"
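       To illustrate that contract, here is a minimal, self-contained sketch of the pattern: a consumer asks a shared pool for memory and, when the pool refuses, spills its buffered state and retries within its budget. The `MemoryPool` and `SpillableConsumer` names and methods below are hypothetical placeholders for illustration, not DataFusion's actual `MemoryConsumer`/`RuntimeEnv` API.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a memory manager: tracks one shared budget.
struct MemoryPool {
    capacity: usize,
    used: Mutex<usize>,
}

impl MemoryPool {
    fn new(capacity: usize) -> Arc<Self> {
        Arc::new(Self { capacity, used: Mutex::new(0) })
    }

    /// Try to reserve `bytes` more; the manager only answers yes or no.
    fn try_grow(&self, bytes: usize) -> bool {
        let mut used = self.used.lock().unwrap();
        if *used + bytes <= self.capacity {
            *used += bytes;
            true
        } else {
            false
        }
    }

    /// Return `bytes` to the pool, e.g. after a consumer spilled to disk.
    fn shrink(&self, bytes: usize) {
        let mut used = self.used.lock().unwrap();
        *used = (*used).saturating_sub(bytes);
    }
}

/// A consumer that buffers data in memory and spills when the pool says no.
struct SpillableConsumer {
    pool: Arc<MemoryPool>,
    in_mem_bytes: usize,
    spill_count: usize,
}

impl SpillableConsumer {
    fn new(pool: Arc<MemoryPool>) -> Self {
        Self { pool, in_mem_bytes: 0, spill_count: 0 }
    }

    /// Buffer `bytes` more data, spilling first if the pool denies the growth.
    fn insert(&mut self, bytes: usize) {
        if !self.pool.try_grow(bytes) {
            // The memory manager said no: the consumer is responsible for
            // spilling and staying within its allotted budget.
            self.spill();
            assert!(
                self.pool.try_grow(bytes),
                "budget too small even after spilling"
            );
        }
        self.in_mem_bytes += bytes;
    }

    /// "Write" the buffered data to disk (simulated) and release its memory.
    fn spill(&mut self) {
        self.pool.shrink(self.in_mem_bytes);
        self.in_mem_bytes = 0;
        self.spill_count += 1;
    }
}

fn main() {
    let pool = MemoryPool::new(1024);
    let mut sorter = SpillableConsumer::new(pool);
    for _ in 0..10 {
        sorter.insert(256); // total exceeds the 1 KiB budget, forcing spills
    }
    println!(
        "kept {} bytes in memory, spilled {} times",
        sorter.in_mem_bytes, sorter.spill_count
    );
}
```

       The pool only answers yes or no; the spill-to-disk logic and the bookkeeping that releases the reservation live entirely inside the consumer, which is the division of responsibility the `ExternalSorter` registered in `do_sort` above is expected to follow.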



