milenkovicm commented on code in PR #1389:
URL: 
https://github.com/apache/datafusion-ballista/pull/1389#discussion_r2702710118


##########
ballista/core/src/execution_plans/sort_shuffle/spill.rs:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Spill manager for sort-based shuffle.
+//!
+//! Handles writing partition buffers to disk when memory pressure is high,
+//! and reading them back during the finalization phase.
+
+use crate::error::{BallistaError, Result};
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::ipc::reader::StreamReader;
+use datafusion::arrow::ipc::writer::StreamWriter;
+use datafusion::arrow::ipc::{CompressionType, writer::IpcWriteOptions};
+use datafusion::arrow::record_batch::RecordBatch;
+use log::debug;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::BufWriter;
+use std::path::PathBuf;
+
+/// Manages spill files for sort-based shuffle.
+///
+/// When partition buffers exceed memory limits, they are spilled to disk
+/// as Arrow IPC files. During finalization, these spill files are read
+/// back and merged into the consolidated output file.
+#[derive(Debug)]
+pub struct SpillManager {
+    /// Base directory for spill files
+    spill_dir: PathBuf,
+    /// Spill files per output partition: partition_id -> Vec<spill_file_path>
+    spill_files: HashMap<usize, Vec<PathBuf>>,
+    /// Counter for generating unique spill file names
+    spill_counter: usize,
+    /// Compression codec for spill files
+    compression: CompressionType,
+    /// Total number of spills performed
+    total_spills: usize,
+    /// Total bytes spilled
+    total_bytes_spilled: u64,
+}
+
+impl SpillManager {
+    /// Creates a new spill manager.
+    ///
+    /// # Arguments
+    /// * `work_dir` - Base work directory
+    /// * `job_id` - Job identifier
+    /// * `stage_id` - Stage identifier
+    /// * `input_partition` - Input partition number
+    /// * `compression` - Compression codec for spill files
+    pub fn new(
+        work_dir: &str,
+        job_id: &str,
+        stage_id: usize,
+        input_partition: usize,
+        compression: CompressionType,
+    ) -> Result<Self> {
+        let mut spill_dir = PathBuf::from(work_dir);
+        spill_dir.push(job_id);
+        spill_dir.push(format!("{stage_id}"));
+        spill_dir.push(format!("{input_partition}"));
+        spill_dir.push("spill");
+
+        // Create spill directory
+        std::fs::create_dir_all(&spill_dir).map_err(BallistaError::IoError)?;
+
+        Ok(Self {
+            spill_dir,
+            spill_files: HashMap::new(),
+            spill_counter: 0,
+            compression,
+            total_spills: 0,
+            total_bytes_spilled: 0,
+        })
+    }
+
+    /// Spills batches for a partition to disk.
+    ///
+    /// Returns the number of bytes written.
+    pub fn spill(
+        &mut self,
+        partition_id: usize,
+        batches: Vec<RecordBatch>,
+        schema: &SchemaRef,
+    ) -> Result<u64> {
+        if batches.is_empty() {
+            return Ok(0);
+        }
+
+        let spill_path = self.next_spill_path(partition_id);
+        debug!(
+            "Spilling {} batches for partition {} to {:?}",
+            batches.len(),
+            partition_id,
+            spill_path
+        );
+
+        let file = File::create(&spill_path).map_err(BallistaError::IoError)?;
+        let buffered = BufWriter::new(file);
+
+        let options =
+            
IpcWriteOptions::default().try_with_compression(Some(self.compression))?;
+
+        let mut writer = StreamWriter::try_new_with_options(buffered, schema, 
options)?;
+
+        for batch in &batches {
+            writer.write(batch)?;
+        }
+
+        writer.finish()?;
+
+        let bytes_written = std::fs::metadata(&spill_path)
+            .map_err(BallistaError::IoError)?
+            .len();
+
+        // Track the spill file
+        self.spill_files
+            .entry(partition_id)
+            .or_default()
+            .push(spill_path);
+
+        self.total_spills += 1;
+        self.total_bytes_spilled += bytes_written;
+
+        Ok(bytes_written)
+    }
+
+    /// Returns the spill files for a partition.
+    pub fn get_spill_files(&self, partition_id: usize) -> &[PathBuf] {
+        self.spill_files
+            .get(&partition_id)
+            .map(|v| v.as_slice())
+            .unwrap_or(&[])
+    }
+
+    /// Returns true if the partition has spill files.
+    pub fn has_spill_files(&self, partition_id: usize) -> bool {
+        self.spill_files
+            .get(&partition_id)
+            .is_some_and(|v| !v.is_empty())
+    }
+
+    /// Reads all spill files for a partition and returns the batches.
+    pub fn read_spill_files(&self, partition_id: usize) -> 
Result<Vec<RecordBatch>> {

Review Comment:
   Do we need to keep this as a vector? Perhaps we could just return a `StreamReader` instead.



##########
ballista/executor/src/flight_service.rs:
##########
@@ -95,8 +99,43 @@ impl FlightService for BallistaFlightService {
             decode_protobuf(&ticket.ticket).map_err(|e| 
from_ballista_err(&e))?;
 
         match &action {
-            BallistaAction::FetchPartition { path, .. } => {
-                debug!("FetchPartition reading {path}");
+            BallistaAction::FetchPartition {
+                path, partition_id, ..
+            } => {
+                debug!("FetchPartition reading partition {partition_id} from 
{path}");
+                let data_path = Path::new(path);
+
+                // Check if this is a sort-based shuffle output
+                if is_sort_shuffle_output(data_path) {
+                    debug!("Detected sort-based shuffle format for {path}");
+                    let index_path = get_index_path(data_path);
+                    let stream = stream_sort_shuffle_partition(
+                        data_path,
+                        &index_path,
+                        *partition_id,
+                    )
+                    .map_err(|e| from_ballista_err(&e))?;
+
+                    let schema = stream.schema();
+                    // Map DataFusionError to FlightError
+                    let stream =
+                        stream.map_err(|e| 
FlightError::from(ArrowError::from(e)));
+
+                    let write_options: IpcWriteOptions = 
IpcWriteOptions::default()
+                        .try_with_compression(Some(CompressionType::LZ4_FRAME))
+                        .map_err(|e| from_arrow_err(&e))?;
+                    let flight_data_stream = FlightDataEncoderBuilder::new()
+                        .with_schema(schema)
+                        .with_options(write_options)
+                        .build(stream)
+                        .map_err(|err| Status::from_error(Box::new(err)));
+
+                    return Ok(Response::new(
+                        Box::pin(flight_data_stream) as Self::DoGetStream
+                    ));
+                }

Review Comment:
   Nitpick: consider adding an `else` branch here instead of relying on the early `return`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to