wiedld commented on code in PR #7842:
URL: https://github.com/apache/arrow-datafusion/pull/7842#discussion_r1369047593


##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -60,47 +58,125 @@ impl BatchBuilder {
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
-            cursors: vec![BatchCursor::default(); stream_count],
+            cursors: (0..stream_count).map(|_| None).collect(),
             indices: Vec::with_capacity(batch_size),
             reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
-    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> 
Result<()> {
+    pub fn push_batch(
+        &mut self,
+        stream_idx: usize,
+        cursor_values: C,
+        batch: RecordBatch,
+    ) -> Result<()> {
         self.reservation.try_grow(batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
-        self.cursors[stream_idx] = BatchCursor {
+        self.cursors[stream_idx] = Some(BatchCursor {
             batch_idx,
+            cursor: Cursor::new(cursor_values),
             row_idx: 0,
-        };
+        });
         Ok(())
     }
 
     /// Append the next row from `stream_idx`
     pub fn push_row(&mut self, stream_idx: usize) {
-        let cursor = &mut self.cursors[stream_idx];
-        let row_idx = cursor.row_idx;
-        cursor.row_idx += 1;
-        self.indices.push((cursor.batch_idx, row_idx));
+        let BatchCursor {
+            cursor: _,
+            batch_idx,
+            row_idx,
+        } = self.cursors[stream_idx]
+            .as_mut()
+            .expect("row pushed for non-existant cursor");
+        self.indices.push((*batch_idx, *row_idx));
+        *row_idx += 1;
+    }
+
+    /// Returns true if there is an in-progress cursor for a given stream
+    pub fn cursor_in_progress(&mut self, stream_idx: usize) -> bool {
+        self.cursors[stream_idx]
+            .as_mut()
+            .map_or(false, |batch_cursor| !batch_cursor.cursor.is_finished())
+    }
+
+    /// Advance the cursor for `stream_idx`
+    /// Return true if cursor was advanced
+    pub fn advance_cursor(&mut self, stream_idx: usize) -> bool {
+        let slot = &mut self.cursors[stream_idx];
+        match slot.as_mut() {
+            Some(c) => {
+                if c.cursor.is_finished() {
+                    return false;
+                }
+                c.cursor.advance();
+                true
+            }
+            None => false,
+        }
     }
 
-    /// Returns the number of in-progress rows in this [`BatchBuilder`]
+    /// Returns `true` if the cursor at index `a` is greater than at index `b`
+    #[inline]
+    pub fn is_gt(&mut self, stream_idx_a: usize, stream_idx_b: usize) -> bool {
+        match (
+            self.cursor_in_progress(stream_idx_a),
+            self.cursor_in_progress(stream_idx_b),
+        ) {
+            (false, _) => true,
+            (_, false) => false,
+            _ => match (&self.cursors[stream_idx_a], 
&self.cursors[stream_idx_b]) {
+                (Some(a), Some(b)) => a
+                    .cursor
+                    .cmp(&b.cursor)
+                    .then_with(|| stream_idx_a.cmp(&stream_idx_b))
+                    .is_gt(),
+                _ => unreachable!(),
+            },
+        }
+    }
+
+    /// Returns the number of in-progress rows in this [`SortOrderBuilder`]
     pub fn len(&self) -> usize {
         self.indices.len()
     }
 
-    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows
+    /// Returns `true` if this [`SortOrderBuilder`] contains no in-progress 
rows
     pub fn is_empty(&self) -> bool {
         self.indices.is_empty()
     }
 
-    /// Returns the schema of this [`BatchBuilder`]
+    /// Returns the schema of this [`SortOrderBuilder`]
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
+    /// Yields a sort_order and the sliced [`BatchCursor`]s
+    /// representing (in total) up to N batch size.
+    ///
+    ///         BatchCursors
+    /// ┌────────────────────────┐
+    /// │    Cursor     BatchId  │
+    /// │ ┌──────────┐ ┌───────┐ │
+    /// │ │  1..7    │ │   A   │ |  (sliced to partial batches)
+    /// │ ├──────────┤ ├───────┤ │
+    /// │ │  11..14  │ │   B   │ |
+    /// │ └──────────┘ └───────┘ │
+    /// └────────────────────────┘
+    ///
+    ///
+    ///         SortOrder
+    /// ┌─────────────────────────────┐
+    /// | (B,11) (A,1) (A,2) (B,12)...|  (up to N rows)
+    /// └─────────────────────────────┘
+    ///
+    #[allow(dead_code)]
+    pub fn yield_sort_order(&mut self) -> Result<Option<YieldedSortOrder<C>>> {
+        unimplemented!("to implement in future PR");

Review Comment:
   By having the SortOrderBuilder take full ownership of the cursors, then we 
can have all batch partials (and awareness of partials) be handled in the 
SortOrderBuilder. This `yield_sort_order()` will have an implementation [like 
this](https://github.com/wiedld/arrow-datafusion/blob/1477beb4145f6dd0bb4cc1c9b5535361c425a837/datafusion/physical-plan/src/sorts/builder.rs#L128-L164).
   
   The loser/merge tree will only interface with the cursor through the 
SortOrderBuilder (e.g.`push_batch()`, `push_row()`, `advance_cursor()`). Please 
let me know if there is a better design option. 😄 



##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -60,47 +58,125 @@ impl BatchBuilder {
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
-            cursors: vec![BatchCursor::default(); stream_count],
+            cursors: (0..stream_count).map(|_| None).collect(),
             indices: Vec::with_capacity(batch_size),
             reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
-    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> 
Result<()> {
+    pub fn push_batch(
+        &mut self,
+        stream_idx: usize,
+        cursor_values: C,
+        batch: RecordBatch,
+    ) -> Result<()> {
         self.reservation.try_grow(batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
-        self.cursors[stream_idx] = BatchCursor {
+        self.cursors[stream_idx] = Some(BatchCursor {
             batch_idx,
+            cursor: Cursor::new(cursor_values),
             row_idx: 0,
-        };
+        });
         Ok(())
     }
 
     /// Append the next row from `stream_idx`
     pub fn push_row(&mut self, stream_idx: usize) {
-        let cursor = &mut self.cursors[stream_idx];
-        let row_idx = cursor.row_idx;
-        cursor.row_idx += 1;
-        self.indices.push((cursor.batch_idx, row_idx));
+        let BatchCursor {
+            cursor: _,
+            batch_idx,
+            row_idx,
+        } = self.cursors[stream_idx]
+            .as_mut()
+            .expect("row pushed for non-existant cursor");
+        self.indices.push((*batch_idx, *row_idx));
+        *row_idx += 1;
+    }
+
+    /// Returns true if there is an in-progress cursor for a given stream
+    pub fn cursor_in_progress(&mut self, stream_idx: usize) -> bool {
+        self.cursors[stream_idx]
+            .as_mut()
+            .map_or(false, |batch_cursor| !batch_cursor.cursor.is_finished())
+    }
+
+    /// Advance the cursor for `stream_idx`
+    /// Return true if cursor was advanced
+    pub fn advance_cursor(&mut self, stream_idx: usize) -> bool {
+        let slot = &mut self.cursors[stream_idx];
+        match slot.as_mut() {
+            Some(c) => {
+                if c.cursor.is_finished() {
+                    return false;
+                }
+                c.cursor.advance();
+                true
+            }
+            None => false,
+        }
     }
 
-    /// Returns the number of in-progress rows in this [`BatchBuilder`]
+    /// Returns `true` if the cursor at index `a` is greater than at index `b`
+    #[inline]
+    pub fn is_gt(&mut self, stream_idx_a: usize, stream_idx_b: usize) -> bool {
+        match (
+            self.cursor_in_progress(stream_idx_a),
+            self.cursor_in_progress(stream_idx_b),
+        ) {
+            (false, _) => true,
+            (_, false) => false,
+            _ => match (&self.cursors[stream_idx_a], 
&self.cursors[stream_idx_b]) {
+                (Some(a), Some(b)) => a
+                    .cursor
+                    .cmp(&b.cursor)
+                    .then_with(|| stream_idx_a.cmp(&stream_idx_b))
+                    .is_gt(),
+                _ => unreachable!(),
+            },
+        }
+    }
+
+    /// Returns the number of in-progress rows in this [`SortOrderBuilder`]
     pub fn len(&self) -> usize {
         self.indices.len()
     }
 
-    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows
+    /// Returns `true` if this [`SortOrderBuilder`] contains no in-progress 
rows
     pub fn is_empty(&self) -> bool {
         self.indices.is_empty()
     }
 
-    /// Returns the schema of this [`BatchBuilder`]
+    /// Returns the schema of this [`SortOrderBuilder`]
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
+    /// Yields a sort_order and the sliced [`BatchCursor`]s
+    /// representing (in total) up to N batch size.
+    ///
+    ///         BatchCursors
+    /// ┌────────────────────────┐
+    /// │    Cursor     BatchId  │
+    /// │ ┌──────────┐ ┌───────┐ │
+    /// │ │  1..7    │ │   A   │ |  (sliced to partial batches)
+    /// │ ├──────────┤ ├───────┤ │
+    /// │ │  11..14  │ │   B   │ |
+    /// │ └──────────┘ └───────┘ │
+    /// └────────────────────────┘
+    ///
+    ///
+    ///         SortOrder
+    /// ┌─────────────────────────────┐
+    /// | (B,11) (A,1) (A,2) (B,12)...|  (up to N rows)
+    /// └─────────────────────────────┘
+    ///
+    #[allow(dead_code)]
+    pub fn yield_sort_order(&mut self) -> Result<Option<YieldedSortOrder<C>>> {
+        unimplemented!("to implement in future PR");

Review Comment:
   By having the SortOrderBuilder take full ownership of the cursors, then we 
can have all batch partials (and awareness of partials) be handled in the 
SortOrderBuilder. This `yield_sort_order()` will have an implementation [like 
this](https://github.com/wiedld/arrow-datafusion/blob/1477beb4145f6dd0bb4cc1c9b5535361c425a837/datafusion/physical-plan/src/sorts/builder.rs#L128-L164).
   
   The loser/merge tree will only interface with the cursor through the 
SortOrderBuilder (e.g.`push_batch()`, `push_row()`, `advance_cursor()`). Please 
let me know if there is a better design option. 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to