wiedld commented on code in PR #7842:
URL: https://github.com/apache/arrow-datafusion/pull/7842#discussion_r1361349727


##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -60,43 +55,97 @@ impl BatchBuilder {
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
-            cursors: vec![BatchCursor::default(); stream_count],
+            cursors: (0..stream_count).map(|_| None).collect(),
             indices: Vec::with_capacity(batch_size),
             reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
-    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> 
Result<()> {
+    pub fn push_batch(
+        &mut self,
+        stream_idx: usize,
+        cursor: C,
+        batch: RecordBatch,
+    ) -> Result<()> {
         self.reservation.try_grow(batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
-        self.cursors[stream_idx] = BatchCursor {
+        self.cursors[stream_idx] = Some(BatchCursor {
             batch_idx,
+            cursor,
             row_idx: 0,
-        };
+        });
         Ok(())
     }
 
     /// Append the next row from `stream_idx`
     pub fn push_row(&mut self, stream_idx: usize) {
-        let cursor = &mut self.cursors[stream_idx];
-        let row_idx = cursor.row_idx;
-        cursor.row_idx += 1;
-        self.indices.push((cursor.batch_idx, row_idx));
+        let BatchCursor {
+            cursor: _,
+            batch_idx,
+            row_idx,
+        } = self.cursors[stream_idx]
+            .as_mut()
+            .expect("row pushed for non-existant cursor");
+        self.indices.push((*batch_idx, *row_idx));
+        *row_idx += 1;
     }
 
-    /// Returns the number of in-progress rows in this [`BatchBuilder`]
+    /// Returns true if there is an in-progress cursor for a given stream
+    pub fn cursor_in_progress(&mut self, stream_idx: usize) -> bool {
+        self.cursors[stream_idx]
+            .as_mut()
+            .map_or(false, |batch_cursor| !batch_cursor.cursor.is_finished())
+    }
+
+    /// Advance the cursor for `stream_idx`
+    /// Return true if cursor was advanced
+    pub fn advance_cursor(&mut self, stream_idx: usize) -> bool {
+        let slot = &mut self.cursors[stream_idx];
+        match slot.as_mut() {
+            Some(c) => {
+                if c.cursor.is_finished() {
+                    return false;

Review Comment:
   This is slightly different from the code removed from the merge node, which 
on finish sets `*slot = None`.
   
   Here, we do not discard the batch_cursor; instead, we return false prior to 
advancement, which results in the same boolean being returned the next time 
`advance_cursor()` is called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to