This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 6774c4bc feat: implement job data cleanup in pull-staged strategy #1219 (#1314)
6774c4bc is described below

commit 6774c4bc368ccfa2df6f5bbfd3fa5d6baeaa7e1c
Author: KR-bluejay <145690647+kr-blue...@users.noreply.github.com>
AuthorDate: Fri Sep 12 03:49:32 2025 +0900

    feat: implement job data cleanup in pull-staged strategy #1219 (#1314)
    
    * feat: add jobs_to_clean field to PollWorkResult for job cleanup (#1219)
    
    * feat(scheduler): return pending cleanup via poll_work (#1219)
    
    * feat(executor): handle jobs_to_clean and remove job data (#1219)
    
    * refactor(executor): dedup job cleanup; switch to tokio::fs; keep behavior
    
    ---------
    
    Co-authored-by: KR-bluejay <kr.bluejay...@gmail.com>
---
 ballista/core/proto/ballista.proto               |  9 +--
 ballista/core/src/serde/generated/ballista.rs    |  2 +
 ballista/executor/src/execution_loop.rs          | 20 +++++-
 ballista/executor/src/executor_process.rs        | 86 ++++++++++++++++++++++++
 ballista/executor/src/executor_server.rs         | 86 ++----------------------
 ballista/scheduler/src/scheduler_server/grpc.rs  | 14 +++-
 ballista/scheduler/src/state/executor_manager.rs | 73 ++++++++++++++------
 7 files changed, 180 insertions(+), 110 deletions(-)
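
For readers skimming the diff: the change is a queue-and-drain handshake. When a job's data should be cleaned up, the scheduler records the job id for each alive executor (push-based executors still receive the remove_job_data RPC), and a pull-based executor drains those ids on its next poll_work call and deletes the matching directories. Below is a minimal sketch of that handshake, not the committed code; Scheduler, queue_cleanup, and the trimmed-down PollWorkResult are simplified stand-ins for ExecutorManager and the generated protobuf types.

use std::collections::{HashMap, HashSet};

// Simplified stand-ins for the generated protobuf types.
struct CleanJobDataParams { job_id: String }
struct PollWorkResult { jobs_to_clean: Vec<CleanJobDataParams> }

// Simplified stand-in for ExecutorManager's pending_cleanup_jobs bookkeeping.
struct Scheduler { pending_cleanup_jobs: HashMap<String, HashSet<String>> }

impl Scheduler {
    // Called when a job's data should be cleaned up: remember the job id
    // per executor instead of pushing an RPC.
    fn queue_cleanup(&mut self, executor_id: &str, job_id: &str) {
        self.pending_cleanup_jobs
            .entry(executor_id.to_string())
            .or_default()
            .insert(job_id.to_string());
    }

    // Called from the poll_work handler: drain everything queued for this
    // executor and hand it back alongside the task list.
    fn poll_work(&mut self, executor_id: &str) -> PollWorkResult {
        let jobs_to_clean = self
            .pending_cleanup_jobs
            .remove(executor_id)
            .unwrap_or_default()
            .into_iter()
            .map(|job_id| CleanJobDataParams { job_id })
            .collect();
        PollWorkResult { jobs_to_clean }
    }
}

fn main() {
    let mut scheduler = Scheduler { pending_cleanup_jobs: HashMap::new() };
    scheduler.queue_cleanup("executor-1", "job-42");
    assert_eq!(scheduler.poll_work("executor-1").jobs_to_clean.len(), 1);
    // A second poll returns nothing: the queue was drained.
    assert!(scheduler.poll_work("executor-1").jobs_to_clean.is_empty());
}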

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index 8acb7751..2c1ce3d9 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -290,7 +290,7 @@ message ExecutorMetadata {
 }
 
 
-// Used for scheduler-executor 
+// Used for scheduler-executor
 // communication
 message ExecutorRegistration {
   string id = 1;
@@ -466,6 +466,7 @@ message JobSessionConfig {
 
 message PollWorkResult {
   repeated TaskDefinition tasks = 1;
+  repeated CleanJobDataParams jobs_to_clean = 2;
 }
 
 message RegisterExecutorParams {
@@ -523,11 +524,11 @@ message ExecuteQueryParams {
     bytes logical_plan = 1;
    string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
   }
-  
+
   string session_id = 3;
   repeated KeyValuePair settings = 4;
-  // operation_id is unique number for each request 
-  // client makes. it helps mapping requests between 
+  // operation_id is unique number for each request
+  // client makes. it helps mapping requests between
   // client and scheduler
   string operation_id = 5;
 }
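
Note that jobs_to_clean reuses the existing CleanJobDataParams message under the previously unused tag 2, which keeps the change wire-compatible: an executor built before this commit decodes the same bytes and simply skips the unknown field. A small illustration of that property with prost, using repeated string fields as simplified stand-ins for the real message types:

use prost::Message;

// Hypothetical mirrors of the message before and after this commit.
#[derive(Clone, PartialEq, ::prost::Message)]
struct PollWorkResultV1 {
    #[prost(string, repeated, tag = "1")]
    tasks: Vec<String>,
}

#[derive(Clone, PartialEq, ::prost::Message)]
struct PollWorkResultV2 {
    #[prost(string, repeated, tag = "1")]
    tasks: Vec<String>,
    #[prost(string, repeated, tag = "2")]
    jobs_to_clean: Vec<String>,
}

fn main() {
    let v2 = PollWorkResultV2 {
        tasks: vec!["task-1".into()],
        jobs_to_clean: vec!["job-42".into()],
    };
    let bytes = v2.encode_to_vec();

    // An executor that predates the new field decodes the same bytes and
    // silently skips unknown tag 2.
    let v1 = PollWorkResultV1::decode(bytes.as_slice()).unwrap();
    assert_eq!(v1.tasks, vec!["task-1".to_string()]);
}
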
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 8809051c..5e13645e 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -720,6 +720,8 @@ pub struct JobSessionConfig {
 pub struct PollWorkResult {
     #[prost(message, repeated, tag = "1")]
     pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
+    #[prost(message, repeated, tag = "2")]
+    pub jobs_to_clean: ::prost::alloc::vec::Vec<CleanJobDataParams>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorParams {
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 67a694e6..b1eae202 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -17,6 +17,7 @@
 
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
+use crate::executor_process::remove_job_dir;
 use crate::{as_task_status, TaskExecutionTimes};
 use ballista_core::error::BallistaError;
 use ballista_core::extension::SessionConfigHelperExt;
@@ -88,9 +89,26 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 
         match poll_work_result {
             Ok(result) => {
-                let tasks = result.into_inner().tasks;
+                let PollWorkResult {
+                    tasks,
+                    jobs_to_clean,
+                } = result.into_inner();
                 active_job = !tasks.is_empty();
 
+                // Clean up any state related to the listed jobs
+                for cleanup in jobs_to_clean {
+                    let job_id = cleanup.job_id.clone();
+                    let work_dir = executor.work_dir.clone();
+
+                    // In poll-based cleanup, removing job data is fire-and-forget.
+                    // Failures here do not affect task execution and are only logged.
+                    tokio::spawn(async move {
+                        if let Err(e) = remove_job_dir(&work_dir, &job_id).await {
+                            error!("failed to remove job dir {job_id}: {e}");
+                        }
+                    });
+                }
+
                 for task in tasks {
                     let task_status_sender = task_status_sender.clone();
 
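The spawn in the hunk above is deliberately fire-and-forget: each task owns its own clones of work_dir and job_id, so a slow or failing removal never blocks the poll loop. Below is a self-contained sketch of the pattern; remove_job_dir here is a hypothetical stub, not the function added in executor_process.rs.

use tokio::task::JoinSet;

// Hypothetical stub; the real remove_job_dir in executor_process.rs deletes
// <work_dir>/<job_id> with tokio::fs after validating the path.
async fn remove_job_dir(work_dir: &str, job_id: &str) -> Result<(), String> {
    println!("would remove {work_dir}/{job_id}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let work_dir = "/tmp/ballista-demo".to_string();
    let mut tasks = JoinSet::new();

    for job_id in ["job-1".to_string(), "job-2".to_string()] {
        // Each spawned future must own its data ('static), hence the clone;
        // the loop moves on immediately without awaiting the removal.
        let work_dir = work_dir.clone();
        tasks.spawn(async move {
            if let Err(e) = remove_job_dir(&work_dir, &job_id).await {
                eprintln!("failed to remove job dir {job_id}: {e}");
            }
        });
    }

    // The real poll loop never joins these; we do here only so main exits
    // after the demo tasks finish.
    while tasks.join_next().await.is_some() {}
}
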
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index 6e8ccb62..3d67fd61 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -18,6 +18,7 @@
 //! Ballista Executor Process
 
 use std::net::SocketAddr;
+use std::path::{Path, PathBuf};
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, Instant, UNIX_EPOCH};
@@ -579,6 +580,53 @@ async fn clean_all_shuffle_data(work_dir: &str) -> ballista_core::error::Result<
     Ok(())
 }
 
+/// Remove a job directory under work_dir.
+/// Used by both push-based (gRPC handler) and pull-based (poll loop) cleanup.
+pub(crate) async fn remove_job_dir(
+    work_dir: &str,
+    job_id: &str,
+) -> ballista_core::error::Result<()> {
+    let work_path = PathBuf::from(&work_dir);
+    let job_path = work_path.join(job_id);
+
+    // Match legacy behavior: If the job path does not exist, return OK
+    if !tokio::fs::try_exists(&job_path).await.unwrap_or(false) {
+        return Ok(());
+    }
+
+    if !job_path.is_dir() {
+        return Err(BallistaError::General(format!(
+            "Path {job_path:?} is not a directory"
+        )));
+    } else if !is_subdirectory(job_path.as_path(), work_path.as_path()) {
+        return Err(BallistaError::General(format!(
+            "Path {job_path:?} is not a subdirectory of {work_path:?}"
+        )));
+    }
+
+    info!("Remove data for job {:?}", job_id);
+
+    tokio::fs::remove_dir_all(&job_path).await.map_err(|e| {
+        BallistaError::General(format!("Failed to remove {job_path:?} due to {e}"))
+    })?;
+
+    Ok(())
+}
+
+// Check whether the path is a subdirectory of the base directory
+pub(crate) fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
+    let path = match path.canonicalize() {
+        Ok(p) => p,
+        Err(_) => return false,
+    };
+    let base = match base_path.canonicalize() {
+        Ok(b) => b,
+        Err(_) => return false,
+    };
+
+    path.parent().is_some_and(|p| p.starts_with(&base))
+}
+
 /// Determines if a directory contains files newer than the cutoff time.
 /// If it returns true, the directory contains files newer than the cutoff time; it satisfies the TTL and should not be deleted.
 pub async fn satisfy_dir_ttl(
@@ -624,6 +672,9 @@ pub async fn satisfy_dir_ttl(
 
 #[cfg(test)]
 mod tests {
+    use crate::executor_process::is_subdirectory;
+    use std::path::{Path, PathBuf};
+
     use super::clean_shuffle_data_loop;
     use std::fs;
     use std::fs::File;
@@ -691,4 +742,39 @@ mod tests {
 
         assert!(config.override_arrow_flight_service.is_some());
     }
+
+    #[tokio::test]
+    async fn test_is_subdirectory() {
+        let base_dir = TempDir::new().unwrap();
+        let base_dir = base_dir.path();
+
+        // Normal correct one
+        {
+            let job_path = prepare_testing_job_directory(base_dir, "job_a");
+            assert!(is_subdirectory(&job_path, base_dir));
+        }
+
+        // Empty job id
+        {
+            let job_path = prepare_testing_job_directory(base_dir, "");
+            assert!(!is_subdirectory(&job_path, base_dir));
+
+            let job_path = prepare_testing_job_directory(base_dir, ".");
+            assert!(!is_subdirectory(&job_path, base_dir));
+        }
+
+        // Malicious job id
+        {
+            let job_path = prepare_testing_job_directory(base_dir, "..");
+            assert!(!is_subdirectory(&job_path, base_dir));
+        }
+    }
+    fn prepare_testing_job_directory(base_dir: &Path, job_id: &str) -> PathBuf {
+        let mut path = base_dir.to_path_buf();
+        path.push(job_id);
+        if !path.exists() {
+            fs::create_dir(&path).unwrap();
+        }
+        path
+    }
 }
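
The is_subdirectory guard matters because job_id arrives over RPC: canonicalize() resolves "..", ".", and symlinks before the containment check, so a crafted id cannot escape the work directory. A standalone sketch mirroring (not reusing) that check:

use std::path::Path;

// Mirror of the check this commit moves into executor_process.rs:
// canonicalize both paths (resolving "..", ".", and symlinks), then require
// the candidate's parent to sit under the base directory.
fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
    match (path.canonicalize(), base_path.canonicalize()) {
        (Ok(path), Ok(base)) => path.parent().is_some_and(|p| p.starts_with(&base)),
        _ => false,
    }
}

fn main() -> std::io::Result<()> {
    let base = std::env::temp_dir().join("ballista_subdir_demo");
    let job = base.join("job_a");
    std::fs::create_dir_all(&job)?;

    assert!(is_subdirectory(&job, &base));
    // "base/.." canonicalizes to the parent of base and is rejected.
    assert!(!is_subdirectory(&base.join(".."), &base));
    // "base/." canonicalizes to base itself, whose parent is outside base.
    assert!(!is_subdirectory(&base.join("."), &base));
    Ok(())
}
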
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 7173a5e6..8ca90c3d 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -18,7 +18,6 @@
 use ballista_core::BALLISTA_VERSION;
 use std::collections::HashMap;
 use std::convert::TryInto;
-use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -53,7 +52,7 @@ use tokio::task::JoinHandle;
 
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
-use crate::executor_process::ExecutorProcessConfig;
+use crate::executor_process::{remove_job_dir, ExecutorProcessConfig};
 use crate::shutdown::ShutdownNotifier;
 use crate::{as_task_status, TaskExecutionTimes};
 
@@ -729,88 +728,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
     ) -> Result<Response<RemoveJobDataResult>, Status> {
         let job_id = request.into_inner().job_id;
 
-        let work_dir = PathBuf::from(&self.executor.work_dir);
-        let mut path = work_dir.clone();
-        path.push(&job_id);
-
-        // Verify it's an existing directory
-        if !path.is_dir() {
-            return if !path.exists() {
-                Ok(Response::new(RemoveJobDataResult {}))
-            } else {
-                Err(Status::invalid_argument(format!(
-                    "Path {path:?} is not for a directory!!!"
-                )))
-            };
-        }
-
-        if !is_subdirectory(path.as_path(), work_dir.as_path()) {
-            return Err(Status::invalid_argument(format!(
-                "Path {path:?} is not a subdirectory of {work_dir:?}!!!"
-            )));
-        }
-
-        info!("Remove data for job {:?}", job_id);
-
-        std::fs::remove_dir_all(&path)?;
+        remove_job_dir(&self.executor.work_dir, &job_id)
+            .await
+            .map_err(|e| Status::invalid_argument(e.to_string()))?;
 
         Ok(Response::new(RemoveJobDataResult {}))
     }
 }
 
-// Check whether the path is the subdirectory of the base directory
-fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
-    if let (Ok(path), Ok(base_path)) = (path.canonicalize(), base_path.canonicalize()) {
-        if let Some(parent_path) = path.parent() {
-            parent_path.starts_with(base_path)
-        } else {
-            false
-        }
-    } else {
-        false
-    }
-}
-
 #[cfg(test)]
-mod test {
-    use crate::executor_server::is_subdirectory;
-    use std::fs;
-    use std::path::{Path, PathBuf};
-    use tempfile::TempDir;
-
-    #[tokio::test]
-    async fn test_is_subdirectory() {
-        let base_dir = TempDir::new().unwrap();
-        let base_dir = base_dir.path();
-
-        // Normal correct one
-        {
-            let job_path = prepare_testing_job_directory(base_dir, "job_a");
-            assert!(is_subdirectory(&job_path, base_dir));
-        }
-
-        // Empty job id
-        {
-            let job_path = prepare_testing_job_directory(base_dir, "");
-            assert!(!is_subdirectory(&job_path, base_dir));
-
-            let job_path = prepare_testing_job_directory(base_dir, ".");
-            assert!(!is_subdirectory(&job_path, base_dir));
-        }
-
-        // Malicious job id
-        {
-            let job_path = prepare_testing_job_directory(base_dir, "..");
-            assert!(!is_subdirectory(&job_path, base_dir));
-        }
-    }
-
-    fn prepare_testing_job_directory(base_dir: &Path, job_id: &str) -> PathBuf {
-        let mut path = base_dir.to_path_buf();
-        path.push(job_id);
-        if !path.exists() {
-            fs::create_dir(&path).unwrap();
-        }
-        path
-    }
-}
+mod test {}
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index e9337d9c..86133842 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -104,7 +104,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                 })?;
 
             let mut available_slots = [AvailableTaskSlots {
-                executor_id,
+                executor_id: executor_id.clone(),
                 slots: num_free_slots,
             }];
             let available_slots = available_slots.iter_mut().collect();
@@ -141,7 +141,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     }
                 }
             }
-            Ok(Response::new(PollWorkResult { tasks }))
+            let jobs_to_clean = self
+                .state
+                .executor_manager
+                .drain_pending_cleanup_jobs(&executor_id)
+                .into_iter()
+                .map(|job_id| CleanJobDataParams { job_id })
+                .collect();
+            Ok(Response::new(PollWorkResult {
+                tasks,
+                jobs_to_clean,
+            }))
         } else {
             warn!("Received invalid executor poll_work request");
             Err(Status::invalid_argument("Missing metadata in request"))
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 1b4d0614..9f3e2f5d 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -47,6 +47,7 @@ pub struct ExecutorManager {
     cluster_state: Arc<dyn ClusterState>,
     config: Arc<SchedulerConfig>,
     clients: ExecutorClients,
+    pending_cleanup_jobs: Arc<DashMap<String, HashSet<String>>>,
 }
 
 impl ExecutorManager {
@@ -58,6 +59,7 @@ impl ExecutorManager {
             cluster_state,
             config,
             clients: Default::default(),
+            pending_cleanup_jobs: Default::default(),
         }
     }
 
@@ -163,26 +165,36 @@ impl ExecutorManager {
         });
     }
 
-    /// Send rpc to Executors to clean up the job data
+    /// 1. Push strategy: send an RPC to each alive executor to clean up the job data
+    /// 2. Poll strategy: record the job ids so each executor picks them up on its next poll
     async fn clean_up_job_data_inner(&self, job_id: String) {
         let alive_executors = self.get_alive_executors();
+
         for executor in alive_executors {
             let job_id_clone = job_id.to_owned();
-            if let Ok(mut client) = self.get_client(&executor).await {
-                tokio::spawn(async move {
-                    if let Err(err) = client
-                        .remove_job_data(RemoveJobDataParams {
-                            job_id: job_id_clone,
-                        })
-                        .await
-                    {
-                        warn!(
-                            "Failed to call remove_job_data on Executor {executor} due to {err:?}"
-                        )
-                    }
-                });
+
+            if self.config.is_push_staged_scheduling() {
+                if let Ok(mut client) = self.get_client(&executor).await {
+                    tokio::spawn(async move {
+                        if let Err(err) = client
+                            .remove_job_data(RemoveJobDataParams {
+                                job_id: job_id_clone,
+                            })
+                            .await
+                        {
+                            warn!(
+                                    "Failed to call remove_job_data on Executor {executor} due to {err:?}"
+                                )
+                        }
+                    });
+                } else {
+                    warn!("Failed to get client for Executor {executor}")
+                }
             } else {
-                warn!("Failed to get client for Executor {executor}")
+                self.pending_cleanup_jobs
+                    .entry(executor)
+                    .or_default()
+                    .insert(job_id.clone());
             }
         }
     }
@@ -308,6 +320,16 @@ impl ExecutorManager {
         Ok(())
     }
 
+    pub(crate) fn drain_pending_cleanup_jobs(
+        &self,
+        executor_id: &str,
+    ) -> HashSet<String> {
+        self.pending_cleanup_jobs
+            .remove(executor_id)
+            .map(|(_, jobs)| jobs)
+            .unwrap_or_default()
+    }
+
     pub(crate) async fn save_executor_heartbeat(
         &self,
         heartbeat: ExecutorHeartbeat,
@@ -341,13 +363,20 @@ impl ExecutorManager {
             .executor_heartbeats()
             .iter()
             .filter_map(|(exec, heartbeat)| {
-                let active = matches!(
-                    heartbeat
-                        .status
-                        .as_ref()
-                        .and_then(|status| status.status.as_ref()),
-                    Some(executor_status::Status::Active(_))
-                );
+                let active = match heartbeat
+                    .status
+                    .as_ref()
+                    .and_then(|status| status.status.as_ref())
+                {
+                    Some(executor_status::Status::Active(_)) => true,
+                    Some(executor_status::Status::Terminating(_))
+                    | Some(executor_status::Status::Dead(_)) => false,
+                    None => {
+                        // If config is poll-based scheduling, treat executors with no status as active
+                        !self.config.is_push_staged_scheduling()
+                    }
+                    _ => false,
+                };
                 let live = heartbeat.timestamp > last_seen_ts_threshold;
 
                 (active && live).then(|| exec.clone())
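
On the scheduler side, pending_cleanup_jobs maps executor id to a set of job ids in a DashMap, so queueing (entry().or_default().insert(..)) and draining (remove(..)) are each atomic per key, and no lock is held across the poll_work call. A minimal sketch of those two operations, assuming only the dashmap crate:

use dashmap::DashMap;
use std::collections::HashSet;

fn main() {
    // Mirrors pending_cleanup_jobs: executor id -> job ids awaiting cleanup.
    let pending: DashMap<String, HashSet<String>> = DashMap::new();

    // Queue side: accumulate job ids per executor as jobs are cleaned up.
    pending.entry("executor-1".into()).or_default().insert("job-1".into());
    pending.entry("executor-1".into()).or_default().insert("job-2".into());

    // Drain side (poll_work): remove() takes the whole set atomically, so
    // concurrent polls can never deliver the same cleanup twice.
    let drained: HashSet<String> = pending
        .remove("executor-1")
        .map(|(_, jobs)| jobs)
        .unwrap_or_default();

    assert_eq!(drained.len(), 2);
    assert!(pending.get("executor-1").is_none());
}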


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
