viirya commented on code in PR #4301:
URL: https://github.com/apache/arrow-datafusion/pull/4301#discussion_r1028605200
##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -551,17 +558,46 @@ impl SortPreservingMergeStream {
if self.aborted {
return Poll::Ready(None);
}
+ let num_streams = self.streams.num_streams();
+
+ // Init all cursors and the loser tree in the first poll
+ if self.loser_tree.is_empty() {
+ // Ensure all non-exhausted streams have a cursor from which
+ // rows can be pulled
+ for i in 0..num_streams {
+ match futures::ready!(self.maybe_poll_stream(cx, i)) {
+ Ok(_) => {}
+ Err(e) => {
+ self.aborted = true;
+ return Poll::Ready(Some(Err(e)));
+ }
+ }
+ }
- // Ensure all non-exhausted streams have a cursor from which
- // rows can be pulled
- for i in 0..self.streams.num_streams() {
- match futures::ready!(self.maybe_poll_stream(cx, i)) {
- Ok(_) => {}
- Err(e) => {
- self.aborted = true;
- return Poll::Ready(Some(Err(e)));
+ // Init loser tree
+ self.loser_tree.resize(num_streams, usize::MAX);
+ for i in 0..num_streams {
+ let mut winner = i;
+ let mut cmp_node = (num_streams + i) / 2;
+ while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX
{
+ let challenger = self.loser_tree[cmp_node];
+ let challenger_win =
+ match (&self.cursors[winner],
&self.cursors[challenger]) {
+ (None, _) => true,
+ (_, None) => false,
+ (Some(winner), Some(challenger)) => challenger <
winner,
+ };
+ if challenger_win {
+ self.loser_tree[cmp_node] = winner;
+ winner = challenger;
+ } else {
+ self.loser_tree[cmp_node] = challenger;
+ }
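Review Comment:
The `else` branch is redundant: `challenger` was just read from `self.loser_tree[cmp_node]`, so writing it back when the challenger loses is a no-op.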
```suggestion
if challenger_win {
self.loser_tree[cmp_node] = winner;
winner = challenger;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]