This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 8d097d58 chore: Revert "chore: Reserve memory for native shuffle writer per partition (#988)" (#1020)
8d097d58 is described below
commit 8d097d584e9e70399b635e5063ad7dea3fc87cac
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Oct 15 11:25:50 2024 -0700
chore: Revert "chore: Reserve memory for native shuffle writer per
partition (#988)" (#1020)
This reverts commit e146cfa1676fedcfb3003c2e3f1da1578d6b664b.
---
.../src/execution/datafusion/shuffle_writer.rs | 251 +++++----------------
1 file changed, 61 insertions(+), 190 deletions(-)
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 6c317466..9668359f 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -66,14 +66,6 @@ use crate::{
};
use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
-/// The status of appending rows to a partition buffer.
-enum AppendRowStatus {
- /// The difference in memory usage after appending rows
- MemDiff(Result<isize>),
- /// The index of the next row to append
- StartIndex(usize),
-}
-
/// The shuffle writer operator maps each input partition to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
#[derive(Debug)]
@@ -214,21 +206,10 @@ struct PartitionBuffer {
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
/// the active array builders will be frozen and appended to frozen buffer `frozen`.
batch_size: usize,
- /// Memory reservation for this partition buffer.
- reservation: MemoryReservation,
}
impl PartitionBuffer {
- fn new(
- schema: SchemaRef,
- batch_size: usize,
- partition_id: usize,
- runtime: &Arc<RuntimeEnv>,
- ) -> Self {
- let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
- .with_can_spill(true)
- .register(&runtime.memory_pool);
-
+ fn new(schema: SchemaRef, batch_size: usize) -> Self {
Self {
schema,
frozen: vec![],
@@ -236,52 +217,47 @@ impl PartitionBuffer {
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
- reservation,
}
}
/// Initializes active builders if necessary.
- /// Returns error if memory reservation fails.
fn init_active_if_necessary(&mut self) -> Result<isize> {
let mut mem_diff = 0;
if self.active.is_empty() {
- // Estimate the memory size of active builders
+ self.active = new_array_builders(&self.schema, self.batch_size);
if self.active_slots_mem_size == 0 {
self.active_slots_mem_size = self
- .schema
- .fields()
+ .active
.iter()
- .map(|field| slot_size(self.batch_size, field.data_type()))
+ .zip(self.schema.fields())
+ .map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
.sum::<usize>();
}
-
- self.reservation.try_grow(self.active_slots_mem_size)?;
-
- self.active = new_array_builders(&self.schema, self.batch_size);
-
mem_diff += self.active_slots_mem_size as isize;
}
Ok(mem_diff)
}
+ /// Appends all rows of given batch into active array builders.
+ fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
+ let columns = batch.columns();
+ let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
+ self.append_rows(columns, &indices, time_metric)
+ }
+
/// Appends rows of specified indices from columns into active array builders.
fn append_rows(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
- start_index: usize,
time_metric: &Time,
- ) -> AppendRowStatus {
+ ) -> Result<isize> {
let mut mem_diff = 0;
- let mut start = start_index;
+ let mut start = 0;
// lazy init because some partition may be empty
- let init = self.init_active_if_necessary();
- if init.is_err() {
- return AppendRowStatus::StartIndex(start);
- }
- mem_diff += init.unwrap();
+ mem_diff += self.init_active_if_necessary()?;
while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());
@@ -294,22 +270,14 @@ impl PartitionBuffer {
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
let mut timer = time_metric.timer();
- let flush = self.flush();
- if let Err(e) = flush {
- return AppendRowStatus::MemDiff(Err(e));
- }
- mem_diff += flush.unwrap();
+ mem_diff += self.flush()?;
timer.stop();
- let init = self.init_active_if_necessary();
- if init.is_err() {
- return AppendRowStatus::StartIndex(end);
- }
- mem_diff += init.unwrap();
+ mem_diff += self.init_active_if_necessary()?;
}
start = end;
}
- AppendRowStatus::MemDiff(Ok(mem_diff))
+ Ok(mem_diff)
}
/// flush active data into frozen bytes
@@ -323,7 +291,7 @@ impl PartitionBuffer {
let active = std::mem::take(&mut self.active);
let num_rows = self.num_active_rows;
self.num_active_rows = 0;
- self.reservation.try_shrink(self.active_slots_mem_size)?;
+ mem_diff -= self.active_slots_mem_size as isize;
let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
@@ -607,7 +575,7 @@ struct ShuffleRepartitioner {
output_data_file: String,
output_index_file: String,
schema: SchemaRef,
- buffered_partitions: Vec<PartitionBuffer>,
+ buffered_partitions: Mutex<Vec<PartitionBuffer>>,
spills: Mutex<Vec<SpillInfo>>,
/// Sort expressions
/// Partitioning scheme to use
@@ -680,11 +648,11 @@ impl ShuffleRepartitioner {
output_data_file,
output_index_file,
schema: Arc::clone(&schema),
- buffered_partitions: (0..num_output_partitions)
- .map(|partition_id| {
- PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
- })
- .collect::<Vec<_>>(),
+ buffered_partitions: Mutex::new(
+ (0..num_output_partitions)
+ .map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size))
+ .collect::<Vec<_>>(),
+ ),
spills: Mutex::new(vec![]),
partitioning,
num_output_partitions,
@@ -731,6 +699,8 @@ impl ShuffleRepartitioner {
// Update data size metric
self.metrics.data_size.add(input.get_array_memory_size());
+ let time_metric = self.metrics.baseline.elapsed_compute();
+
// NOTE: in shuffle writer exec, the output_rows metrics represents the
// number of rows those are written to output data file.
self.metrics.baseline.record_output(input.num_rows());
@@ -795,36 +765,34 @@ impl ShuffleRepartitioner {
.enumerate()
.filter(|(_, (start, end))| start < end)
{
- mem_diff += self
- .append_rows_to_partition(
- input.columns(),
- &shuffled_partition_ids[start..end],
- partition_id,
- )
- .await?;
-
- if mem_diff > 0 {
- let mem_increase = mem_diff as usize;
- if self.reservation.try_grow(mem_increase).is_err() {
- self.spill().await?;
- self.reservation.free();
- self.reservation.try_grow(mem_increase)?;
-
- mem_diff = 0;
- }
- }
-
- if mem_diff < 0 {
- let mem_used = self.reservation.size();
- let mem_decrease = mem_used.min(-mem_diff as usize);
- self.reservation.shrink(mem_decrease);
+ let mut buffered_partitions = self.buffered_partitions.lock().await;
+ let output = &mut buffered_partitions[partition_id];
+
+ // If the range of indices is not big enough, just appending the rows into
+ // active array builders instead of directly adding them as a record batch.
+ mem_diff += output.append_rows(
+ input.columns(),
+ &shuffled_partition_ids[start..end],
+ time_metric,
+ )?;
+ }
- mem_diff += mem_decrease as isize;
+ if mem_diff > 0 {
+ let mem_increase = mem_diff as usize;
+ if self.reservation.try_grow(mem_increase).is_err() {
+ self.spill().await?;
+ self.reservation.free();
+ self.reservation.try_grow(mem_increase)?;
}
}
+ if mem_diff < 0 {
+ let mem_used = self.reservation.size();
+ let mem_decrease = mem_used.min(-mem_diff as usize);
+ self.reservation.shrink(mem_decrease);
+ }
}
Partitioning::UnknownPartitioning(n) if *n == 1 => {
- let buffered_partitions = &mut self.buffered_partitions;
+ let mut buffered_partitions = self.buffered_partitions.lock().await;
assert!(
buffered_partitions.len() == 1,
@@ -832,10 +800,8 @@ impl ShuffleRepartitioner {
buffered_partitions.len()
);
- let indices = (0..input.num_rows()).collect::<Vec<usize>>();
-
- self.append_rows_to_partition(input.columns(), &indices, 0)
- .await?;
+ let output = &mut buffered_partitions[0];
+ output.append_batch(&input, time_metric)?;
}
other => {
// this should be unreachable as long as the validation logic
@@ -852,7 +818,7 @@ impl ShuffleRepartitioner {
/// Writes buffered shuffled record batches into Arrow IPC bytes.
async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
let num_output_partitions = self.num_output_partitions;
- let buffered_partitions = &mut self.buffered_partitions;
+ let mut buffered_partitions = self.buffered_partitions.lock().await;
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
for i in 0..num_output_partitions {
@@ -950,15 +916,16 @@ impl ShuffleRepartitioner {
self.metrics.data_size.value()
}
- async fn spill(&mut self) -> Result<usize> {
+ async fn spill(&self) -> Result<usize> {
log::debug!(
"ShuffleRepartitioner spilling shuffle data of {} to disk while
inserting ({} time(s) so far)",
self.used(),
self.spill_count()
);
+ let mut buffered_partitions = self.buffered_partitions.lock().await;
// we could always get a chance to free some memory as long as we are holding some
- if self.buffered_partitions.is_empty() {
+ if buffered_partitions.len() == 0 {
return Ok(0);
}
@@ -969,7 +936,7 @@ impl ShuffleRepartitioner {
.disk_manager
.create_tmp_file("shuffle writer spill")?;
let offsets = spill_into(
- &mut self.buffered_partitions,
+ &mut buffered_partitions,
spillfile.path(),
self.num_output_partitions,
)
@@ -987,60 +954,6 @@ impl ShuffleRepartitioner {
});
Ok(used)
}
-
- /// Appends rows of specified indices from columns into active array builders in the specified partition.
- async fn append_rows_to_partition(
- &mut self,
- columns: &[ArrayRef],
- indices: &[usize],
- partition_id: usize,
- ) -> Result<isize> {
- let mut mem_diff = 0;
-
- let output = &mut self.buffered_partitions[partition_id];
-
- let time_metric = self.metrics.baseline.elapsed_compute();
-
- // If the range of indices is not big enough, just appending the rows into
- // active array builders instead of directly adding them as a record batch.
- let mut start_index: usize = 0;
- let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);
-
- loop {
- match output_ret {
- AppendRowStatus::MemDiff(l) => {
- mem_diff += l?;
- break;
- }
- AppendRowStatus::StartIndex(new_start) => {
- // Cannot allocate enough memory for the array builders in the partition,
- // spill partitions and retry.
- self.spill().await?;
-
- let output = &mut self.buffered_partitions[partition_id];
- output.reservation.free();
-
- let time_metric = self.metrics.baseline.elapsed_compute();
-
- start_index = new_start;
- output_ret = output.append_rows(columns, indices, start_index, time_metric);
-
- if let AppendRowStatus::StartIndex(new_start) = output_ret {
- if new_start == start_index {
- // If the start index is not updated, it means that the partition
- // is still not able to allocate enough memory for the array builders.
- return Err(DataFusionError::Internal(
- "Partition is still not able to allocate
enough memory for the array builders after spilling."
- .to_string(),
- ));
- }
- }
- }
- }
- }
-
- Ok(mem_diff)
- }
}
/// consume the `buffered_partitions` and do spill into a single temp shuffle output file
@@ -1557,8 +1470,6 @@ mod test {
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
- use datafusion_execution::config::SessionConfig;
- use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::expressions::Column;
use tokio::runtime::Runtime;
@@ -1593,65 +1504,25 @@ mod test {
#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_insert_larger_batch() {
- shuffle_write_test(10000, 1, 16, None);
- }
-
- #[test]
- #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
- fn test_insert_smaller_batch() {
- shuffle_write_test(1000, 1, 16, None);
- shuffle_write_test(1000, 10, 16, None);
- }
-
- #[test]
- #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
- #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
- fn test_large_number_of_partitions() {
- shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
- shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
- }
-
- #[test]
- #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
- #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
- fn test_large_number_of_partitions_spilling() {
- shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
- }
-
- fn shuffle_write_test(
- batch_size: usize,
- num_batches: usize,
- num_partitions: usize,
- memory_limit: Option<usize>,
- ) {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let mut b = StringBuilder::new();
- for i in 0..batch_size {
+ for i in 0..10000 {
b.append_value(format!("{i}"));
}
let array = b.finish();
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();
- let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
+ let batches = vec![batch.clone()];
let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
- Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
+ Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();
-
- // 10MB memory should be enough for running this test
- let config = SessionConfig::new();
- let mut runtime_env_builder = RuntimeEnvBuilder::new();
- runtime_env_builder = match memory_limit {
- Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
- None => runtime_env_builder,
- };
- let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
- let ctx = SessionContext::new_with_config_rt(config, runtime_env);
+ let ctx = SessionContext::new();
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
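
For anyone skimming the diff, the shape of the restored flow: append_rows once again returns a plain Result<isize> memory delta, and insert_batch grows a single repartitioner-level reservation, spilling and retrying the grow when the pool is exhausted (the restored block under "if mem_diff > 0" above). Below is a minimal, self-contained sketch of that grow/spill-retry pattern only; the Reservation type, its limit field, and the starting sizes are simplified assumptions for illustration, not DataFusion's MemoryReservation API.

    // Simplified stand-in for a memory reservation against a fixed-size pool.
    struct Reservation {
        size: usize,
        limit: usize,
    }

    impl Reservation {
        fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
            if self.size + bytes > self.limit {
                Err(format!("cannot reserve {bytes} bytes"))
            } else {
                self.size += bytes;
                Ok(())
            }
        }
        fn free(&mut self) {
            self.size = 0;
        }
        fn shrink(&mut self, bytes: usize) {
            self.size -= bytes.min(self.size);
        }
    }

    fn main() {
        // Assume 600 bytes already reserved from earlier batches against a
        // 1024-byte pool (toy numbers chosen so the first grow fails).
        let mut reservation = Reservation { size: 600, limit: 1024 };
        let mem_diff: isize = 600; // delta reported by appending a batch

        if mem_diff > 0 {
            let mem_increase = mem_diff as usize;
            if reservation.try_grow(mem_increase).is_err() {
                // In shuffle_writer.rs this is `self.spill().await?`, which
                // writes buffered partitions to a temp file; freeing the
                // reservation then makes room for the retried grow.
                println!("spilling buffered partitions to disk ...");
                reservation.free();
                reservation.try_grow(mem_increase).expect("grow after spill");
            }
        } else if mem_diff < 0 {
            // Negative delta: release what we can, never below zero.
            let mem_decrease = reservation.size.min((-mem_diff) as usize);
            reservation.shrink(mem_decrease);
        }
        println!("reserved after insert: {} bytes", reservation.size);
    }
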
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]