hcrosse commented on code in PR #1537:
URL: 
https://github.com/apache/datafusion-ballista/pull/1537#discussion_r3030354675


##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -255,96 +252,114 @@ impl ShuffleWriterExec {
                 }
 
                 Some(Partitioning::Hash(exprs, num_output_partitions)) => {
-                    // we won't necessary produce output for every possible 
partition, so we
-                    // create writers on demand
-                    let mut writers: Vec<Option<WriteTracker>> = vec![];
-                    for _ in 0..num_output_partitions {
-                        writers.push(None);
-                    }
-
-                    let mut partitioner = 
BatchPartitioner::new_hash_partitioner(
-                        exprs,
-                        num_output_partitions,
-                        write_metrics.repart_time.clone(),
-                    );
-
-                    while let Some(result) = stream.next().await {
-                        let input_batch = result?;
-
-                        write_metrics.input_rows.add(input_batch.num_rows());
-
-                        partitioner.partition(
-                            input_batch,
-                            |output_partition, output_batch| {
-                                // partition func in datafusion make sure not 
write empty output_batch.
-                                let timer = write_metrics.write_time.timer();
-                                match &mut writers[output_partition] {
-                                    Some(w) => {
-                                        w.num_batches += 1;
-                                        w.num_rows += output_batch.num_rows();
-                                        w.writer.write(&output_batch)?;
-                                    }
-                                    None => {
-                                        let mut path = path.clone();
-                                        
path.push(format!("{output_partition}"));
-                                        std::fs::create_dir_all(&path)?;
-
-                                        path.push(format!(
-                                            "data-{input_partition}.arrow"
-                                        ));
-                                        debug!("Writing results to {path:?}");
-
-                                        let options = 
IpcWriteOptions::default()
-                                            .try_with_compression(Some(
-                                                CompressionType::LZ4_FRAME,
-                                            ))?;
-
-                                        let file =
-                                            
BufWriter::new(File::create(path.clone())?);
-                                        let mut writer =
-                                            StreamWriter::try_new_with_options(
-                                                file,
-                                                stream.schema().as_ref(),
-                                                options,
-                                            )?;
-
-                                        writer.write(&output_batch)?;
-                                        writers[output_partition] = 
Some(WriteTracker {
-                                            num_batches: 1,
-                                            num_rows: output_batch.num_rows(),
-                                            writer,
-                                            path,
-                                        });
+                    let schema = stream.schema();
+                    let (tx, mut rx) = 
tokio::sync::mpsc::channel::<RecordBatch>(2);

Review Comment:
   I originally went with `2` just to be conservative, but ran some benchmarks 
and I bumped the channel capacity from 2 to 8, and made it configurable via 
`ShuffleWriterExec::with_channel_capacity()`. I also added `--channel-capacity` 
to the benchmark so this can be checked more easily. Happy to remove the 
configurability if you feel like that's overkill.
   
   For benchmarking, I ran capacity sweeps from 1 to 32 channels and compared 
against the baseline at a few concurrency levels (without much variance):
   
   | Cap | Median | Inv CS |
   |-----|--------|--------|
   | 1 | 217.0ms | 883 |
   | 2 | 203.3ms | 845 |
   | 4 | 202.6ms | 834 |
   | 8 | 199.2ms | 817 |
   | 16 | 203.0ms | 787 |
   | 32 | 200.2ms | 724 |
   
   | Mode | Median | Inv CS |
   |------|--------|--------|
   | Channel (cap=8) | 199.2ms | 817 |
   | Inline (no channel) | 199.6ms | 183 |
   
   There wasn't a huge difference at any level beyond going from 1 -> 2, except 
for larger capacity modestly reducing involuntary context switches (from 883 to 
724 from cap 1 to 32), which is consistent with fewer producer/consumer sync 
points.
   
   I also ran a load test with 8 concurrent shuffles on 4 tokio workers, 
measuring `tokio::time::sleep` jitter as a proxy for worker health. Inline I/O 
had p99 jitter of 120-160ms vs ~3-11ms with the channel.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to