alamb commented on code in PR #4407:
URL: https://github.com/apache/arrow-datafusion/pull/4407#discussion_r1067925771
##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -660,6 +609,127 @@ impl SortPreservingMergeStream {
}
}
}
+
+ /// Attempts to initialize the loser tree with one value from each
+ /// non-exhausted input, if possible.
+ ///
+ /// Returns `TreeUpdate::Complete` on success (or immediately, if the tree is already built),
+ /// `TreeUpdate::Pending` if any input is not ready, or `TreeUpdate::Error` if any input errored
+ #[inline]
+ fn init_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) ->
TreeUpdate {
+ let num_streams = self.streams.num_streams();
+
+ if !self.loser_tree.is_empty() {
+ return TreeUpdate::Complete;
+ }
+
+ // Ensure all non-exhausted streams have a cursor from which
+ // rows can be pulled
+ for i in 0..num_streams {
+ match self.maybe_poll_stream(cx, i) {
+ Poll::Ready(Ok(_)) => {}
+ Poll::Ready(Err(e)) => {
+ self.aborted = true;
+ return TreeUpdate::Error(e);
+ }
+ Poll::Pending => return TreeUpdate::Pending,
+ }
+ }
+
+ // Init loser tree: slots start as usize::MAX (empty); each stream climbs from its leaf, leaving the loser in every occupied node, until it reaches an empty slot or the root (index 0)
+ self.loser_tree.resize(num_streams, usize::MAX);
+ for i in 0..num_streams {
+ let mut winner = i;
+ let mut cmp_node = (num_streams + i) / 2;
+ while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
+ let challenger = self.loser_tree[cmp_node];
+ let challenger_win =
+ match (&self.cursors[winner], &self.cursors[challenger]) {
+ (None, _) => true,
+ (_, None) => false,
+ (Some(winner), Some(challenger)) => challenger <
winner,
+ };
+
+ if challenger_win {
+ self.loser_tree[cmp_node] = winner;
+ winner = challenger;
+ }
+
+ cmp_node /= 2;
+ }
+ self.loser_tree[cmp_node] = winner;
+ }
+ self.loser_tree_adjusted = true;
+ TreeUpdate::Complete
+ }
+
+ /// Attempts to update the loser tree, if possible
+ ///
+ /// Returns `TreeUpdate::Complete` on success (or immediately, if the tree is already adjusted),
+ /// `TreeUpdate::Pending` if the winning input was not ready, or `TreeUpdate::Error` if it errored
+ #[inline]
+ fn update_loser_tree(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) ->
TreeUpdate {
+ if self.loser_tree_adjusted {
+ return TreeUpdate::Complete;
+ }
+
+ let num_streams = self.streams.num_streams();
+ let mut winner = self.loser_tree[0];
+ match self.maybe_poll_stream(cx, winner) {
+ Poll::Ready(Ok(_)) => {}
+ Poll::Ready(Err(e)) => {
+ self.aborted = true;
+ return TreeUpdate::Error(e);
+ }
+ Poll::Pending => return TreeUpdate::Pending,
+ }
+
+ // Replace overall winner by walking the tree of losers from the previous winner's leaf up to the root (index 0)
+ let mut cmp_node = (num_streams + winner) / 2;
+ while cmp_node != 0 {
+ let challenger = self.loser_tree[cmp_node];
+ let challenger_win = match (&self.cursors[winner],
&self.cursors[challenger])
+ {
+ (None, _) => true,
+ (_, None) => false,
+ (Some(winner), Some(challenger)) => challenger < winner,
+ };
+ if challenger_win {
+ self.loser_tree[cmp_node] = winner;
+ winner = challenger;
+ }
+ cmp_node /= 2;
+ }
+ self.loser_tree[0] = winner;
+ self.loser_tree_adjusted = true;
+ TreeUpdate::Complete
+ }
+}
+
+/// The result of updating the loser tree. It is the same as an Option
+/// but with specific names for easier readability
+enum TreeUpdate {
Review Comment:
I tried it out and you are right -- I think `Poll` made the code better --
in 2237abed8
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]