This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d097d58 chore: Revert "chore: Reserve memory for native shuffle writer per partition (#988)" (#1020)
8d097d58 is described below

commit 8d097d584e9e70399b635e5063ad7dea3fc87cac
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Oct 15 11:25:50 2024 -0700

    chore: Revert "chore: Reserve memory for native shuffle writer per partition (#988)" (#1020)
    
    This reverts commit e146cfa1676fedcfb3003c2e3f1da1578d6b664b.
---
 .../src/execution/datafusion/shuffle_writer.rs     | 251 +++++----------------
 1 file changed, 61 insertions(+), 190 deletions(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 6c317466..9668359f 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -66,14 +66,6 @@ use crate::{
 };
 use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
 
-/// The status of appending rows to a partition buffer.
-enum AppendRowStatus {
-    /// The difference in memory usage after appending rows
-    MemDiff(Result<isize>),
-    /// The index of the next row to append
-    StartIndex(usize),
-}
-
 /// The shuffle writer operator maps each input partition to M output partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the resulting partitions.
 #[derive(Debug)]
@@ -214,21 +206,10 @@ struct PartitionBuffer {
     /// The maximum number of rows in a batch. Once `num_active_rows` reaches 
`batch_size`,
     /// the active array builders will be frozen and appended to frozen buffer 
`frozen`.
     batch_size: usize,
-    /// Memory reservation for this partition buffer.
-    reservation: MemoryReservation,
 }
 
 impl PartitionBuffer {
-    fn new(
-        schema: SchemaRef,
-        batch_size: usize,
-        partition_id: usize,
-        runtime: &Arc<RuntimeEnv>,
-    ) -> Self {
-        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", 
partition_id))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
-
+    fn new(schema: SchemaRef, batch_size: usize) -> Self {
         Self {
             schema,
             frozen: vec![],
@@ -236,52 +217,47 @@ impl PartitionBuffer {
             active_slots_mem_size: 0,
             num_active_rows: 0,
             batch_size,
-            reservation,
         }
     }
 
     /// Initializes active builders if necessary.
-    /// Returns error if memory reservation fails.
     fn init_active_if_necessary(&mut self) -> Result<isize> {
         let mut mem_diff = 0;
 
         if self.active.is_empty() {
-            // Estimate the memory size of active builders
+            self.active = new_array_builders(&self.schema, self.batch_size);
             if self.active_slots_mem_size == 0 {
                 self.active_slots_mem_size = self
-                    .schema
-                    .fields()
+                    .active
                     .iter()
-                    .map(|field| slot_size(self.batch_size, field.data_type()))
+                    .zip(self.schema.fields())
+                    .map(|(_ab, field)| slot_size(self.batch_size, 
field.data_type()))
                     .sum::<usize>();
             }
-
-            self.reservation.try_grow(self.active_slots_mem_size)?;
-
-            self.active = new_array_builders(&self.schema, self.batch_size);
-
             mem_diff += self.active_slots_mem_size as isize;
         }
         Ok(mem_diff)
     }
 
+    /// Appends all rows of given batch into active array builders.
+    fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> 
Result<isize> {
+        let columns = batch.columns();
+        let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
+        self.append_rows(columns, &indices, time_metric)
+    }
+
     /// Appends rows of specified indices from columns into active array 
builders.
     fn append_rows(
         &mut self,
         columns: &[ArrayRef],
         indices: &[usize],
-        start_index: usize,
         time_metric: &Time,
-    ) -> AppendRowStatus {
+    ) -> Result<isize> {
         let mut mem_diff = 0;
-        let mut start = start_index;
+        let mut start = 0;
 
         // lazy init because some partition may be empty
-        let init = self.init_active_if_necessary();
-        if init.is_err() {
-            return AppendRowStatus::StartIndex(start);
-        }
-        mem_diff += init.unwrap();
+        mem_diff += self.init_active_if_necessary()?;
 
         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
@@ -294,22 +270,14 @@ impl PartitionBuffer {
             self.num_active_rows += end - start;
             if self.num_active_rows >= self.batch_size {
                 let mut timer = time_metric.timer();
-                let flush = self.flush();
-                if let Err(e) = flush {
-                    return AppendRowStatus::MemDiff(Err(e));
-                }
-                mem_diff += flush.unwrap();
+                mem_diff += self.flush()?;
                 timer.stop();
 
-                let init = self.init_active_if_necessary();
-                if init.is_err() {
-                    return AppendRowStatus::StartIndex(end);
-                }
-                mem_diff += init.unwrap();
+                mem_diff += self.init_active_if_necessary()?;
             }
             start = end;
         }
-        AppendRowStatus::MemDiff(Ok(mem_diff))
+        Ok(mem_diff)
     }
 
     /// flush active data into frozen bytes
@@ -323,7 +291,7 @@ impl PartitionBuffer {
         let active = std::mem::take(&mut self.active);
         let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
-        self.reservation.try_shrink(self.active_slots_mem_size)?;
+        mem_diff -= self.active_slots_mem_size as isize;
 
         let frozen_batch = make_batch(Arc::clone(&self.schema), active, 
num_rows)?;
 
@@ -607,7 +575,7 @@ struct ShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
     schema: SchemaRef,
-    buffered_partitions: Vec<PartitionBuffer>,
+    buffered_partitions: Mutex<Vec<PartitionBuffer>>,
     spills: Mutex<Vec<SpillInfo>>,
     /// Sort expressions
     /// Partitioning scheme to use
@@ -680,11 +648,11 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: (0..num_output_partitions)
-                .map(|partition_id| {
-                    PartitionBuffer::new(Arc::clone(&schema), batch_size, 
partition_id, &runtime)
-                })
-                .collect::<Vec<_>>(),
+            buffered_partitions: Mutex::new(
+                (0..num_output_partitions)
+                    .map(|_| PartitionBuffer::new(Arc::clone(&schema), 
batch_size))
+                    .collect::<Vec<_>>(),
+            ),
             spills: Mutex::new(vec![]),
             partitioning,
             num_output_partitions,
@@ -731,6 +699,8 @@ impl ShuffleRepartitioner {
         // Update data size metric
         self.metrics.data_size.add(input.get_array_memory_size());
 
+        let time_metric = self.metrics.baseline.elapsed_compute();
+
         // NOTE: in shuffle writer exec, the output_rows metrics represents the
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());
@@ -795,36 +765,34 @@ impl ShuffleRepartitioner {
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    mem_diff += self
-                        .append_rows_to_partition(
-                            input.columns(),
-                            &shuffled_partition_ids[start..end],
-                            partition_id,
-                        )
-                        .await?;
-
-                    if mem_diff > 0 {
-                        let mem_increase = mem_diff as usize;
-                        if self.reservation.try_grow(mem_increase).is_err() {
-                            self.spill().await?;
-                            self.reservation.free();
-                            self.reservation.try_grow(mem_increase)?;
-
-                            mem_diff = 0;
-                        }
-                    }
-
-                    if mem_diff < 0 {
-                        let mem_used = self.reservation.size();
-                        let mem_decrease = mem_used.min(-mem_diff as usize);
-                        self.reservation.shrink(mem_decrease);
+                    let mut buffered_partitions = 
self.buffered_partitions.lock().await;
+                    let output = &mut buffered_partitions[partition_id];
+
+                    // If the range of indices is not big enough, just 
appending the rows into
+                    // active array builders instead of directly adding them 
as a record batch.
+                    mem_diff += output.append_rows(
+                        input.columns(),
+                        &shuffled_partition_ids[start..end],
+                        time_metric,
+                    )?;
+                }
 
-                        mem_diff += mem_decrease as isize;
+                if mem_diff > 0 {
+                    let mem_increase = mem_diff as usize;
+                    if self.reservation.try_grow(mem_increase).is_err() {
+                        self.spill().await?;
+                        self.reservation.free();
+                        self.reservation.try_grow(mem_increase)?;
                     }
                 }
+                if mem_diff < 0 {
+                    let mem_used = self.reservation.size();
+                    let mem_decrease = mem_used.min(-mem_diff as usize);
+                    self.reservation.shrink(mem_decrease);
+                }
             }
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let buffered_partitions = &mut self.buffered_partitions;
+                let mut buffered_partitions = 
self.buffered_partitions.lock().await;
 
                 assert!(
                     buffered_partitions.len() == 1,
@@ -832,10 +800,8 @@ impl ShuffleRepartitioner {
                     buffered_partitions.len()
                 );
 
-                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
-
-                self.append_rows_to_partition(input.columns(), &indices, 0)
-                    .await?;
+                let output = &mut buffered_partitions[0];
+                output.append_batch(&input, time_metric)?;
             }
             other => {
                 // this should be unreachable as long as the validation logic
@@ -852,7 +818,7 @@ impl ShuffleRepartitioner {
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let num_output_partitions = self.num_output_partitions;
-        let buffered_partitions = &mut self.buffered_partitions;
+        let mut buffered_partitions = self.buffered_partitions.lock().await;
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; 
num_output_partitions];
 
         for i in 0..num_output_partitions {
@@ -950,15 +916,16 @@ impl ShuffleRepartitioner {
         self.metrics.data_size.value()
     }
 
-    async fn spill(&mut self) -> Result<usize> {
+    async fn spill(&self) -> Result<usize> {
         log::debug!(
             "ShuffleRepartitioner spilling shuffle data of {} to disk while 
inserting ({} time(s) so far)",
             self.used(),
             self.spill_count()
         );
 
+        let mut buffered_partitions = self.buffered_partitions.lock().await;
         // we could always get a chance to free some memory as long as we are 
holding some
-        if self.buffered_partitions.is_empty() {
+        if buffered_partitions.len() == 0 {
             return Ok(0);
         }
 
@@ -969,7 +936,7 @@ impl ShuffleRepartitioner {
             .disk_manager
             .create_tmp_file("shuffle writer spill")?;
         let offsets = spill_into(
-            &mut self.buffered_partitions,
+            &mut buffered_partitions,
             spillfile.path(),
             self.num_output_partitions,
         )
@@ -987,60 +954,6 @@ impl ShuffleRepartitioner {
         });
         Ok(used)
     }
-
-    /// Appends rows of specified indices from columns into active array 
builders in the specified partition.
-    async fn append_rows_to_partition(
-        &mut self,
-        columns: &[ArrayRef],
-        indices: &[usize],
-        partition_id: usize,
-    ) -> Result<isize> {
-        let mut mem_diff = 0;
-
-        let output = &mut self.buffered_partitions[partition_id];
-
-        let time_metric = self.metrics.baseline.elapsed_compute();
-
-        // If the range of indices is not big enough, just appending the rows 
into
-        // active array builders instead of directly adding them as a record 
batch.
-        let mut start_index: usize = 0;
-        let mut output_ret = output.append_rows(columns, indices, start_index, 
time_metric);
-
-        loop {
-            match output_ret {
-                AppendRowStatus::MemDiff(l) => {
-                    mem_diff += l?;
-                    break;
-                }
-                AppendRowStatus::StartIndex(new_start) => {
-                    // Cannot allocate enough memory for the array builders in 
the partition,
-                    // spill partitions and retry.
-                    self.spill().await?;
-
-                    let output = &mut self.buffered_partitions[partition_id];
-                    output.reservation.free();
-
-                    let time_metric = self.metrics.baseline.elapsed_compute();
-
-                    start_index = new_start;
-                    output_ret = output.append_rows(columns, indices, 
start_index, time_metric);
-
-                    if let AppendRowStatus::StartIndex(new_start) = output_ret 
{
-                        if new_start == start_index {
-                            // If the start index is not updated, it means 
that the partition
-                            // is still not able to allocate enough memory for 
the array builders.
-                            return Err(DataFusionError::Internal(
-                                "Partition is still not able to allocate 
enough memory for the array builders after spilling."
-                                    .to_string(),
-                            ));
-                        }
-                    }
-                }
-            }
-        }
-
-        Ok(mem_diff)
-    }
 }
 
 /// consume the `buffered_partitions` and do spill into a single temp shuffle 
output file
@@ -1557,8 +1470,6 @@ mod test {
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
-    use datafusion_execution::config::SessionConfig;
-    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::expressions::Column;
     use tokio::runtime::Runtime;
 
@@ -1593,65 +1504,25 @@ mod test {
     #[test]
     #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
     fn test_insert_larger_batch() {
-        shuffle_write_test(10000, 1, 16, None);
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
-    fn test_insert_smaller_batch() {
-        shuffle_write_test(1000, 1, 16, None);
-        shuffle_write_test(1000, 10, 16, None);
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
-    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too 
many open files".
-    fn test_large_number_of_partitions() {
-        shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
-        shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)] // miri can't call foreign function 
`ZSTD_createCCtx`
-    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too 
many open files".
-    fn test_large_number_of_partitions_spilling() {
-        shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
-    }
-
-    fn shuffle_write_test(
-        batch_size: usize,
-        num_batches: usize,
-        num_partitions: usize,
-        memory_limit: Option<usize>,
-    ) {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, 
true)]));
         let mut b = StringBuilder::new();
-        for i in 0..batch_size {
+        for i in 0..10000 {
             b.append_value(format!("{i}"));
         }
         let array = b.finish();
         let batch = RecordBatch::try_new(Arc::clone(&schema), 
vec![Arc::new(array)]).unwrap();
 
-        let batches = (0..num_batches).map(|_| 
batch.clone()).collect::<Vec<_>>();
+        let batches = vec![batch.clone()];
 
         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
             Arc::new(MemoryExec::try_new(partitions, batch.schema(), 
None).unwrap()),
-            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 
num_partitions),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
         )
         .unwrap();
-
-        // 10MB memory should be enough for running this test
-        let config = SessionConfig::new();
-        let mut runtime_env_builder = RuntimeEnvBuilder::new();
-        runtime_env_builder = match memory_limit {
-            Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
-            None => runtime_env_builder,
-        };
-        let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
-        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
+        let ctx = SessionContext::new();
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx).unwrap();
         let rt = Runtime::new().unwrap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to