This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 44daa9a7b8 minor: enhance comment in SortPreservingMergeStream.abort (#17115)
44daa9a7b8 is described below

commit 44daa9a7b86f39b324762f93af901b292e98008d
Author: mwish <maplewish...@gmail.com>
AuthorDate: Wed Aug 13 01:55:40 2025 +0800

    minor: enhance comment in SortPreservingMergeStream.abort (#17115)

    * minor: enhance comment in SortPreservingMergeStream.abort

    * change aborted to done
---
 datafusion/physical-plan/src/sorts/merge.rs | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs
index ca2d5f2105..0b0136cd12 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -49,8 +49,9 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
     /// used to record execution metrics
     metrics: BaselineMetrics,
 
-    /// If the stream has encountered an error
-    aborted: bool,
+    /// If the stream has encountered an error or reaches the
+    /// `fetch` limit.
+    done: bool,
 
     /// A loser tree that always produces the minimum cursor
     ///
@@ -162,7 +163,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
             in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
             streams,
             metrics,
-            aborted: false,
+            done: false,
             cursors: (0..stream_count).map(|_| None).collect(),
             prev_cursors: (0..stream_count).map(|_| None).collect(),
             round_robin_tie_breaker_mode: false,
@@ -206,7 +207,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         &mut self,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Result<RecordBatch>>> {
-        if self.aborted {
+        if self.done {
             return Poll::Ready(None);
         }
         // Once all partitions have set their corresponding cursors for the loser tree,
@@ -220,7 +221,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                 let partition_idx = self.uninitiated_partitions[idx];
                 match self.maybe_poll_stream(cx, partition_idx) {
                     Poll::Ready(Err(e)) => {
-                        self.aborted = true;
+                        self.done = true;
                         return Poll::Ready(Some(Err(e)));
                     }
                     Poll::Pending => {
@@ -268,7 +269,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
             if !self.loser_tree_adjusted {
                 let winner = self.loser_tree[0];
                 if let Err(e) = ready!(self.maybe_poll_stream(cx, winner)) {
-                    self.aborted = true;
+                    self.done = true;
                     return Poll::Ready(Some(Err(e)));
                 }
                 self.update_loser_tree();
@@ -281,7 +282,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
 
                 // stop sorting if fetch has been reached
                 if self.fetch_reached() {
-                    self.aborted = true;
+                    self.done = true;
                 } else if self.in_progress.len() < self.batch_size {
                     continue;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org