hcrosse commented on code in PR #1537:
URL: https://github.com/apache/datafusion-ballista/pull/1537#discussion_r3028568797


##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -255,96 +252,114 @@ impl ShuffleWriterExec {
                 }
 
                 Some(Partitioning::Hash(exprs, num_output_partitions)) => {
-                    // we won't necessary produce output for every possible 
partition, so we
-                    // create writers on demand
-                    let mut writers: Vec<Option<WriteTracker>> = vec![];
-                    for _ in 0..num_output_partitions {
-                        writers.push(None);
-                    }
-
-                    let mut partitioner = 
BatchPartitioner::new_hash_partitioner(
-                        exprs,
-                        num_output_partitions,
-                        write_metrics.repart_time.clone(),
-                    );
-
-                    while let Some(result) = stream.next().await {
-                        let input_batch = result?;
-
-                        write_metrics.input_rows.add(input_batch.num_rows());
-
-                        partitioner.partition(
-                            input_batch,
-                            |output_partition, output_batch| {
-                                // partition func in datafusion make sure not 
write empty output_batch.
-                                let timer = write_metrics.write_time.timer();
-                                match &mut writers[output_partition] {
-                                    Some(w) => {
-                                        w.num_batches += 1;
-                                        w.num_rows += output_batch.num_rows();
-                                        w.writer.write(&output_batch)?;
-                                    }
-                                    None => {
-                                        let mut path = path.clone();
-                                        
path.push(format!("{output_partition}"));
-                                        std::fs::create_dir_all(&path)?;
-
-                                        path.push(format!(
-                                            "data-{input_partition}.arrow"
-                                        ));
-                                        debug!("Writing results to {path:?}");
-
-                                        let options = 
IpcWriteOptions::default()
-                                            .try_with_compression(Some(
-                                                CompressionType::LZ4_FRAME,
-                                            ))?;
-
-                                        let file =
-                                            
BufWriter::new(File::create(path.clone())?);
-                                        let mut writer =
-                                            StreamWriter::try_new_with_options(
-                                                file,
-                                                stream.schema().as_ref(),
-                                                options,
-                                            )?;
-
-                                        writer.write(&output_batch)?;
-                                        writers[output_partition] = 
Some(WriteTracker {
-                                            num_batches: 1,
-                                            num_rows: output_batch.num_rows(),
-                                            writer,
-                                            path,
-                                        });
+                    let schema = stream.schema();
+                    let (tx, mut rx) = 
tokio::sync::mpsc::channel::<RecordBatch>(2);
+                    let write_time = write_metrics.write_time.clone();
+                    let repart_time = write_metrics.repart_time.clone();
+                    let output_rows = write_metrics.output_rows.clone();
+
+                    let handle = tokio::task::spawn_blocking(move || {
+                        let mut writers: Vec<Option<WriteTracker>> =
+                            (0..num_output_partitions).map(|_| None).collect();
+                        let mut partitioner = 
BatchPartitioner::new_hash_partitioner(
+                            exprs,
+                            num_output_partitions,
+                            repart_time,
+                        );
+
+                        while let Some(input_batch) = rx.blocking_recv() {
+                            partitioner.partition(
+                                input_batch,
+                                |output_partition, output_batch| {
+                                    let timer = write_time.timer();
+                                    match &mut writers[output_partition] {
+                                        Some(w) => {
+                                            w.num_batches += 1;
+                                            w.num_rows += 
output_batch.num_rows();
+                                            w.writer.write(&output_batch)?;
+                                        }
+                                        None => {
+                                            let mut p = path.clone();
+                                            
p.push(format!("{output_partition}"));
+                                            std::fs::create_dir_all(&p)?;
+                                            p.push(format!(
+                                                "data-{input_partition}.arrow"
+                                            ));
+                                            debug!("Writing results to {p:?}");
+
+                                            let options = 
IpcWriteOptions::default()
+                                                .try_with_compression(Some(
+                                                    CompressionType::LZ4_FRAME,
+                                                ))?;
+                                            let file =
+                                                
BufWriter::new(File::create(p.clone())?);
+                                            let mut writer =
+                                                
StreamWriter::try_new_with_options(
+                                                    file,
+                                                    schema.as_ref(),
+                                                    options,
+                                                )?;
+                                            writer.write(&output_batch)?;
+                                            writers[output_partition] =
+                                                Some(WriteTracker {
+                                                    num_batches: 1,
+                                                    num_rows: 
output_batch.num_rows(),
+                                                    writer,
+                                                    path: p,
+                                                });
+                                        }
                                     }
-                                }
-                                
write_metrics.output_rows.add(output_batch.num_rows());
-                                timer.done();
-                                Ok(())
-                            },
-                        )?;
-                    }
+                                    output_rows.add(output_batch.num_rows());
+                                    timer.done();
+                                    Ok(())
+                                },
+                            )?;
+                        }
 
-                    let mut part_locs = vec![];
-
-                    for (i, w) in writers.iter_mut().enumerate() {
-                        if let Some(w) = w {
-                            w.writer.finish()?;
-                            let num_bytes = fs::metadata(&w.path)?.len();
-                            debug!(
-                                "Finished writing shuffle partition {} at 
{:?}. Batches: {}. Rows: {}. Bytes: {}.",
-                                i, w.path, w.num_batches, w.num_rows, num_bytes
-                            );
-
-                            part_locs.push(ShuffleWritePartition {
-                                partition_id: i as u64,
-                                path: w.path.to_string_lossy().to_string(),
-                                num_batches: w.num_batches as u64,
-                                num_rows: w.num_rows as u64,
-                                num_bytes,
-                            });
+                        let mut part_locs = vec![];
+                        for (i, w) in writers.iter_mut().enumerate() {
+                            if let Some(w) = w {
+                                w.writer.finish()?;
+                                let num_bytes = fs::metadata(&w.path)?.len();
+                                debug!(
+                                    "Finished writing shuffle partition {} at 
{:?}. Batches: {}. Rows: {}. Bytes: {}.",
+                                    i, w.path, w.num_batches, w.num_rows, 
num_bytes
+                                );
+                                part_locs.push(ShuffleWritePartition {
+                                    partition_id: i as u64,
+                                    path: w.path.to_string_lossy().to_string(),
+                                    num_batches: w.num_batches as u64,
+                                    num_rows: w.num_rows as u64,
+                                    num_bytes,
+                                });
+                            }
+                        }
+                        Ok(part_locs)
+                    });
+
+                    let stream_err = loop {
+                        match stream.next().await {
+                            Some(Ok(batch)) => {
+                                write_metrics.input_rows.add(batch.num_rows());
+                                if tx.send(batch).await.is_err() {
+                                    break None;
+                                }
+                            }
+                            Some(Err(e)) => break Some(e),
+                            None => break None,
                         }
+                    };
+                    drop(tx);
+
+                    let write_result = handle.await.map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "Shuffle writer task failed: {e}"
+                        ))
+                    })?;
+                    if let Some(e) = stream_err {
+                        return Err(e);

Review Comment:
   Thanks for flagging. I'm logging an `error` now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to