rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219098075


##########
datafusion/physical-plan/src/spill/spill_manager.rs:
##########
@@ -125,6 +133,156 @@ impl SpillManager {
         self.spill_record_batch_and_finish(&batches, request_description)
     }
 
+    /// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+    /// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+    ///
+    /// # Errors
+    /// - Returns an error if spilling would exceed the disk usage limit 
configured
+    ///   by `max_temp_directory_size` in `DiskManager`
+    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+        &self,
+        batch: &RecordBatch,
+        request_description: &str,
+        row_limit: usize,
+    ) -> Result<Option<(RefCountedTempFile, usize)>> {
+        let total_rows = batch.num_rows();
+        let mut batches = Vec::new();
+        let mut offset = 0;
+
+        // It's ok to calculate all slices first, because slicing is zero-copy.
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, row_limit);
+            let sliced_batch = batch.slice(offset, length);
+            batches.push(sliced_batch);
+            offset += length;
+        }
+
+        let mut in_progress_file = 
self.create_in_progress_file(request_description)?;
+
+        let mut max_record_batch_size = 0;
+
+        for batch in batches {
+            in_progress_file.append_batch(&batch)?;
+
+            max_record_batch_size =
+                max_record_batch_size.max(batch.get_actually_used_size());
+        }
+
+        let file = in_progress_file.finish()?;
+
+        Ok(file.map(|f| (f, max_record_batch_size)))
+    }
+
+    /// Spill the `RecordBatch` to disk as smaller batches
+    /// split by `batch_size_rows`.
+    ///
+    /// will return the spill file and the size of the largest batch in memory
+    pub async fn spill_record_batch_stream_by_size(
+        &self,
+        stream: &mut SendableRecordBatchStream,
+        batch_size_rows: usize,
+        request_msg: &str,
+    ) -> Result<Option<(RefCountedTempFile, usize)>> {
+        use futures::StreamExt;
+        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+        let mut max_record_batch_size = 0;
+
+        let mut maybe_last_batch: Option<RecordBatch> = None;
+
+        while let Some(batch) = stream.next().await {
+            let mut batch = batch?;
+
+            if let Some(mut last_batch) = maybe_last_batch.take() {
+                assert!(
+                    last_batch.num_rows() < batch_size_rows,
+                    "last batch size must be smaller than the requested batch 
size"
+                );
+
+                // Get the number of rows to take from current batch so the 
last_batch
+                // will have `batch_size_rows` rows
+                let current_batch_offset = std::cmp::min(
+                    // rows needed to fill
+                    batch_size_rows - last_batch.num_rows(),
+                    // Current length of the batch
+                    batch.num_rows(),
+                );
+
+                // if have last batch that has less rows than concat and spill
+                last_batch = arrow::compute::concat_batches(
+                    &stream.schema(),
+                    &[last_batch, batch.slice(0, current_batch_offset)],
+                )?;
+
+                assert!(last_batch.num_rows() <= batch_size_rows, "must build 
a batch that is smaller or equal to the requested batch size from the current 
batch");
+
+                // If not enough rows
+                if last_batch.num_rows() < batch_size_rows {
+                    // keep the last batch for next iteration
+                    maybe_last_batch = Some(last_batch);
+                    continue;
+                }
+
+                max_record_batch_size =
+                    
max_record_batch_size.max(last_batch.get_actually_used_size());
+
+                in_progress_file.append_batch(&last_batch)?;
+
+                if current_batch_offset == batch.num_rows() {
+                    // No remainder
+                    continue;
+                }
+
+                // remainder
+                batch = batch.slice(
+                    current_batch_offset,
+                    batch.num_rows() - current_batch_offset,
+                );
+            }
+
+            let mut offset = 0;
+            let total_rows = batch.num_rows();
+
+            // Keep slicing the batch until we have left with a batch that is 
smaller than

Review Comment:
   you are right, simplify



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to