This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e146cfa1 chore: Reserve memory for native shuffle writer per partition
(#988)
e146cfa1 is described below
commit e146cfa1676fedcfb3003c2e3f1da1578d6b664b
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Oct 14 07:49:03 2024 -0700
chore: Reserve memory for native shuffle writer per partition (#988)
* chore: Reserve memory for native shuffle writer per partition
* Revise
* skip large partition number shuffle on macos runners
* For review
---
.../src/execution/datafusion/shuffle_writer.rs | 251 ++++++++++++++++-----
1 file changed, 190 insertions(+), 61 deletions(-)
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 9668359f..6c317466 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -66,6 +66,14 @@ use crate::{
};
use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
+/// The status of appending rows to a partition buffer.
+enum AppendRowStatus {
+ /// The difference in memory usage after appending rows
+ MemDiff(Result<isize>),
+ /// The index of the next row to append
+ StartIndex(usize),
+}
+
/// The shuffle writer operator maps each input partition to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
#[derive(Debug)]
@@ -206,10 +214,21 @@ struct PartitionBuffer {
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
/// the active array builders will be frozen and appended to frozen buffer `frozen`.
batch_size: usize,
+ /// Memory reservation for this partition buffer.
+ reservation: MemoryReservation,
}
impl PartitionBuffer {
- fn new(schema: SchemaRef, batch_size: usize) -> Self {
+ fn new(
+ schema: SchemaRef,
+ batch_size: usize,
+ partition_id: usize,
+ runtime: &Arc<RuntimeEnv>,
+ ) -> Self {
+ let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
+ .with_can_spill(true)
+ .register(&runtime.memory_pool);
+
Self {
schema,
frozen: vec![],
@@ -217,47 +236,52 @@ impl PartitionBuffer {
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
+ reservation,
}
}
/// Initializes active builders if necessary.
+ /// Returns error if memory reservation fails.
fn init_active_if_necessary(&mut self) -> Result<isize> {
let mut mem_diff = 0;
if self.active.is_empty() {
- self.active = new_array_builders(&self.schema, self.batch_size);
+ // Estimate the memory size of active builders
if self.active_slots_mem_size == 0 {
self.active_slots_mem_size = self
- .active
+ .schema
+ .fields()
.iter()
- .zip(self.schema.fields())
- .map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
+ .map(|field| slot_size(self.batch_size, field.data_type()))
.sum::<usize>();
}
+
+ self.reservation.try_grow(self.active_slots_mem_size)?;
+
+ self.active = new_array_builders(&self.schema, self.batch_size);
+
mem_diff += self.active_slots_mem_size as isize;
}
Ok(mem_diff)
}
- /// Appends all rows of given batch into active array builders.
- fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
- let columns = batch.columns();
- let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
- self.append_rows(columns, &indices, time_metric)
- }
-
/// Appends rows of specified indices from columns into active array builders.
fn append_rows(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
+ start_index: usize,
time_metric: &Time,
- ) -> Result<isize> {
+ ) -> AppendRowStatus {
let mut mem_diff = 0;
- let mut start = 0;
+ let mut start = start_index;
// lazy init because some partition may be empty
- mem_diff += self.init_active_if_necessary()?;
+ let init = self.init_active_if_necessary();
+ if init.is_err() {
+ return AppendRowStatus::StartIndex(start);
+ }
+ mem_diff += init.unwrap();
while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());
@@ -270,14 +294,22 @@ impl PartitionBuffer {
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
let mut timer = time_metric.timer();
- mem_diff += self.flush()?;
+ let flush = self.flush();
+ if let Err(e) = flush {
+ return AppendRowStatus::MemDiff(Err(e));
+ }
+ mem_diff += flush.unwrap();
timer.stop();
- mem_diff += self.init_active_if_necessary()?;
+ let init = self.init_active_if_necessary();
+ if init.is_err() {
+ return AppendRowStatus::StartIndex(end);
+ }
+ mem_diff += init.unwrap();
}
start = end;
}
- Ok(mem_diff)
+ AppendRowStatus::MemDiff(Ok(mem_diff))
}
/// flush active data into frozen bytes
@@ -291,7 +323,7 @@ impl PartitionBuffer {
let active = std::mem::take(&mut self.active);
let num_rows = self.num_active_rows;
self.num_active_rows = 0;
- mem_diff -= self.active_slots_mem_size as isize;
+ self.reservation.try_shrink(self.active_slots_mem_size)?;
let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
@@ -575,7 +607,7 @@ struct ShuffleRepartitioner {
output_data_file: String,
output_index_file: String,
schema: SchemaRef,
- buffered_partitions: Mutex<Vec<PartitionBuffer>>,
+ buffered_partitions: Vec<PartitionBuffer>,
spills: Mutex<Vec<SpillInfo>>,
/// Sort expressions
/// Partitioning scheme to use
@@ -648,11 +680,11 @@ impl ShuffleRepartitioner {
output_data_file,
output_index_file,
schema: Arc::clone(&schema),
- buffered_partitions: Mutex::new(
- (0..num_output_partitions)
- .map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size))
- .collect::<Vec<_>>(),
- ),
+ buffered_partitions: (0..num_output_partitions)
+ .map(|partition_id| {
+ PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
+ })
+ .collect::<Vec<_>>(),
spills: Mutex::new(vec![]),
partitioning,
num_output_partitions,
@@ -699,8 +731,6 @@ impl ShuffleRepartitioner {
// Update data size metric
self.metrics.data_size.add(input.get_array_memory_size());
- let time_metric = self.metrics.baseline.elapsed_compute();
-
// NOTE: in shuffle writer exec, the output_rows metrics represents the
// number of rows those are written to output data file.
self.metrics.baseline.record_output(input.num_rows());
@@ -765,34 +795,36 @@ impl ShuffleRepartitioner {
.enumerate()
.filter(|(_, (start, end))| start < end)
{
- let mut buffered_partitions = self.buffered_partitions.lock().await;
- let output = &mut buffered_partitions[partition_id];
-
- // If the range of indices is not big enough, just appending the rows into
- // active array builders instead of directly adding them as a record batch.
- mem_diff += output.append_rows(
- input.columns(),
- &shuffled_partition_ids[start..end],
- time_metric,
- )?;
- }
+ mem_diff += self
+ .append_rows_to_partition(
+ input.columns(),
+ &shuffled_partition_ids[start..end],
+ partition_id,
+ )
+ .await?;
+
+ if mem_diff > 0 {
+ let mem_increase = mem_diff as usize;
+ if self.reservation.try_grow(mem_increase).is_err() {
+ self.spill().await?;
+ self.reservation.free();
+ self.reservation.try_grow(mem_increase)?;
+
+ mem_diff = 0;
+ }
+ }
- if mem_diff > 0 {
- let mem_increase = mem_diff as usize;
- if self.reservation.try_grow(mem_increase).is_err() {
- self.spill().await?;
- self.reservation.free();
- self.reservation.try_grow(mem_increase)?;
+ if mem_diff < 0 {
+ let mem_used = self.reservation.size();
+ let mem_decrease = mem_used.min(-mem_diff as usize);
+ self.reservation.shrink(mem_decrease);
+
+ mem_diff += mem_decrease as isize;
}
}
- if mem_diff < 0 {
- let mem_used = self.reservation.size();
- let mem_decrease = mem_used.min(-mem_diff as usize);
- self.reservation.shrink(mem_decrease);
- }
}
Partitioning::UnknownPartitioning(n) if *n == 1 => {
- let mut buffered_partitions = self.buffered_partitions.lock().await;
+ let buffered_partitions = &mut self.buffered_partitions;
assert!(
buffered_partitions.len() == 1,
@@ -800,8 +832,10 @@ impl ShuffleRepartitioner {
buffered_partitions.len()
);
- let output = &mut buffered_partitions[0];
- output.append_batch(&input, time_metric)?;
+ let indices = (0..input.num_rows()).collect::<Vec<usize>>();
+
+ self.append_rows_to_partition(input.columns(), &indices, 0)
+ .await?;
}
other => {
// this should be unreachable as long as the validation logic
@@ -818,7 +852,7 @@ impl ShuffleRepartitioner {
/// Writes buffered shuffled record batches into Arrow IPC bytes.
async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
let num_output_partitions = self.num_output_partitions;
- let mut buffered_partitions = self.buffered_partitions.lock().await;
+ let buffered_partitions = &mut self.buffered_partitions;
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
for i in 0..num_output_partitions {
@@ -916,16 +950,15 @@ impl ShuffleRepartitioner {
self.metrics.data_size.value()
}
- async fn spill(&self) -> Result<usize> {
+ async fn spill(&mut self) -> Result<usize> {
log::debug!(
"ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
self.used(),
self.spill_count()
);
- let mut buffered_partitions = self.buffered_partitions.lock().await;
// we could always get a chance to free some memory as long as we are holding some
- if buffered_partitions.len() == 0 {
+ if self.buffered_partitions.is_empty() {
return Ok(0);
}
@@ -936,7 +969,7 @@ impl ShuffleRepartitioner {
.disk_manager
.create_tmp_file("shuffle writer spill")?;
let offsets = spill_into(
- &mut buffered_partitions,
+ &mut self.buffered_partitions,
spillfile.path(),
self.num_output_partitions,
)
@@ -954,6 +987,60 @@ impl ShuffleRepartitioner {
});
Ok(used)
}
+
+ /// Appends rows of specified indices from columns into active array builders in the specified partition.
+ async fn append_rows_to_partition(
+ &mut self,
+ columns: &[ArrayRef],
+ indices: &[usize],
+ partition_id: usize,
+ ) -> Result<isize> {
+ let mut mem_diff = 0;
+
+ let output = &mut self.buffered_partitions[partition_id];
+
+ let time_metric = self.metrics.baseline.elapsed_compute();
+
+ // If the range of indices is not big enough, just appending the rows into
+ // active array builders instead of directly adding them as a record batch.
+ let mut start_index: usize = 0;
+ let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+ loop {
+ match output_ret {
+ AppendRowStatus::MemDiff(l) => {
+ mem_diff += l?;
+ break;
+ }
+ AppendRowStatus::StartIndex(new_start) => {
+ // Cannot allocate enough memory for the array builders in the partition,
+ // spill partitions and retry.
+ self.spill().await?;
+
+ let output = &mut self.buffered_partitions[partition_id];
+ output.reservation.free();
+
+ let time_metric = self.metrics.baseline.elapsed_compute();
+
+ start_index = new_start;
+ output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+ if let AppendRowStatus::StartIndex(new_start) = output_ret {
+ if new_start == start_index {
+ // If the start index is not updated, it means that the partition
+ // is still not able to allocate enough memory for the array builders.
+ return Err(DataFusionError::Internal(
+ "Partition is still not able to allocate enough memory for the array builders after spilling."
+ .to_string(),
+ ));
+ }
+ }
+ }
+ }
+ }
+
+ Ok(mem_diff)
+ }
}
/// consume the `buffered_partitions` and do spill into a single temp shuffle
output file
@@ -1470,6 +1557,8 @@ mod test {
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::expressions::Column;
use tokio::runtime::Runtime;
@@ -1504,25 +1593,65 @@ mod test {
#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_insert_larger_batch() {
+ shuffle_write_test(10000, 1, 16, None);
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+ fn test_insert_smaller_batch() {
+ shuffle_write_test(1000, 1, 16, None);
+ shuffle_write_test(1000, 10, 16, None);
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+ #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
+ fn test_large_number_of_partitions() {
+ shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
+ shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
+ }
+
+ #[test]
+ #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+ #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
+ fn test_large_number_of_partitions_spilling() {
+ shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
+ }
+
+ fn shuffle_write_test(
+ batch_size: usize,
+ num_batches: usize,
+ num_partitions: usize,
+ memory_limit: Option<usize>,
+ ) {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let mut b = StringBuilder::new();
- for i in 0..10000 {
+ for i in 0..batch_size {
b.append_value(format!("{i}"));
}
let array = b.finish();
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();
- let batches = vec![batch.clone()];
+ let batches = (0..num_batches).map(|_|
batch.clone()).collect::<Vec<_>>();
let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
- Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+ Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();
- let ctx = SessionContext::new();
+
+ // 10MB memory should be enough for running this test
+ let config = SessionConfig::new();
+ let mut runtime_env_builder = RuntimeEnvBuilder::new();
+ runtime_env_builder = match memory_limit {
+ Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
+ None => runtime_env_builder,
+ };
+ let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
+ let ctx = SessionContext::new_with_config_rt(config, runtime_env);
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]