This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e146cfa1 chore: Reserve memory for native shuffle writer per partition (#988)
e146cfa1 is described below

commit e146cfa1676fedcfb3003c2e3f1da1578d6b664b
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Oct 14 07:49:03 2024 -0700

    chore: Reserve memory for native shuffle writer per partition (#988)
    
    * chore: Reserve memory for native shuffle writer per partition
    
    * Revise
    
    * skip large partition number shuffle on macos runners
    
    * For review
---
 .../src/execution/datafusion/shuffle_writer.rs     | 251 ++++++++++++++++-----
 1 file changed, 190 insertions(+), 61 deletions(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 9668359f..6c317466 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -66,6 +66,14 @@ use crate::{
 };
 use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
 
+/// The status of appending rows to a partition buffer.
+enum AppendRowStatus {
+    /// The difference in memory usage after appending rows
+    MemDiff(Result<isize>),
+    /// The index of the next row to append
+    StartIndex(usize),
+}
+
 /// The shuffle writer operator maps each input partition to M output 
partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the 
resulting partitions.
 #[derive(Debug)]
@@ -206,10 +214,21 @@ struct PartitionBuffer {
     /// The maximum number of rows in a batch. Once `num_active_rows` reaches 
`batch_size`,
     /// the active array builders will be frozen and appended to frozen buffer 
`frozen`.
     batch_size: usize,
+    /// Memory reservation for this partition buffer.
+    reservation: MemoryReservation,
 }
 
 impl PartitionBuffer {
-    fn new(schema: SchemaRef, batch_size: usize) -> Self {
+    fn new(
+        schema: SchemaRef,
+        batch_size: usize,
+        partition_id: usize,
+        runtime: &Arc<RuntimeEnv>,
+    ) -> Self {
+        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", 
partition_id))
+            .with_can_spill(true)
+            .register(&runtime.memory_pool);
+
         Self {
             schema,
             frozen: vec![],
@@ -217,47 +236,52 @@ impl PartitionBuffer {
             active_slots_mem_size: 0,
             num_active_rows: 0,
             batch_size,
+            reservation,
         }
     }
 
     /// Initializes active builders if necessary.
+    /// Returns error if memory reservation fails.
     fn init_active_if_necessary(&mut self) -> Result<isize> {
         let mut mem_diff = 0;
 
         if self.active.is_empty() {
-            self.active = new_array_builders(&self.schema, self.batch_size);
+            // Estimate the memory size of active builders
             if self.active_slots_mem_size == 0 {
                 self.active_slots_mem_size = self
-                    .active
+                    .schema
+                    .fields()
                     .iter()
-                    .zip(self.schema.fields())
-                    .map(|(_ab, field)| slot_size(self.batch_size, 
field.data_type()))
+                    .map(|field| slot_size(self.batch_size, field.data_type()))
                     .sum::<usize>();
             }
+
+            self.reservation.try_grow(self.active_slots_mem_size)?;
+
+            self.active = new_array_builders(&self.schema, self.batch_size);
+
             mem_diff += self.active_slots_mem_size as isize;
         }
         Ok(mem_diff)
     }
 
-    /// Appends all rows of given batch into active array builders.
-    fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> 
Result<isize> {
-        let columns = batch.columns();
-        let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
-        self.append_rows(columns, &indices, time_metric)
-    }
-
     /// Appends rows of specified indices from columns into active array 
builders.
     fn append_rows(
         &mut self,
         columns: &[ArrayRef],
         indices: &[usize],
+        start_index: usize,
         time_metric: &Time,
-    ) -> Result<isize> {
+    ) -> AppendRowStatus {
         let mut mem_diff = 0;
-        let mut start = 0;
+        let mut start = start_index;
 
         // lazy init because some partition may be empty
-        mem_diff += self.init_active_if_necessary()?;
+        let init = self.init_active_if_necessary();
+        if init.is_err() {
+            return AppendRowStatus::StartIndex(start);
+        }
+        mem_diff += init.unwrap();
 
         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
@@ -270,14 +294,22 @@ impl PartitionBuffer {
             self.num_active_rows += end - start;
             if self.num_active_rows >= self.batch_size {
                 let mut timer = time_metric.timer();
-                mem_diff += self.flush()?;
+                let flush = self.flush();
+                if let Err(e) = flush {
+                    return AppendRowStatus::MemDiff(Err(e));
+                }
+                mem_diff += flush.unwrap();
                 timer.stop();
 
-                mem_diff += self.init_active_if_necessary()?;
+                let init = self.init_active_if_necessary();
+                if init.is_err() {
+                    return AppendRowStatus::StartIndex(end);
+                }
+                mem_diff += init.unwrap();
             }
             start = end;
         }
-        Ok(mem_diff)
+        AppendRowStatus::MemDiff(Ok(mem_diff))
     }
 
     /// flush active data into frozen bytes
@@ -291,7 +323,7 @@ impl PartitionBuffer {
         let active = std::mem::take(&mut self.active);
         let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
-        mem_diff -= self.active_slots_mem_size as isize;
+        self.reservation.try_shrink(self.active_slots_mem_size)?;
 
         let frozen_batch = make_batch(Arc::clone(&self.schema), active, 
num_rows)?;
 
@@ -575,7 +607,7 @@ struct ShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
     schema: SchemaRef,
-    buffered_partitions: Mutex<Vec<PartitionBuffer>>,
+    buffered_partitions: Vec<PartitionBuffer>,
     spills: Mutex<Vec<SpillInfo>>,
     /// Sort expressions
     /// Partitioning scheme to use
@@ -648,11 +680,11 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: Mutex::new(
-                (0..num_output_partitions)
-                    .map(|_| PartitionBuffer::new(Arc::clone(&schema), 
batch_size))
-                    .collect::<Vec<_>>(),
-            ),
+            buffered_partitions: (0..num_output_partitions)
+                .map(|partition_id| {
+                    PartitionBuffer::new(Arc::clone(&schema), batch_size, 
partition_id, &runtime)
+                })
+                .collect::<Vec<_>>(),
             spills: Mutex::new(vec![]),
             partitioning,
             num_output_partitions,
@@ -699,8 +731,6 @@ impl ShuffleRepartitioner {
         // Update data size metric
         self.metrics.data_size.add(input.get_array_memory_size());
 
-        let time_metric = self.metrics.baseline.elapsed_compute();
-
         // NOTE: in shuffle writer exec, the output_rows metrics represents the
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());
@@ -765,34 +795,36 @@ impl ShuffleRepartitioner {
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    let mut buffered_partitions = 
self.buffered_partitions.lock().await;
-                    let output = &mut buffered_partitions[partition_id];
-
-                    // If the range of indices is not big enough, just 
appending the rows into
-                    // active array builders instead of directly adding them 
as a record batch.
-                    mem_diff += output.append_rows(
-                        input.columns(),
-                        &shuffled_partition_ids[start..end],
-                        time_metric,
-                    )?;
-                }
+                    mem_diff += self
+                        .append_rows_to_partition(
+                            input.columns(),
+                            &shuffled_partition_ids[start..end],
+                            partition_id,
+                        )
+                        .await?;
+
+                    if mem_diff > 0 {
+                        let mem_increase = mem_diff as usize;
+                        if self.reservation.try_grow(mem_increase).is_err() {
+                            self.spill().await?;
+                            self.reservation.free();
+                            self.reservation.try_grow(mem_increase)?;
+
+                            mem_diff = 0;
+                        }
+                    }
 
-                if mem_diff > 0 {
-                    let mem_increase = mem_diff as usize;
-                    if self.reservation.try_grow(mem_increase).is_err() {
-                        self.spill().await?;
-                        self.reservation.free();
-                        self.reservation.try_grow(mem_increase)?;
+                    if mem_diff < 0 {
+                        let mem_used = self.reservation.size();
+                        let mem_decrease = mem_used.min(-mem_diff as usize);
+                        self.reservation.shrink(mem_decrease);
+
+                        mem_diff += mem_decrease as isize;
                     }
                 }
-                if mem_diff < 0 {
-                    let mem_used = self.reservation.size();
-                    let mem_decrease = mem_used.min(-mem_diff as usize);
-                    self.reservation.shrink(mem_decrease);
-                }
             }
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let mut buffered_partitions = 
self.buffered_partitions.lock().await;
+                let buffered_partitions = &mut self.buffered_partitions;
 
                 assert!(
                     buffered_partitions.len() == 1,
@@ -800,8 +832,10 @@ impl ShuffleRepartitioner {
                     buffered_partitions.len()
                 );
 
-                let output = &mut buffered_partitions[0];
-                output.append_batch(&input, time_metric)?;
+                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
+
+                self.append_rows_to_partition(input.columns(), &indices, 0)
+                    .await?;
             }
             other => {
                 // this should be unreachable as long as the validation logic
@@ -818,7 +852,7 @@ impl ShuffleRepartitioner {
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let num_output_partitions = self.num_output_partitions;
-        let mut buffered_partitions = self.buffered_partitions.lock().await;
+        let buffered_partitions = &mut self.buffered_partitions;
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; 
num_output_partitions];
 
         for i in 0..num_output_partitions {
@@ -916,16 +950,15 @@ impl ShuffleRepartitioner {
         self.metrics.data_size.value()
     }
 
-    async fn spill(&self) -> Result<usize> {
+    async fn spill(&mut self) -> Result<usize> {
         log::debug!(
             "ShuffleRepartitioner spilling shuffle data of {} to disk while 
inserting ({} time(s) so far)",
             self.used(),
             self.spill_count()
         );
 
-        let mut buffered_partitions = self.buffered_partitions.lock().await;
         // we could always get a chance to free some memory as long as we are 
holding some
-        if buffered_partitions.len() == 0 {
+        if self.buffered_partitions.is_empty() {
             return Ok(0);
         }
 
@@ -936,7 +969,7 @@ impl ShuffleRepartitioner {
             .disk_manager
             .create_tmp_file("shuffle writer spill")?;
         let offsets = spill_into(
-            &mut buffered_partitions,
+            &mut self.buffered_partitions,
             spillfile.path(),
             self.num_output_partitions,
         )
@@ -954,6 +987,60 @@ impl ShuffleRepartitioner {
         });
         Ok(used)
     }
+
+    /// Appends rows of specified indices from columns into active array 
builders in the specified partition.
+    async fn append_rows_to_partition(
+        &mut self,
+        columns: &[ArrayRef],
+        indices: &[usize],
+        partition_id: usize,
+    ) -> Result<isize> {
+        let mut mem_diff = 0;
+
+        let output = &mut self.buffered_partitions[partition_id];
+
+        let time_metric = self.metrics.baseline.elapsed_compute();
+
+        // If the range of indices is not big enough, just appending the rows 
into
+        // active array builders instead of directly adding them as a record 
batch.
+        let mut start_index: usize = 0;
+        let mut output_ret = output.append_rows(columns, indices, start_index, 
time_metric);
+
+        loop {
+            match output_ret {
+                AppendRowStatus::MemDiff(l) => {
+                    mem_diff += l?;
+                    break;
+                }
+                AppendRowStatus::StartIndex(new_start) => {
+                    // Cannot allocate enough memory for the array builders in 
the partition,
+                    // spill partitions and retry.
+                    self.spill().await?;
+
+                    let output = &mut self.buffered_partitions[partition_id];
+                    output.reservation.free();
+
+                    let time_metric = self.metrics.baseline.elapsed_compute();
+
+                    start_index = new_start;
+                    output_ret = output.append_rows(columns, indices, 
start_index, time_metric);
+
+                    if let AppendRowStatus::StartIndex(new_start) = output_ret 
{
+                        if new_start == start_index {
+                            // If the start index is not updated, it means 
that the partition
+                            // is still not able to allocate enough memory for 
the array builders.
+                            return Err(DataFusionError::Internal(
+                                "Partition is still not able to allocate 
enough memory for the array builders after spilling."
+                                    .to_string(),
+                            ));
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(mem_diff)
+    }
 }
 
 /// consume the `buffered_partitions` and do spill into a single temp shuffle 
output file
@@ -1470,6 +1557,8 @@ mod test {
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::expressions::Column;
     use tokio::runtime::Runtime;
 
@@ -1504,25 +1593,65 @@ mod test {
     #[test]
     #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
     fn test_insert_larger_batch() {
+        shuffle_write_test(10000, 1, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
+    fn test_insert_smaller_batch() {
+        shuffle_write_test(1000, 1, 16, None);
+        shuffle_write_test(1000, 10, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
+    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too 
many open files".
+    fn test_large_number_of_partitions() {
+        shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
+        shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
+    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too 
many open files".
+    fn test_large_number_of_partitions_spilling() {
+        shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
+    }
+
+    fn shuffle_write_test(
+        batch_size: usize,
+        num_batches: usize,
+        num_partitions: usize,
+        memory_limit: Option<usize>,
+    ) {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, 
true)]));
         let mut b = StringBuilder::new();
-        for i in 0..10000 {
+        for i in 0..batch_size {
             b.append_value(format!("{i}"));
         }
         let array = b.finish();
         let batch = RecordBatch::try_new(Arc::clone(&schema), 
vec![Arc::new(array)]).unwrap();
 
-        let batches = vec![batch.clone()];
+        let batches = (0..num_batches).map(|_| 
batch.clone()).collect::<Vec<_>>();
 
         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
             Arc::new(MemoryExec::try_new(partitions, batch.schema(), 
None).unwrap()),
-            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 
num_partitions),
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
         )
         .unwrap();
-        let ctx = SessionContext::new();
+
+        // 10MB memory should be enough for running this test
+        let config = SessionConfig::new();
+        let mut runtime_env_builder = RuntimeEnvBuilder::new();
+        runtime_env_builder = match memory_limit {
+            Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
+            None => runtime_env_builder,
+        };
+        let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
+        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx).unwrap();
         let rt = Runtime::new().unwrap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to