pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134778315


##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -216,36 +212,49 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
         // Once all partitions have set their corresponding cursors for the 
loser tree,
         // we skip the following block. Until then, this function may be 
called multiple
         // times and can return Poll::Pending if any partition returns 
Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = 
self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and 
shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {
+                // unwrap is safe since we just checked the index
+                let &partition_idx = 
self.uninitiated_partitions.get(idx).unwrap();
                 match self.maybe_poll_stream(cx, partition_idx) {
                     Poll::Ready(Err(e)) => {
                         self.aborted = true;
                         return Poll::Ready(Some(Err(e)));
                     }
                     Poll::Pending => {
-                        // If a partition returns Poll::Pending, to avoid 
continuously polling it
-                        // and potentially increasing upstream buffer sizes, 
we move it to the
-                        // back of the polling queue.
-                        self.uninitiated_partitions.rotate_left(1);
-
-                        // This function could remain in a pending state, so 
we manually wake it here.
-                        // However, this approach can be investigated further 
to find a more natural way
-                        // to avoid disrupting the runtime scheduler.
-                        cx.waker().wake_by_ref();
-                        return Poll::Pending;
+                        // The polled stream is pending which means we're 
already set up to
+                        // be woken when necessary
+                        // Try the next stream
+                        idx += 1;
                     }
                     _ => {
-                        // If the polling result is Poll::Ready(Some(batch)) 
or Poll::Ready(None),
-                        // we remove this partition from the queue so it is 
not polled again.
-                        self.uninitiated_partitions.pop_front();
+                        // The polled stream is ready
+                        // Remove it from uninitiated_partitions
+                        // Don't bump idx here, since a new element will have 
taken its
+                        // place which we'll try in the next loop iteration
+                        self.uninitiated_partitions.remove(idx);

Review Comment:
   Will do. I'll also need to adapt `test_spm_congestion`, since it no longer 
really tests what it claims to be testing. I'll push the `swap_remove` change 
right away to see whether the other test breaks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to