wiedld commented on code in PR #7842:
URL: https://github.com/apache/arrow-datafusion/pull/7842#discussion_r1367412508


##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -60,43 +55,97 @@ impl BatchBuilder {
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
-            cursors: vec![BatchCursor::default(); stream_count],
+            cursors: (0..stream_count).map(|_| None).collect(),
             indices: Vec::with_capacity(batch_size),
             reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
-    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
+    pub fn push_batch(
+        &mut self,
+        stream_idx: usize,
+        cursor: C,
+        batch: RecordBatch,
+    ) -> Result<()> {
         self.reservation.try_grow(batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
-        self.cursors[stream_idx] = BatchCursor {
+        self.cursors[stream_idx] = Some(BatchCursor {
             batch_idx,
+            cursor,
             row_idx: 0,
-        };
+        });
         Ok(())
     }
 
     /// Append the next row from `stream_idx`
     pub fn push_row(&mut self, stream_idx: usize) {
-        let cursor = &mut self.cursors[stream_idx];
-        let row_idx = cursor.row_idx;
-        cursor.row_idx += 1;
-        self.indices.push((cursor.batch_idx, row_idx));
+        let BatchCursor {
+            cursor: _,
+            batch_idx,
+            row_idx,
+        } = self.cursors[stream_idx]
+            .as_mut()
+            .expect("row pushed for non-existent cursor");
+        self.indices.push((*batch_idx, *row_idx));
+        *row_idx += 1;
     }
 
-    /// Returns the number of in-progress rows in this [`BatchBuilder`]
+    /// Returns true if there is an in-progress cursor for a given stream
+    pub fn cursor_in_progress(&mut self, stream_idx: usize) -> bool {
+        self.cursors[stream_idx]
+            .as_mut()
+            .map_or(false, |batch_cursor| !batch_cursor.cursor.is_finished())
+    }
+
+    /// Advance the cursor for `stream_idx`
+    /// Return true if cursor was advanced
+    pub fn advance_cursor(&mut self, stream_idx: usize) -> bool {
+        let slot = &mut self.cursors[stream_idx];
+        match slot.as_mut() {
+            Some(c) => {
+                if c.cursor.is_finished() {
+                    return false;

Review Comment:
   Note that this means the lifecycle of a `self.cursors[stream_idx]` slot is:
   1. `None`: no cursor exists yet for that `stream_idx`.
   2. `Some(cursor)` after `push_batch()`.
   3. Several `push_row()` calls.
   4. The merge/loser tree node checks whether the cursor is finished (`!cursor_in_progress()`); once it is, the node:
      * polls for the next CursorValues
      * calls `push_batch()`
   5. `push_batch()` saves the completed BatchCursor (to `sorted_batches`) and adds the new BatchCursor.
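   The slot lifecycle above can be modelled in a few lines of Rust. This is a minimal, self-contained sketch, not the PR's code: the `Cursor` trait (with `is_finished()` and a hypothetical `advance()`), `RowCountCursor`, and the `Slots` struct are assumptions standing in for the real cursor type, `CursorValues`, and `BatchBuilder`; memory reservation and `RecordBatch` storage are omitted. Because `vec![elem; n]` requires `elem: Clone` (which here would force `C: Clone`), the slots are built with an iterator, which is presumably also why the diff uses `(0..stream_count).map(|_| None).collect()`.

```rust
/// Hypothetical stand-in for the PR's generic cursor type `C`.
trait Cursor {
    fn is_finished(&self) -> bool;
    fn advance(&mut self);
}

/// Hypothetical cursor that only tracks how many of `len` rows were consumed.
struct RowCountCursor {
    offset: usize,
    len: usize,
}

impl Cursor for RowCountCursor {
    fn is_finished(&self) -> bool {
        self.offset == self.len
    }
    fn advance(&mut self) {
        self.offset += 1;
    }
}

struct BatchCursor<C> {
    batch_idx: usize,
    cursor: C,
    row_idx: usize,
}

/// Simplified model of the builder's per-stream slots (no RecordBatch storage).
struct Slots<C> {
    cursors: Vec<Option<BatchCursor<C>>>,
    batch_count: usize,
    indices: Vec<(usize, usize)>,
}

impl<C: Cursor> Slots<C> {
    fn new(stream_count: usize) -> Self {
        Self {
            // Every slot starts as None; `vec![None; n]` would need C: Clone.
            cursors: (0..stream_count).map(|_| None).collect(),
            batch_count: 0,
            indices: Vec::new(),
        }
    }

    /// A new batch replaces whatever cursor the slot held before.
    fn push_batch(&mut self, stream_idx: usize, cursor: C) {
        let batch_idx = self.batch_count;
        self.batch_count += 1;
        self.cursors[stream_idx] = Some(BatchCursor { batch_idx, cursor, row_idx: 0 });
    }

    /// Record (batch_idx, row_idx) for the current row, then bump row_idx.
    fn push_row(&mut self, stream_idx: usize) {
        let bc = self.cursors[stream_idx]
            .as_mut()
            .expect("row pushed for non-existent cursor");
        self.indices.push((bc.batch_idx, bc.row_idx));
        bc.row_idx += 1;
    }

    /// True while the slot holds a cursor that still has rows left.
    fn cursor_in_progress(&self, stream_idx: usize) -> bool {
        self.cursors[stream_idx]
            .as_ref()
            .map_or(false, |bc| !bc.cursor.is_finished())
    }

    /// Advance the slot's cursor; false if the slot is empty or already finished.
    /// (Assumption: the diff is cut off before its handling of an empty slot.)
    fn advance_cursor(&mut self, stream_idx: usize) -> bool {
        match self.cursors[stream_idx].as_mut() {
            Some(bc) => {
                if bc.cursor.is_finished() {
                    return false;
                }
                bc.cursor.advance();
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut slots: Slots<RowCountCursor> = Slots::new(2);
    assert!(!slots.cursor_in_progress(0)); // nothing pushed yet: slot is None
    slots.push_batch(0, RowCountCursor { offset: 0, len: 2 });
    while slots.cursor_in_progress(0) {
        slots.push_row(0);
        slots.advance_cursor(0);
    }
    // The cursor is finished, so the next batch is pushed into the same slot.
    slots.push_batch(0, RowCountCursor { offset: 0, len: 1 });
    slots.push_row(0);
    println!("{:?}", slots.indices);
}
```

   In `main`, stream 0 emits both rows of its first batch, the finished cursor makes `cursor_in_progress(0)` return `false`, and the following `push_batch()` fills the slot with a cursor for `batch_idx` 1, so the recorded indices are `[(0, 0), (0, 1), (1, 0)]`.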
   
   --- 
   
   After the next PR, we will then see:
   1. `SortOrderBuilder::build_batch()` will change to `SortOrderBuilder::yield_sort_order()`.
   2. On yield, it will do the following:
       * Fully yielded BatchCursors will get a new Cursor (positioned at the start) over the full CursorValues.
       * Fully retained BatchCursors will not change (they remain in the ongoing `self.cursors[stream_idx]`).
       * A partially yielded BatchCursor:
           * will slice the underlying CursorValues and get a new cursor on each of the two (sliced) parts.
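   The three yield cases can be sketched as a pure function over one in-progress cursor, assuming the rows a cursor has already advanced past are the ones being yielded. Everything here is hypothetical: `Values` stands in for `CursorValues`, `SimpleCursor` for the cursor, and `OnYield`/`on_yield` are illustrative names, not the API of the next PR. A real implementation would presumably slice the underlying values without copying; `to_vec()` is used only to keep the sketch self-contained.

```rust
/// Hypothetical stand-in for CursorValues: the sort keys of one batch.
#[derive(Clone, Debug, PartialEq)]
struct Values(Vec<i64>);

/// Hypothetical cursor: owned values plus a read position.
#[derive(Debug, PartialEq)]
struct SimpleCursor {
    values: Values,
    offset: usize,
}

impl SimpleCursor {
    fn new(values: Values) -> Self {
        Self { values, offset: 0 }
    }
    fn is_finished(&self) -> bool {
        self.offset == self.values.0.len()
    }
}

/// What a yield does to one in-progress cursor.
#[derive(Debug, PartialEq)]
enum OnYield {
    /// All rows consumed: a fresh cursor over the full values.
    FullyYielded(SimpleCursor),
    /// No rows consumed: the cursor stays in its slot unchanged.
    FullyRetained(SimpleCursor),
    /// Some rows consumed: one fresh cursor per sliced part.
    Partial { yielded: SimpleCursor, retained: SimpleCursor },
}

fn on_yield(cursor: SimpleCursor) -> OnYield {
    let consumed = cursor.offset;
    if cursor.is_finished() {
        // Every row was yielded: restart a cursor at offset 0 over all values.
        OnYield::FullyYielded(SimpleCursor::new(cursor.values))
    } else if consumed == 0 {
        // Nothing was yielded: keep the cursor exactly as it is.
        OnYield::FullyRetained(cursor)
    } else {
        // Split at the read position and start a fresh cursor on each side.
        let (head, tail) = cursor.values.0.split_at(consumed);
        OnYield::Partial {
            yielded: SimpleCursor::new(Values(head.to_vec())),
            retained: SimpleCursor::new(Values(tail.to_vec())),
        }
    }
}

fn main() {
    let partial = SimpleCursor { values: Values(vec![1, 3, 5, 7]), offset: 2 };
    println!("{:?}", on_yield(partial));
}
```

   For a cursor over `[1, 3, 5, 7]` that has consumed two rows, `on_yield` returns `Partial` with cursors over `[1, 3]` and `[5, 7]`, each positioned at offset 0.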


