martin-g commented on code in PR #1537:
URL:
https://github.com/apache/datafusion-ballista/pull/1537#discussion_r3026051897
##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -255,96 +252,114 @@ impl ShuffleWriterExec {
}
Some(Partitioning::Hash(exprs, num_output_partitions)) => {
- // we won't necessary produce output for every possible
partition, so we
- // create writers on demand
- let mut writers: Vec<Option<WriteTracker>> = vec![];
- for _ in 0..num_output_partitions {
- writers.push(None);
- }
-
- let mut partitioner =
BatchPartitioner::new_hash_partitioner(
- exprs,
- num_output_partitions,
- write_metrics.repart_time.clone(),
- );
-
- while let Some(result) = stream.next().await {
- let input_batch = result?;
-
- write_metrics.input_rows.add(input_batch.num_rows());
-
- partitioner.partition(
- input_batch,
- |output_partition, output_batch| {
- // partition func in datafusion make sure not
write empty output_batch.
- let timer = write_metrics.write_time.timer();
- match &mut writers[output_partition] {
- Some(w) => {
- w.num_batches += 1;
- w.num_rows += output_batch.num_rows();
- w.writer.write(&output_batch)?;
- }
- None => {
- let mut path = path.clone();
-
path.push(format!("{output_partition}"));
- std::fs::create_dir_all(&path)?;
-
- path.push(format!(
- "data-{input_partition}.arrow"
- ));
- debug!("Writing results to {path:?}");
-
- let options =
IpcWriteOptions::default()
- .try_with_compression(Some(
- CompressionType::LZ4_FRAME,
- ))?;
-
- let file =
-
BufWriter::new(File::create(path.clone())?);
- let mut writer =
- StreamWriter::try_new_with_options(
- file,
- stream.schema().as_ref(),
- options,
- )?;
-
- writer.write(&output_batch)?;
- writers[output_partition] =
Some(WriteTracker {
- num_batches: 1,
- num_rows: output_batch.num_rows(),
- writer,
- path,
- });
+ let schema = stream.schema();
+ let (tx, mut rx) =
tokio::sync::mpsc::channel::<RecordBatch>(2);
+ let write_time = write_metrics.write_time.clone();
+ let repart_time = write_metrics.repart_time.clone();
+ let output_rows = write_metrics.output_rows.clone();
+
+ let handle = tokio::task::spawn_blocking(move || {
+ let mut writers: Vec<Option<WriteTracker>> =
+ (0..num_output_partitions).map(|_| None).collect();
Review Comment:
nit:
```suggestion
let mut writers: Vec<Option<WriteTracker>> =
vec![None; num_output_partitions];
```
##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -214,14 +214,12 @@ impl ShuffleWriterExec {
match output_partitioning {
None => {
- let timer = write_metrics.write_time.timer();
path.push(format!("{input_partition}"));
- std::fs::create_dir_all(&path)?;
+ tokio::fs::create_dir_all(&path).await?;
path.push("data.arrow");
let path = path.to_str().unwrap();
Review Comment:
This is old (before the PR):
The `.unwrap()` may cause a panic on non-UTF-8 paths.
Looking at the code, I think there is no need to convert it to a `&str`. Passing it
as a `&Path` should be fine too.
##########
benchmarks/src/bin/shuffle_bench.rs:
##########
@@ -240,19 +275,53 @@ async fn benchmark_sort_shuffle(
output_partitions,
),
config,
- )?;
+ )?);
let start = Instant::now();
- // Execute all input partitions (not output partitions)
let input_partition_count = data.len();
let mut total_files = 0;
- for partition in 0..input_partition_count {
- let mut stream = shuffle_writer.execute(partition, task_ctx.clone())?;
- let batches = utils::collect_stream(&mut stream).await?;
- // Count output files from the result
- if let Some(batch) = batches.first() {
- total_files += batch.num_rows();
+
+ if concurrency <= 1 {
+ for partition in 0..input_partition_count {
+ let mut stream = shuffle_writer.execute(partition,
task_ctx.clone())?;
+ let batches = utils::collect_stream(&mut stream).await?;
+ if let Some(batch) = batches.first() {
+ total_files += batch.num_rows();
+ }
+ }
+ } else {
+ let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
Review Comment:
This code looks very similar (possibly identical?) to
https://github.com/apache/datafusion-ballista/pull/1537/changes?w=1#diff-bbacf10874172011fe967d766f8642a5b84583e33215b128f606ad52ef994329R200-R231
Maybe extract it into a helper function?
##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -255,96 +252,114 @@ impl ShuffleWriterExec {
}
Some(Partitioning::Hash(exprs, num_output_partitions)) => {
- // we won't necessary produce output for every possible
partition, so we
- // create writers on demand
- let mut writers: Vec<Option<WriteTracker>> = vec![];
- for _ in 0..num_output_partitions {
- writers.push(None);
- }
-
- let mut partitioner =
BatchPartitioner::new_hash_partitioner(
- exprs,
- num_output_partitions,
- write_metrics.repart_time.clone(),
- );
-
- while let Some(result) = stream.next().await {
- let input_batch = result?;
-
- write_metrics.input_rows.add(input_batch.num_rows());
-
- partitioner.partition(
- input_batch,
- |output_partition, output_batch| {
- // partition func in datafusion make sure not
write empty output_batch.
- let timer = write_metrics.write_time.timer();
- match &mut writers[output_partition] {
- Some(w) => {
- w.num_batches += 1;
- w.num_rows += output_batch.num_rows();
- w.writer.write(&output_batch)?;
- }
- None => {
- let mut path = path.clone();
-
path.push(format!("{output_partition}"));
- std::fs::create_dir_all(&path)?;
-
- path.push(format!(
- "data-{input_partition}.arrow"
- ));
- debug!("Writing results to {path:?}");
-
- let options =
IpcWriteOptions::default()
- .try_with_compression(Some(
- CompressionType::LZ4_FRAME,
- ))?;
-
- let file =
-
BufWriter::new(File::create(path.clone())?);
- let mut writer =
- StreamWriter::try_new_with_options(
- file,
- stream.schema().as_ref(),
- options,
- )?;
-
- writer.write(&output_batch)?;
- writers[output_partition] =
Some(WriteTracker {
- num_batches: 1,
- num_rows: output_batch.num_rows(),
- writer,
- path,
- });
+ let schema = stream.schema();
+ let (tx, mut rx) =
tokio::sync::mpsc::channel::<RecordBatch>(2);
Review Comment:
Why `2`?
Isn't it too low? This may lead to lower throughput due to more context
switches.
##########
ballista/core/src/utils.rs:
##########
@@ -159,42 +159,74 @@ pub fn default_config_producer() -> SessionConfig {
SessionConfig::new_with_ballista()
}
-/// Stream data to disk in Arrow IPC format
+/// Stream data to disk in Arrow IPC format.
+///
+/// Batches are read from the async stream and forwarded through a bounded
+/// channel to a `spawn_blocking` task that performs all synchronous file I/O,
+/// keeping the tokio worker thread unblocked.
pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
path: &str,
disk_write_metric: &metrics::Time,
) -> Result<PartitionStats> {
- let file = BufWriter::new(File::create(path).map_err(|e| {
- error!("Failed to create partition file at {path}: {e:?}");
- BallistaError::IoError(e)
- })?);
+ let schema = stream.schema();
+ let path_owned = path.to_owned();
+ let write_metric = disk_write_metric.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
Review Comment:
Why `2`?
Isn't it too low? This may lead to lower throughput due to more context
switches.
##########
ballista/core/src/utils.rs:
##########
@@ -159,42 +159,74 @@ pub fn default_config_producer() -> SessionConfig {
SessionConfig::new_with_ballista()
}
-/// Stream data to disk in Arrow IPC format
+/// Stream data to disk in Arrow IPC format.
+///
+/// Batches are read from the async stream and forwarded through a bounded
+/// channel to a `spawn_blocking` task that performs all synchronous file I/O,
+/// keeping the tokio worker thread unblocked.
pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
path: &str,
disk_write_metric: &metrics::Time,
) -> Result<PartitionStats> {
- let file = BufWriter::new(File::create(path).map_err(|e| {
- error!("Failed to create partition file at {path}: {e:?}");
- BallistaError::IoError(e)
- })?);
+ let schema = stream.schema();
+ let path_owned = path.to_owned();
+ let write_metric = disk_write_metric.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
+
+ let handle = tokio::task::spawn_blocking(move || -> Result<()> {
+ let file = BufWriter::new(File::create(&path_owned).map_err(|e| {
+ error!("Failed to create partition file at {path_owned}: {e:?}");
+ BallistaError::IoError(e)
+ })?);
+
+ let options = IpcWriteOptions::default()
+ .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+ let mut writer =
+ StreamWriter::try_new_with_options(file, schema.as_ref(),
options)?;
+
+ while let Some(batch) = rx.blocking_recv() {
+ let timer = write_metric.timer();
+ writer.write(&batch)?;
+ timer.done();
+ }
+ let timer = write_metric.timer();
+ writer.finish()?;
+ timer.done();
+ Ok(())
+ });
let mut num_rows = 0;
let mut num_batches = 0;
let mut num_bytes = 0;
- let options = IpcWriteOptions::default()
- .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
-
- let mut writer =
- StreamWriter::try_new_with_options(file, stream.schema().as_ref(),
options)?;
-
- while let Some(result) = stream.next().await {
- let batch = result?;
+ let stream_err = loop {
+ match stream.next().await {
+ Some(Ok(batch)) => {
+ num_batches += 1;
+ num_rows += batch.num_rows();
+ num_bytes += batch.get_array_memory_size();
+ if tx.send(batch).await.is_err() {
+ break None;
+ }
+ }
+ Some(Err(e)) => break Some(e),
+ None => break None,
+ }
+ };
+ drop(tx);
- let batch_size_bytes: usize = batch.get_array_memory_size();
- num_batches += 1;
- num_rows += batch.num_rows();
- num_bytes += batch_size_bytes;
+ let write_result = handle
+ .await
+ .map_err(|e| BallistaError::General(format!("Disk writer task failed:
{e}")))?;
- let timer = disk_write_metric.timer();
- writer.write(&batch)?;
- timer.done();
+ if let Some(e) = stream_err {
+ return Err(e.into());
Review Comment:
Same issue — if `write_result` is an `Err`, it is silently dropped.
##########
ballista/core/src/utils.rs:
##########
@@ -159,42 +159,74 @@ pub fn default_config_producer() -> SessionConfig {
SessionConfig::new_with_ballista()
}
-/// Stream data to disk in Arrow IPC format
+/// Stream data to disk in Arrow IPC format.
+///
+/// Batches are read from the async stream and forwarded through a bounded
+/// channel to a `spawn_blocking` task that performs all synchronous file I/O,
+/// keeping the tokio worker thread unblocked.
pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
path: &str,
disk_write_metric: &metrics::Time,
) -> Result<PartitionStats> {
- let file = BufWriter::new(File::create(path).map_err(|e| {
- error!("Failed to create partition file at {path}: {e:?}");
- BallistaError::IoError(e)
- })?);
+ let schema = stream.schema();
+ let path_owned = path.to_owned();
+ let write_metric = disk_write_metric.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
+
+ let handle = tokio::task::spawn_blocking(move || -> Result<()> {
+ let file = BufWriter::new(File::create(&path_owned).map_err(|e| {
+ error!("Failed to create partition file at {path_owned}: {e:?}");
+ BallistaError::IoError(e)
+ })?);
+
+ let options = IpcWriteOptions::default()
+ .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+ let mut writer =
+ StreamWriter::try_new_with_options(file, schema.as_ref(),
options)?;
+
+ while let Some(batch) = rx.blocking_recv() {
+ let timer = write_metric.timer();
+ writer.write(&batch)?;
+ timer.done();
+ }
+ let timer = write_metric.timer();
+ writer.finish()?;
+ timer.done();
+ Ok(())
+ });
let mut num_rows = 0;
let mut num_batches = 0;
let mut num_bytes = 0;
- let options = IpcWriteOptions::default()
- .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
-
- let mut writer =
- StreamWriter::try_new_with_options(file, stream.schema().as_ref(),
options)?;
-
- while let Some(result) = stream.next().await {
- let batch = result?;
+ let stream_err = loop {
+ match stream.next().await {
+ Some(Ok(batch)) => {
+ num_batches += 1;
+ num_rows += batch.num_rows();
+ num_bytes += batch.get_array_memory_size();
Review Comment:
This is from before this PR:
Here `num_bytes` is the number of bytes in memory.
https://github.com/hcrosse/datafusion-ballista/blob/fdaf1b764bd4ab3320e7d0edbe70a911a9e954b9/ballista/core/src/execution_plans/shuffle_writer.rs#L323
is the number of bytes on disk, which is **compressed**.
##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -255,96 +252,114 @@ impl ShuffleWriterExec {
}
Some(Partitioning::Hash(exprs, num_output_partitions)) => {
- // we won't necessary produce output for every possible
partition, so we
- // create writers on demand
- let mut writers: Vec<Option<WriteTracker>> = vec![];
- for _ in 0..num_output_partitions {
- writers.push(None);
- }
-
- let mut partitioner =
BatchPartitioner::new_hash_partitioner(
- exprs,
- num_output_partitions,
- write_metrics.repart_time.clone(),
- );
-
- while let Some(result) = stream.next().await {
- let input_batch = result?;
-
- write_metrics.input_rows.add(input_batch.num_rows());
-
- partitioner.partition(
- input_batch,
- |output_partition, output_batch| {
- // partition func in datafusion make sure not
write empty output_batch.
- let timer = write_metrics.write_time.timer();
- match &mut writers[output_partition] {
- Some(w) => {
- w.num_batches += 1;
- w.num_rows += output_batch.num_rows();
- w.writer.write(&output_batch)?;
- }
- None => {
- let mut path = path.clone();
-
path.push(format!("{output_partition}"));
- std::fs::create_dir_all(&path)?;
-
- path.push(format!(
- "data-{input_partition}.arrow"
- ));
- debug!("Writing results to {path:?}");
-
- let options =
IpcWriteOptions::default()
- .try_with_compression(Some(
- CompressionType::LZ4_FRAME,
- ))?;
-
- let file =
-
BufWriter::new(File::create(path.clone())?);
- let mut writer =
- StreamWriter::try_new_with_options(
- file,
- stream.schema().as_ref(),
- options,
- )?;
-
- writer.write(&output_batch)?;
- writers[output_partition] =
Some(WriteTracker {
- num_batches: 1,
- num_rows: output_batch.num_rows(),
- writer,
- path,
- });
+ let schema = stream.schema();
+ let (tx, mut rx) =
tokio::sync::mpsc::channel::<RecordBatch>(2);
+ let write_time = write_metrics.write_time.clone();
+ let repart_time = write_metrics.repart_time.clone();
+ let output_rows = write_metrics.output_rows.clone();
+
+ let handle = tokio::task::spawn_blocking(move || {
+ let mut writers: Vec<Option<WriteTracker>> =
+ (0..num_output_partitions).map(|_| None).collect();
+ let mut partitioner =
BatchPartitioner::new_hash_partitioner(
+ exprs,
+ num_output_partitions,
+ repart_time,
+ );
+
+ while let Some(input_batch) = rx.blocking_recv() {
+ partitioner.partition(
+ input_batch,
+ |output_partition, output_batch| {
+ let timer = write_time.timer();
+ match &mut writers[output_partition] {
+ Some(w) => {
+ w.num_batches += 1;
+ w.num_rows +=
output_batch.num_rows();
+ w.writer.write(&output_batch)?;
+ }
+ None => {
+ let mut p = path.clone();
+
p.push(format!("{output_partition}"));
+ std::fs::create_dir_all(&p)?;
+ p.push(format!(
+ "data-{input_partition}.arrow"
+ ));
+ debug!("Writing results to {p:?}");
+
+ let options =
IpcWriteOptions::default()
+ .try_with_compression(Some(
+ CompressionType::LZ4_FRAME,
+ ))?;
+ let file =
+
BufWriter::new(File::create(p.clone())?);
+ let mut writer =
+
StreamWriter::try_new_with_options(
+ file,
+ schema.as_ref(),
+ options,
+ )?;
+ writer.write(&output_batch)?;
+ writers[output_partition] =
+ Some(WriteTracker {
+ num_batches: 1,
+ num_rows:
output_batch.num_rows(),
+ writer,
+ path: p,
+ });
+ }
}
- }
-
write_metrics.output_rows.add(output_batch.num_rows());
- timer.done();
- Ok(())
- },
- )?;
- }
+ output_rows.add(output_batch.num_rows());
+ timer.done();
+ Ok(())
+ },
+ )?;
+ }
- let mut part_locs = vec![];
-
- for (i, w) in writers.iter_mut().enumerate() {
- if let Some(w) = w {
- w.writer.finish()?;
- let num_bytes = fs::metadata(&w.path)?.len();
- debug!(
- "Finished writing shuffle partition {} at
{:?}. Batches: {}. Rows: {}. Bytes: {}.",
- i, w.path, w.num_batches, w.num_rows, num_bytes
- );
-
- part_locs.push(ShuffleWritePartition {
- partition_id: i as u64,
- path: w.path.to_string_lossy().to_string(),
- num_batches: w.num_batches as u64,
- num_rows: w.num_rows as u64,
- num_bytes,
- });
+ let mut part_locs = vec![];
+ for (i, w) in writers.iter_mut().enumerate() {
+ if let Some(w) = w {
+ w.writer.finish()?;
+ let num_bytes = fs::metadata(&w.path)?.len();
+ debug!(
+ "Finished writing shuffle partition {} at
{:?}. Batches: {}. Rows: {}. Bytes: {}.",
+ i, w.path, w.num_batches, w.num_rows,
num_bytes
+ );
+ part_locs.push(ShuffleWritePartition {
+ partition_id: i as u64,
+ path: w.path.to_string_lossy().to_string(),
+ num_batches: w.num_batches as u64,
+ num_rows: w.num_rows as u64,
+ num_bytes,
+ });
+ }
+ }
+ Ok(part_locs)
+ });
+
+ let stream_err = loop {
+ match stream.next().await {
+ Some(Ok(batch)) => {
+ write_metrics.input_rows.add(batch.num_rows());
+ if tx.send(batch).await.is_err() {
+ break None;
+ }
+ }
+ Some(Err(e)) => break Some(e),
+ None => break None,
}
+ };
+ drop(tx);
+
+ let write_result = handle.await.map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Shuffle writer task failed: {e}"
+ ))
+ })?;
+ if let Some(e) = stream_err {
+ return Err(e);
Review Comment:
If `write_result` is also an `Err`, it is silently dropped.
Log it as a warning, at least?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]