This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 44daa9a7b8 minor: enhance comment in SortPreservingMergeStream.abort 
(#17115)
44daa9a7b8 is described below

commit 44daa9a7b86f39b324762f93af901b292e98008d
Author: mwish <maplewish...@gmail.com>
AuthorDate: Wed Aug 13 01:55:40 2025 +0800

    minor: enhance comment in SortPreservingMergeStream.abort (#17115)
    
    * minor: enhance comment in SortPreservingMergeStream.abort
    
    * change aborted to done
---
 datafusion/physical-plan/src/sorts/merge.rs | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/merge.rs 
b/datafusion/physical-plan/src/sorts/merge.rs
index ca2d5f2105..0b0136cd12 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -49,8 +49,9 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
     /// used to record execution metrics
     metrics: BaselineMetrics,
 
-    /// If the stream has encountered an error
-    aborted: bool,
+    /// If the stream has encountered an error or reaches the
+    /// `fetch` limit.
+    done: bool,
 
     /// A loser tree that always produces the minimum cursor
     ///
@@ -162,7 +163,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
             in_progress: BatchBuilder::new(schema, stream_count, batch_size, 
reservation),
             streams,
             metrics,
-            aborted: false,
+            done: false,
             cursors: (0..stream_count).map(|_| None).collect(),
             prev_cursors: (0..stream_count).map(|_| None).collect(),
             round_robin_tie_breaker_mode: false,
@@ -206,7 +207,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         &mut self,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Result<RecordBatch>>> {
-        if self.aborted {
+        if self.done {
             return Poll::Ready(None);
         }
         // Once all partitions have set their corresponding cursors for the 
loser tree,
@@ -220,7 +221,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                 let partition_idx = self.uninitiated_partitions[idx];
                 match self.maybe_poll_stream(cx, partition_idx) {
                     Poll::Ready(Err(e)) => {
-                        self.aborted = true;
+                        self.done = true;
                         return Poll::Ready(Some(Err(e)));
                     }
                     Poll::Pending => {
@@ -268,7 +269,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
             if !self.loser_tree_adjusted {
                 let winner = self.loser_tree[0];
                 if let Err(e) = ready!(self.maybe_poll_stream(cx, winner)) {
-                    self.aborted = true;
+                    self.done = true;
                     return Poll::Ready(Some(Err(e)));
                 }
                 self.update_loser_tree();
@@ -281,7 +282,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
 
                 // stop sorting if fetch has been reached
                 if self.fetch_reached() {
-                    self.aborted = true;
+                    self.done = true;
                 } else if self.in_progress.len() < self.batch_size {
                     continue;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to