rluvaton commented on code in PR #20314:
URL: https://github.com/apache/datafusion/pull/20314#discussion_r2841228608


##########
datafusion/physical-plan/src/sorts/stream.rs:
##########
@@ -276,3 +280,159 @@ impl<T: CursorArray> PartitionedStream for FieldCursorStream<T> {
         }))
     }
 }
+
+/// A lazy, memory-efficient sort iterator used as a fallback during aggregate
+/// spill when there is not enough memory for an eager sort (which requires ~2x
+/// peak memory to hold both the unsorted and sorted copies simultaneously).
+///
+/// On the first call to `next()`, a sorted index array (`UInt32Array`) is
+/// computed via `lexsort_to_indices`. Subsequent calls yield chunks of
+/// `batch_size` rows by `take`-ing from the original batch using slices of
+/// this index array. Each `take` copies data for the chunk (not zero-copy),
+/// but only one chunk is live at a time since the caller consumes it before
+/// requesting the next. Once all rows have been yielded, the original batch
+/// and index array are dropped to free memory.
+///
+/// The caller must reserve `sizeof(batch) + sizeof(one chunk)` for this iterator,
+/// and free the reservation once the iterator is depleted.
+pub(crate) struct IncrementalSortIterator {
+    batch: RecordBatch,
+    expressions: LexOrdering,
+    batch_size: usize,
+    indices: Option<UInt32Array>,
+    cursor: usize,
+}
+
+impl IncrementalSortIterator {
+    pub(crate) fn new(
+        batch: RecordBatch,
+        expressions: LexOrdering,
+        batch_size: usize,
+    ) -> Self {
+        Self {
+            batch,
+            expressions,
+            batch_size,
+            cursor: 0,
+            indices: None,
+        }
+    }
+}
+
+impl Iterator for IncrementalSortIterator {
+    type Item = Result<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.cursor >= self.batch.num_rows() {
+            return None;
+        }
+
+        match self.indices.as_ref() {
+            None => {
+                let sort_columns = match self
+                    .expressions
+                    .iter()
+                    .map(|expr| expr.evaluate_to_sort_column(&self.batch))
+                    .collect::<Result<Vec<_>>>()
+                {
+                    Ok(cols) => cols,
+                    Err(e) => return Some(Err(e)),
+                };
+
+                let indices = match lexsort_to_indices(&sort_columns, None) {
+                    Ok(indices) => indices,
+                    Err(e) => return Some(Err(e.into())),
+                };
+                self.indices = Some(indices);
+
+                // Call again; this time it will hit the Some(indices) branch and return the first batch
+                self.next()
+            }
+            Some(indices) => {
+                let batch_size = self.batch_size.min(self.batch.num_rows() - self.cursor);
+
+                // Perform the take to produce the next batch
+                let new_batch_indices = indices.slice(self.cursor, batch_size);
+                let new_batch = match take_record_batch(&self.batch, &new_batch_indices) {
+                    Ok(batch) => batch,
+                    Err(e) => return Some(Err(e.into())),
+                };
+
+                self.cursor += batch_size;
+
+                // If this is the last batch, we can release the memory
+                if self.cursor >= self.batch.num_rows() {
+                    let schema = self.batch.schema();
+                    let _ = mem::replace(&mut self.batch, RecordBatch::new_empty(schema));
+                    self.indices = None;
+                }
+
+                // Return the new batch
+                Some(Ok(new_batch))
+            }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let num_rows = self.batch.num_rows();
+        let batch_size = self.batch_size;
+        let num_batches = num_rows.div_ceil(batch_size);
+        (num_batches, Some(num_batches))
+    }
+}
+
+impl FusedIterator for IncrementalSortIterator {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{AsArray, Int32Array};
+    use arrow::datatypes::{DataType, Field, Int32Type};
+    use datafusion_common::DataFusionError;
+    use datafusion_physical_expr::expressions::col;
+
+    /// Verifies that `take_record_batch` in `IncrementalSortIterator` actually
+    /// copies the data into a new allocation rather than returning a zero-copy
+    /// slice of the original batch. If the output arrays were slices, their
+    /// underlying buffer length would match the original array's length; a true
+    /// copy will have a buffer sized to fit only the chunk.
+    #[test]
+    fn incremental_sort_iterator_copies_data() -> Result<()> {
+        let original_len = 10;
+        let batch_size = 3;
+
+        // Build a batch with a single Int32 column of constant values
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let col_a: Int32Array = Int32Array::from(vec![0; original_len]);
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(col_a)])?;
+
+        // Sort ascending on column "a"
+        let expressions = LexOrdering::new(vec![PhysicalSortExpr::new_default(col(
+            "a",
+            &batch.schema(),
+        )?)])
+        .unwrap();
+
+        let mut total_rows = 0;
+        IncrementalSortIterator::new(batch.clone(), expressions, batch_size).try_for_each(
+            |result| {
+                let chunk = result?;
+                total_rows += chunk.num_rows();
+
+                // Every output column must be a fresh allocation whose length
+                // equals the chunk size, NOT the original array length.
+                chunk.columns().iter().zip(batch.columns()).for_each(|(arr, original_arr)| {
+                    let (_, scalar_buf, _) = arr.as_primitive::<Int32Type>().clone().into_parts();
+                    let (_, original_scalar_buf, _) = original_arr.as_primitive::<Int32Type>().clone().into_parts();
+
+                    assert!(
+                        !scalar_buf.inner().ptr_eq(original_scalar_buf.inner()),
+                        "Expected a copy of the data for each chunk, but got a slice that shares the same buffer as the original array"
+                    );

Review Comment:
   `ptr_eq` compares the buffer's offset pointer (and length), so it also returns false for a zero-copy slice of the same allocation; this assertion would pass even if `take_record_batch` returned a slice instead of a copy. It should compare `data_ptr`, which points at the underlying allocation.
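   
   A minimal sketch of the difference (assuming `arrow_buffer::Buffer`'s `slice` / `ptr_eq` / `data_ptr` semantics; the buffer values here are illustrative only):
   
   ```rust
   use arrow_buffer::Buffer;
   
   fn main() {
       let original = Buffer::from(vec![0u8; 16]);
   
       // Zero-copy slice: shares the allocation but starts at a later offset.
       let sliced = original.slice(4);
   
       // `ptr_eq` compares the offset-adjusted pointer, so a zero-copy slice
       // already compares unequal -- `assert!(!..ptr_eq(..))` cannot
       // distinguish a slice from a genuine copy.
       assert!(!sliced.ptr_eq(&original));
   
       // `data_ptr` points at the start of the underlying allocation, so it
       // stays equal for a zero-copy slice and only differs for a real copy.
       assert_eq!(sliced.data_ptr(), original.data_ptr());
   }
   ```
   
   Applied to the test, the assertion could become something like:
   
   ```rust
   assert_ne!(
       scalar_buf.inner().data_ptr(),
       original_scalar_buf.inner().data_ptr(),
       "Expected a copy of the data for each chunk, but got a slice that shares the same buffer as the original array"
   );
   ```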


