2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2217220186


##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> 
SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream

Review Comment:
   If we have this reservation with the same lifetime as the stream, would it 
be better to create a `MultiLevelMergeStream` and make this reservation a 
struct field?



##########
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##########
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
             enable_round_robin_tie_breaker,
         } = self;
 
-        // Early return if streams or expressions are empty:
-        if streams.is_empty() {
-            return internal_err!("Streams cannot be empty for streaming 
merge");
-        }
+        // Early return if expressions are empty:
         let Some(expressions) = expressions else {
             return internal_err!("Sort expressions cannot be empty for 
streaming merge");
         };
 
+        if !sorted_spill_files.is_empty() {

Review Comment:
   I agree it's good to reduce the number of APIs. With that in mind, the two 
approaches seem to have similar complexity.



##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> 
SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream
+            let mut current_memory_reservation = self.reservation.new_empty();
+            let mut stream =
+                self.create_sorted_stream(&mut current_memory_reservation)?;
+
+            // TODO - add a threshold for number of files to disk even if 
empty and reading from disk so
+            //        we can avoid the memory reservation
+
+            // If no spill files are left, we can return the stream as this is 
the last sorted run
+            // TODO - We can write to disk before reading it back to avoid 
having multiple streams in memory
+            if self.sorted_spill_files.is_empty() {
+                // Attach the memory reservation to the stream as we are done 
with it
+                // but because we replaced the memory reservation of the merge 
stream, we must hold
+                // this to make sure we have enough memory
+                return Ok(Box::pin(StreamAttachedReservation::new(
+                    stream,
+                    current_memory_reservation,
+                )));
+            }
+
+            // Need to sort to a spill file
+            let Some((spill_file, max_record_batch_memory)) = self
+                .spill_manager
+                .spill_record_batch_stream_by_size(
+                    &mut stream,
+                    self.batch_size,
+                    "MultiLevelMergeBuilder intermediate spill",
+                )
+                .await?
+            else {
+                continue;
+            };
+
+            // Add the spill file
+            self.sorted_spill_files.push(SortedSpillFile {
+                file: spill_file,
+                max_record_batch_memory,
+            });
+        }
+    }
+
+    fn create_sorted_stream(

Review Comment:
   It would be great to include a comment describing its high-level idea.
   Also, maybe `merge_sorted_runs_within_mem_limit()` would be a more 
precise name?



##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream

Review Comment:
   It would be great to add a high-level doc about how this multi-level merge 
works.



##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,

Review Comment:
   I think a clearer way to implement this is to let `StreamingMergeBuilder` 
include a new interface, `with_bypass_mempool()`, which will construct a temporary 
unbounded memory pool inside SPM and let its memory reservation point to it.



##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> 
SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream
+            let mut current_memory_reservation = self.reservation.new_empty();
+            let mut stream =
+                self.create_sorted_stream(&mut current_memory_reservation)?;
+
+            // TODO - add a threshold for number of files to disk even if 
empty and reading from disk so
+            //        we can avoid the memory reservation
+
+            // If no spill files are left, we can return the stream as this is 
the last sorted run
+            // TODO - We can write to disk before reading it back to avoid 
having multiple streams in memory
+            if self.sorted_spill_files.is_empty() {
+                // Attach the memory reservation to the stream as we are done 
with it
+                // but because we replaced the memory reservation of the merge 
stream, we must hold
+                // this to make sure we have enough memory
+                return Ok(Box::pin(StreamAttachedReservation::new(
+                    stream,
+                    current_memory_reservation,
+                )));
+            }
+
+            // Need to sort to a spill file
+            let Some((spill_file, max_record_batch_memory)) = self
+                .spill_manager
+                .spill_record_batch_stream_by_size(
+                    &mut stream,
+                    self.batch_size,
+                    "MultiLevelMergeBuilder intermediate spill",
+                )
+                .await?
+            else {
+                continue;
+            };
+
+            // Add the spill file
+            self.sorted_spill_files.push(SortedSpillFile {
+                file: spill_file,
+                max_record_batch_memory,
+            });
+        }
+    }
+
+    fn create_sorted_stream(
+        &mut self,
+        memory_reservation: &mut MemoryReservation,
+    ) -> Result<SendableRecordBatchStream> {
+        match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
+            // No data so empty batch
+            (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
+                &self.schema,
+            )))),
+
+            // Only in-memory stream, return that
+            (0, 1) => Ok(self.sorted_streams.remove(0)),
+
+            // Only single sorted spill file so return it
+            (1, 0) => {
+                let spill_file = self.sorted_spill_files.remove(0);
+
+                self.spill_manager.read_spill_as_stream(spill_file.file)
+            }
+
+            // Only in memory streams, so merge them all in a single pass
+            (0, _) => {
+                let sorted_stream = mem::take(&mut self.sorted_streams);
+                self.create_new_merge_sort(
+                    sorted_stream,
+                    // If we have no sorted spill files left, this is the last 
run
+                    true,
+                )
+            }
+
+            // Need to merge multiple streams
+            (_, _) => {
+                // Don't account for existing streams memory
+                // as we are not holding the memory for them
+                let mut sorted_streams = mem::take(&mut self.sorted_streams);
+
+                let (sorted_spill_files, buffer_size) = self
+                    .get_sorted_spill_files_to_merge(
+                        2,
+                        // we must have at least 2 streams to merge
+                        2_usize.saturating_sub(sorted_streams.len()),
+                        memory_reservation,
+                    )?;
+
+                for spill in sorted_spill_files {
+                    let stream = self
+                        .spill_manager
+                        .clone()
+                        .with_batch_read_buffer_capacity(buffer_size)
+                        .read_spill_as_stream(spill.file)?;
+                    sorted_streams.push(stream);
+                }
+
+                self.create_new_merge_sort(
+                    sorted_streams,
+                    // If we have no sorted spill files left, this is the last 
run
+                    self.sorted_spill_files.is_empty(),
+                )
+            }
+        }
+    }
+
+    fn create_new_merge_sort(
+        &mut self,
+        streams: Vec<SendableRecordBatchStream>,
+        is_output: bool,
+    ) -> Result<SendableRecordBatchStream> {
+        StreamingMergeBuilder::new()
+            .with_schema(Arc::clone(&self.schema))
+            .with_expressions(&self.expr)
+            .with_batch_size(self.batch_size)
+            .with_fetch(self.fetch)
+            .with_metrics(if is_output {
+                // Only add the metrics to the last run
+                self.metrics.clone()
+            } else {
+                self.metrics.intermediate()
+            })
+            .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker)
+            .with_streams(streams)
+            // Don't track memory used by this stream as we reserve that 
memory by worst case sceneries
+            // (reserving memory for the biggest batch in each stream)
+            // This is a hack
+            .with_reservation(
+                MemoryConsumer::new("merge stream mock memory")

Review Comment:
   I see, this makes sense.
   To enforce such validation, in the future we can extend 
`StreamingMergeBuilder` with each stream's max batch size, and do some inner 
sanity checks:
   ```rust
               let res = StreamingMergeBuilder::new()
                   .with_streams(streams)
                   .with_max_batch_size_per_stream(max_batch_sizes)
   ```



##########
datafusion/physical-plan/src/spill/spill_manager.rs:
##########
@@ -125,6 +133,156 @@ impl SpillManager {
         self.spill_record_batch_and_finish(&batches, request_description)
     }
 
+    /// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+    /// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+    ///
+    /// # Errors
+    /// - Returns an error if spilling would exceed the disk usage limit 
configured
+    ///   by `max_temp_directory_size` in `DiskManager`
+    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+        &self,
+        batch: &RecordBatch,
+        request_description: &str,
+        row_limit: usize,
+    ) -> Result<Option<(RefCountedTempFile, usize)>> {
+        let total_rows = batch.num_rows();
+        let mut batches = Vec::new();
+        let mut offset = 0;
+
+        // It's ok to calculate all slices first, because slicing is zero-copy.
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, row_limit);
+            let sliced_batch = batch.slice(offset, length);
+            batches.push(sliced_batch);
+            offset += length;
+        }
+
+        let mut in_progress_file = 
self.create_in_progress_file(request_description)?;
+
+        let mut max_record_batch_size = 0;
+
+        for batch in batches {
+            in_progress_file.append_batch(&batch)?;
+
+            max_record_batch_size =
+                max_record_batch_size.max(batch.get_actually_used_size());
+        }
+
+        let file = in_progress_file.finish()?;
+
+        Ok(file.map(|f| (f, max_record_batch_size)))
+    }
+
+    /// Spill the `RecordBatch` to disk as smaller batches
+    /// split by `batch_size_rows`.
+    ///
+    /// will return the spill file and the size of the largest batch in memory
+    pub async fn spill_record_batch_stream_by_size(
+        &self,
+        stream: &mut SendableRecordBatchStream,
+        batch_size_rows: usize,
+        request_msg: &str,
+    ) -> Result<Option<(RefCountedTempFile, usize)>> {
+        use futures::StreamExt;
+        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+        let mut max_record_batch_size = 0;
+
+        let mut maybe_last_batch: Option<RecordBatch> = None;
+
+        while let Some(batch) = stream.next().await {
+            let mut batch = batch?;
+
+            if let Some(mut last_batch) = maybe_last_batch.take() {
+                assert!(
+                    last_batch.num_rows() < batch_size_rows,
+                    "last batch size must be smaller than the requested batch 
size"
+                );
+
+                // Get the number of rows to take from current batch so the 
last_batch
+                // will have `batch_size_rows` rows
+                let current_batch_offset = std::cmp::min(
+                    // rows needed to fill
+                    batch_size_rows - last_batch.num_rows(),
+                    // Current length of the batch
+                    batch.num_rows(),
+                );
+
+                // if have last batch that has less rows than concat and spill
+                last_batch = arrow::compute::concat_batches(
+                    &stream.schema(),
+                    &[last_batch, batch.slice(0, current_batch_offset)],
+                )?;
+
+                assert!(last_batch.num_rows() <= batch_size_rows, "must build 
a batch that is smaller or equal to the requested batch size from the current 
batch");
+
+                // If not enough rows
+                if last_batch.num_rows() < batch_size_rows {
+                    // keep the last batch for next iteration
+                    maybe_last_batch = Some(last_batch);
+                    continue;
+                }
+
+                max_record_batch_size =
+                    
max_record_batch_size.max(last_batch.get_actually_used_size());
+
+                in_progress_file.append_batch(&last_batch)?;
+
+                if current_batch_offset == batch.num_rows() {
+                    // No remainder
+                    continue;
+                }
+
+                // remainder
+                batch = batch.slice(
+                    current_batch_offset,
+                    batch.num_rows() - current_batch_offset,
+                );
+            }
+
+            let mut offset = 0;
+            let total_rows = batch.num_rows();
+
+            // Keep slicing the batch until we have left with a batch that is 
smaller than

Review Comment:
   Why do we have to do this step here?
   
   I looked at its only use case:
   
https://github.com/apache/datafusion/blob/ae1ed6d6bb89a70bd9bdf370944f67b5254b31fe/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L129
   The stream this function takes is produced by SPM, which has already chunked 
the output by the `batch_size` in the configuration.



##########
datafusion/physical-plan/src/spill/get_size.rs:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+    FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+    PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   It's not obvious to me under what situation it can overestimate by a lot. I 
was thinking those batch arrays won't over-allocate buffers too much, because 
we have a configured batch size.
   
   Do you have a reproducer? Perhaps we can look into it further.



##########
datafusion/physical-plan/src/spill/spill_manager.rs:
##########
@@ -125,6 +133,156 @@ impl SpillManager {
         self.spill_record_batch_and_finish(&batches, request_description)
     }
 
+    /// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+    /// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+    ///
+    /// # Errors
+    /// - Returns an error if spilling would exceed the disk usage limit 
configured
+    ///   by `max_temp_directory_size` in `DiskManager`
+    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(

Review Comment:
   Note the `SpillManager` mod is exported as pub(crate) 
https://github.com/apache/datafusion/blob/ae1ed6d6bb89a70bd9bdf370944f67b5254b31fe/datafusion/physical-plan/src/spill/mod.rs#L22,
 (I think it would be better to mark them as pub(crate) here to avoid confusion), so 
existing functions are not public APIs.
   
   Then we’re free to modify the existing functions, and combine this one with 
`spill_record_batch_by_size()` to reuse the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to