alamb commented on code in PR #4301:
URL: https://github.com/apache/arrow-datafusion/pull/4301#discussion_r1034040105


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -570,45 +606,57 @@ impl SortPreservingMergeStream {
         let _timer = elapsed_compute.timer();
 
         loop {
-            match self.heap.pop() {
-                Some(Reverse(mut cursor)) => {
-                    let stream_idx = cursor.stream_idx();
-                    let batch_idx = self.batches[stream_idx].len() - 1;
-                    let row_idx = cursor.advance();
-
-                    let mut cursor_finished = false;
-                    // insert the cursor back to heap if the record batch is not exhausted
-                    if !cursor.is_finished() {
-                        self.heap.push(Reverse(cursor));
-                    } else {
-                        cursor_finished = true;
-                        self.cursor_finished[stream_idx] = true;
+            // Adjust the loser tree if necessary
+            if !self.loser_tree_adjusted {
+                let mut winner = self.loser_tree[0];
+                match futures::ready!(self.maybe_poll_stream(cx, winner)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
                     }
+                }
 
-                    self.in_progress.push(RowIndex {
-                        stream_idx,
-                        batch_idx,
-                        row_idx,
-                    });
-
-                    if self.in_progress.len() == self.batch_size {
-                        return Poll::Ready(Some(self.build_record_batch()));
+                let mut cmp_node = (num_streams + winner) / 2;
+                while cmp_node != 0 {
+                    let challenger = self.loser_tree[cmp_node];
+                    let challenger_win =
+                        match (&self.cursors[winner], &self.cursors[challenger]) {
+                            (None, _) => true,
+                            (_, None) => false,
+                            (Some(winner), Some(challenger)) => challenger < winner,
+                        };
+                    if challenger_win {
+                        self.loser_tree[cmp_node] = winner;
+                        winner = challenger;
                     }
+                    cmp_node /= 2;
+                }
+                self.loser_tree[0] = winner;
+                self.loser_tree_adjusted = true;
+            }
 
-                    // If removed the last row from the cursor, need to fetch a new record
-                    // batch if possible, before looping round again
-                    if cursor_finished {
-                        match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
-                            Ok(_) => {}
-                            Err(e) => {
-                                self.aborted = true;
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        }
-                    }
+            let min_cursor_idx = self.loser_tree[0];

Review Comment:
   I couldn't make this work in 
https://github.com/apache/arrow-datafusion/pull/4407



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to