This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
     new 6774c4bc feat: implement job data cleanup in pull-staged strategy #1219 (#1314)
6774c4bc is described below

commit 6774c4bc368ccfa2df6f5bbfd3fa5d6baeaa7e1c
Author:     KR-bluejay <145690647+kr-blue...@users.noreply.github.com>
AuthorDate: Fri Sep 12 03:49:32 2025 +0900

    feat: implement job data cleanup in pull-staged strategy #1219 (#1314)

    * feat: add jobs_to_clean field to PollWorkResult for job cleanup (#1219)
    * feat(scheduler): return pending cleanup via poll_work (#1219)
    * feat(executor): handle jobs_to_clean and remove job data (#1219)
    * refactor(executor): dedup job cleanup; switch to tokio::fs; keep behavior

    ---------

    Co-authored-by: KR-bluejay <kr.bluejay...@gmail.com>
---
 ballista/core/proto/ballista.proto               |  9 +--
 ballista/core/src/serde/generated/ballista.rs    |  2 +
 ballista/executor/src/execution_loop.rs          | 20 +++++-
 ballista/executor/src/executor_process.rs        | 86 ++++++++++++++++++++++++
 ballista/executor/src/executor_server.rs         | 86 ++----------------------
 ballista/scheduler/src/scheduler_server/grpc.rs  | 14 +++-
 ballista/scheduler/src/state/executor_manager.rs | 73 ++++++++++++++------
 7 files changed, 180 insertions(+), 110 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index 8acb7751..2c1ce3d9 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -290,7 +290,7 @@ message ExecutorMetadata {
 }
 
-// Used for scheduler-executor 
+// Used for scheduler-executor
 // communication
 message ExecutorRegistration {
   string id = 1;
@@ -466,6 +466,7 @@ message JobSessionConfig {
 
 message PollWorkResult {
   repeated TaskDefinition tasks = 1;
+  repeated CleanJobDataParams jobs_to_clean = 2;
 }
 
 message RegisterExecutorParams {
@@ -523,11 +524,11 @@ message ExecuteQueryParams {
     bytes logical_plan = 1;
     string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
   }
-  
+
   string session_id = 3;
   repeated KeyValuePair settings = 4;
-  // operation_id is unique number for each request
-  // client makes. it helps mapping requests between
+  // operation_id is unique number for each request
+  // client makes. it helps mapping requests between
   // client and scheduler
   string operation_id = 5;
 }
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 8809051c..5e13645e 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -720,6 +720,8 @@ pub struct JobSessionConfig {
 pub struct PollWorkResult {
     #[prost(message, repeated, tag = "1")]
     pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
+    #[prost(message, repeated, tag = "2")]
+    pub jobs_to_clean: ::prost::alloc::vec::Vec<CleanJobDataParams>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorParams {
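[Editor's note] The new field rides on the existing poll response as a plain repeated message, so the scheduler only has to fill in one extra vector when answering poll_work. A minimal sketch of that shape, assuming the prost-generated types above are re-exported as ballista_core::serde::protobuf (adjust the path if your build exposes them elsewhere); build_poll_response is an illustrative helper, not a Ballista API:

use ballista_core::serde::protobuf::{CleanJobDataParams, PollWorkResult, TaskDefinition};

/// Assemble a poll response: tasks to run plus jobs whose local data can be removed.
fn build_poll_response(
    tasks: Vec<TaskDefinition>,
    finished_jobs: Vec<String>,
) -> PollWorkResult {
    PollWorkResult {
        tasks,
        // One CleanJobDataParams entry per job the executor should clean up locally.
        jobs_to_clean: finished_jobs
            .into_iter()
            .map(|job_id| CleanJobDataParams { job_id })
            .collect(),
    }
}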
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 67a694e6..b1eae202 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -17,6 +17,7 @@
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
+use crate::executor_process::remove_job_dir;
 use crate::{as_task_status, TaskExecutionTimes};
 use ballista_core::error::BallistaError;
 use ballista_core::extension::SessionConfigHelperExt;
@@ -88,9 +89,26 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 
         match poll_work_result {
             Ok(result) => {
-                let tasks = result.into_inner().tasks;
+                let PollWorkResult {
+                    tasks,
+                    jobs_to_clean,
+                } = result.into_inner();
                 active_job = !tasks.is_empty();
 
+                // Clean up any state related to the listed jobs
+                for cleanup in jobs_to_clean {
+                    let job_id = cleanup.job_id.clone();
+                    let work_dir = executor.work_dir.clone();
+
+                    // In poll-based cleanup, removing job data is fire-and-forget.
+                    // Failures here do not affect task execution and are only logged.
+                    tokio::spawn(async move {
+                        if let Err(e) = remove_job_dir(&work_dir, &job_id).await {
+                            error!("failed to remove job dir {job_id}: {e}");
+                        }
+                    });
+                }
+
                 for task in tasks {
                     let task_status_sender = task_status_sender.clone();
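[Editor's note] The poll-loop change above deliberately spawns each removal instead of awaiting it, so a slow or failing filesystem never delays the next poll or the tasks that just arrived. A standalone sketch of that fire-and-forget pattern, using only tokio and hypothetical paths and job ids (no Ballista types involved):

use std::path::PathBuf;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let work_dir = PathBuf::from("/tmp/ballista-work"); // hypothetical work dir
    let jobs_to_clean = vec!["job_a".to_string(), "job_b".to_string()]; // hypothetical ids

    for job_id in jobs_to_clean {
        let job_path = work_dir.join(&job_id);
        // Fire-and-forget: errors are only reported, never propagated back
        // into the loop that polls the scheduler for new tasks.
        tokio::spawn(async move {
            if let Err(e) = tokio::fs::remove_dir_all(&job_path).await {
                eprintln!("failed to remove job dir {}: {e}", job_path.display());
            }
        });
    }

    // The real loop goes straight back to polling; here we just let the
    // spawned removals finish before the example exits.
    tokio::time::sleep(Duration::from_millis(100)).await;
}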
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index 6e8ccb62..3d67fd61 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -18,6 +18,7 @@
 //! Ballista Executor Process
 
 use std::net::SocketAddr;
+use std::path::{Path, PathBuf};
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, Instant, UNIX_EPOCH};
@@ -579,6 +580,53 @@ async fn clean_all_shuffle_data(work_dir: &str) -> ballista_core::error::Result<
     Ok(())
 }
 
+/// Remove a job directory under work_dir.
+/// Used by both push-based (gRPC handler) and pull-based (poll loop) cleanup.
+pub(crate) async fn remove_job_dir(
+    work_dir: &str,
+    job_id: &str,
+) -> ballista_core::error::Result<()> {
+    let work_path = PathBuf::from(&work_dir);
+    let job_path = work_path.join(job_id);
+
+    // Match legacy behavior: If the job path does not exist, return OK
+    if !tokio::fs::try_exists(&job_path).await.unwrap_or(false) {
+        return Ok(());
+    }
+
+    if !job_path.is_dir() {
+        return Err(BallistaError::General(format!(
+            "Path {job_path:?} is not a directory"
+        )));
+    } else if !is_subdirectory(job_path.as_path(), work_path.as_path()) {
+        return Err(BallistaError::General(format!(
+            "Path {job_path:?} is not a subdirectory of {work_path:?}"
+        )));
+    }
+
+    info!("Remove data for job {:?}", job_id);
+
+    tokio::fs::remove_dir_all(&job_path).await.map_err(|e| {
+        BallistaError::General(format!("Failed to remove {job_path:?} due to {e}"))
+    })?;
+
+    Ok(())
+}
+
+// Check whether the path is the subdirectory of the base directory
+pub(crate) fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
+    let path = match path.canonicalize() {
+        Ok(p) => p,
+        Err(_) => return false,
+    };
+    let base = match base_path.canonicalize() {
+        Ok(b) => b,
+        Err(_) => return false,
+    };
+
+    path.parent().is_some_and(|p| p.starts_with(&base))
+}
+
 /// Determines if a directory contains files newer than the cutoff time.
 /// If return true, it means the directory contains files newer than the cutoff time. It satisfy the ttl and should not be deleted.
 pub async fn satisfy_dir_ttl(
@@ -624,6 +672,9 @@ pub async fn satisfy_dir_ttl(
 
 #[cfg(test)]
 mod tests {
+    use crate::executor_process::is_subdirectory;
+    use std::path::{Path, PathBuf};
+
     use super::clean_shuffle_data_loop;
     use std::fs;
     use std::fs::File;
@@ -691,4 +742,39 @@ mod tests {
 
         assert!(config.override_arrow_flight_service.is_some());
     }
+
+    #[tokio::test]
+    async fn test_is_subdirectory() {
+        let base_dir = TempDir::new().unwrap();
+        let base_dir = base_dir.path();
+
+        // Normal correct one
+        {
+            let job_path = prepare_testing_job_directory(base_dir, "job_a");
+            assert!(is_subdirectory(&job_path, base_dir));
+        }
+
+        // Empty job id
+        {
+            let job_path = prepare_testing_job_directory(base_dir, "");
+            assert!(!is_subdirectory(&job_path, base_dir));
+
+            let job_path = prepare_testing_job_directory(base_dir, ".");
+            assert!(!is_subdirectory(&job_path, base_dir));
+        }
+
+        // Malicious job id
+        {
+            let job_path = prepare_testing_job_directory(base_dir, "..");
+            assert!(!is_subdirectory(&job_path, base_dir));
+        }
+    }
+    fn prepare_testing_job_directory(base_dir: &Path, job_id: &str) -> PathBuf {
+        let mut path = base_dir.to_path_buf();
+        path.push(job_id);
+        if !path.exists() {
+            fs::create_dir(&path).unwrap();
+        }
+        path
+    }
 }
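[Editor's note] Why the canonicalize-based check rejects empty, "." and ".." job ids: canonicalization resolves ".", ".." and symlinks before anything is compared, so an empty or "." id collapses to the work dir itself and ".." climbs out of it; in every one of those cases the canonical path's parent no longer lies under the work dir, is_subdirectory returns false, and remove_job_dir refuses to delete anything. A self-contained demonstration (standard library only, throwaway directory under the system temp dir; the helper is restated here so the example compiles on its own):

use std::fs;
use std::path::Path;

// Same logic as the helper added above, copied here so this example stands alone.
fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
    let path = match path.canonicalize() {
        Ok(p) => p,
        Err(_) => return false,
    };
    let base = match base_path.canonicalize() {
        Ok(b) => b,
        Err(_) => return false,
    };

    path.parent().is_some_and(|p| p.starts_with(&base))
}

fn main() -> std::io::Result<()> {
    let base = std::env::temp_dir().join("ballista-is-subdirectory-demo");
    let job = base.join("job_a");
    fs::create_dir_all(&job)?;

    // "job_a" canonicalizes to <base>/job_a, whose parent is <base>: accepted.
    assert!(is_subdirectory(&job, &base));
    // "" and "." canonicalize to <base> itself; its parent lies outside <base>: rejected.
    assert!(!is_subdirectory(&base.join(""), &base));
    assert!(!is_subdirectory(&base.join("."), &base));
    // ".." canonicalizes to the parent of <base>: rejected, which blocks path traversal.
    assert!(!is_subdirectory(&base.join(".."), &base));

    fs::remove_dir_all(&base)?;
    Ok(())
}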
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 7173a5e6..8ca90c3d 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -18,7 +18,6 @@
 use ballista_core::BALLISTA_VERSION;
 use std::collections::HashMap;
 use std::convert::TryInto;
-use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -53,7 +52,7 @@
 use tokio::task::JoinHandle;
 
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
-use crate::executor_process::ExecutorProcessConfig;
+use crate::executor_process::{remove_job_dir, ExecutorProcessConfig};
 use crate::shutdown::ShutdownNotifier;
 use crate::{as_task_status, TaskExecutionTimes};
@@ -729,88 +728,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
     ) -> Result<Response<RemoveJobDataResult>, Status> {
         let job_id = request.into_inner().job_id;
 
-        let work_dir = PathBuf::from(&self.executor.work_dir);
-        let mut path = work_dir.clone();
-        path.push(&job_id);
-
-        // Verify it's an existing directory
-        if !path.is_dir() {
-            return if !path.exists() {
-                Ok(Response::new(RemoveJobDataResult {}))
-            } else {
-                Err(Status::invalid_argument(format!(
-                    "Path {path:?} is not for a directory!!!"
-                )))
-            };
-        }
-
-        if !is_subdirectory(path.as_path(), work_dir.as_path()) {
-            return Err(Status::invalid_argument(format!(
-                "Path {path:?} is not a subdirectory of {work_dir:?}!!!"
-            )));
-        }
-
-        info!("Remove data for job {:?}", job_id);
-
-        std::fs::remove_dir_all(&path)?;
+        remove_job_dir(&self.executor.work_dir, &job_id)
+            .await
+            .map_err(|e| Status::invalid_argument(e.to_string()))?;
 
         Ok(Response::new(RemoveJobDataResult {}))
     }
 }
 
-// Check whether the path is the subdirectory of the base directory
-fn is_subdirectory(path: &Path, base_path: &Path) -> bool {
-    if let (Ok(path), Ok(base_path)) = (path.canonicalize(), base_path.canonicalize()) {
-        if let Some(parent_path) = path.parent() {
-            parent_path.starts_with(base_path)
-        } else {
-            false
-        }
-    } else {
-        false
-    }
-}
-
 #[cfg(test)]
-mod test {
-    use crate::executor_server::is_subdirectory;
-    use std::fs;
-    use std::path::{Path, PathBuf};
-    use tempfile::TempDir;
-
-    #[tokio::test]
-    async fn test_is_subdirectory() {
-        let base_dir = TempDir::new().unwrap();
-        let base_dir = base_dir.path();
-
-        // Normal correct one
-        {
-            let job_path = prepare_testing_job_directory(base_dir, "job_a");
-            assert!(is_subdirectory(&job_path, base_dir));
-        }
-
-        // Empty job id
-        {
-            let job_path = prepare_testing_job_directory(base_dir, "");
-            assert!(!is_subdirectory(&job_path, base_dir));
-
-            let job_path = prepare_testing_job_directory(base_dir, ".");
-            assert!(!is_subdirectory(&job_path, base_dir));
-        }
-
-        // Malicious job id
-        {
-            let job_path = prepare_testing_job_directory(base_dir, "..");
-            assert!(!is_subdirectory(&job_path, base_dir));
-        }
-    }
-
-    fn prepare_testing_job_directory(base_dir: &Path, job_id: &str) -> PathBuf {
-        let mut path = base_dir.to_path_buf();
-        path.push(job_id);
-        if !path.exists() {
-            fs::create_dir(&path).unwrap();
-        }
-        path
-    }
-}
+mod test {}
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index e9337d9c..86133842 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -104,7 +104,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                 })?;
 
             let mut available_slots = [AvailableTaskSlots {
-                executor_id,
+                executor_id: executor_id.clone(),
                 slots: num_free_slots,
             }];
             let available_slots = available_slots.iter_mut().collect();
@@ -141,7 +141,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     }
                 }
             }
-            Ok(Response::new(PollWorkResult { tasks }))
+            let jobs_to_clean = self
+                .state
+                .executor_manager
+                .drain_pending_cleanup_jobs(&executor_id)
+                .into_iter()
+                .map(|job_id| CleanJobDataParams { job_id })
+                .collect();
+            Ok(Response::new(PollWorkResult {
+                tasks,
+                jobs_to_clean,
+            }))
         } else {
             warn!("Received invalid executor poll_work request");
             Err(Status::invalid_argument("Missing metadata in request"))
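[Editor's note] The drain_pending_cleanup_jobs call above is backed by a per-executor pending set on ExecutorManager, added in the next diff. A minimal standalone sketch of that queue-then-drain pattern, assuming the dashmap crate is available; PendingCleanup, queue_cleanup and drain are illustrative names, not Ballista APIs:

use std::collections::HashSet;

use dashmap::DashMap;

// Job ids whose data should be removed, keyed by the executor holding the data.
#[derive(Default)]
struct PendingCleanup {
    jobs: DashMap<String, HashSet<String>>,
}

impl PendingCleanup {
    // Called when a job finishes while running under poll-based scheduling.
    fn queue_cleanup(&self, executor_id: &str, job_id: &str) {
        self.jobs
            .entry(executor_id.to_string())
            .or_default()
            .insert(job_id.to_string());
    }

    // Called while answering poll_work: hand over everything queued for this
    // executor exactly once; draining again yields an empty set.
    fn drain(&self, executor_id: &str) -> HashSet<String> {
        self.jobs
            .remove(executor_id)
            .map(|(_, jobs)| jobs)
            .unwrap_or_default()
    }
}

fn main() {
    let pending = PendingCleanup::default();
    pending.queue_cleanup("executor-1", "job_a");
    pending.queue_cleanup("executor-1", "job_b");

    assert_eq!(pending.drain("executor-1").len(), 2);
    assert!(pending.drain("executor-1").is_empty());
}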
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 1b4d0614..9f3e2f5d 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -47,6 +47,7 @@ pub struct ExecutorManager {
     cluster_state: Arc<dyn ClusterState>,
     config: Arc<SchedulerConfig>,
     clients: ExecutorClients,
+    pending_cleanup_jobs: Arc<DashMap<String, HashSet<String>>>,
 }
 
 impl ExecutorManager {
@@ -58,6 +59,7 @@ impl ExecutorManager {
             cluster_state,
             config,
             clients: Default::default(),
+            pending_cleanup_jobs: Default::default(),
         }
     }
 
@@ -163,26 +165,36 @@ impl ExecutorManager {
         });
     }
 
-    /// Send rpc to Executors to clean up the job data
+    /// 1. Push strategy: Send rpc to Executors to clean up the job data
+    /// 2. Poll strategy: Save cleanup job ids and send them to executors
     async fn clean_up_job_data_inner(&self, job_id: String) {
        let alive_executors = self.get_alive_executors();
+
        for executor in alive_executors {
            let job_id_clone = job_id.to_owned();
-            if let Ok(mut client) = self.get_client(&executor).await {
-                tokio::spawn(async move {
-                    if let Err(err) = client
-                        .remove_job_data(RemoveJobDataParams {
-                            job_id: job_id_clone,
-                        })
-                        .await
-                    {
-                        warn!(
-                            "Failed to call remove_job_data on Executor {executor} due to {err:?}"
-                        )
-                    }
-                });
+
+            if self.config.is_push_staged_scheduling() {
+                if let Ok(mut client) = self.get_client(&executor).await {
+                    tokio::spawn(async move {
+                        if let Err(err) = client
+                            .remove_job_data(RemoveJobDataParams {
+                                job_id: job_id_clone,
+                            })
+                            .await
+                        {
+                            warn!(
+                                "Failed to call remove_job_data on Executor {executor} due to {err:?}"
+                            )
+                        }
+                    });
+                } else {
+                    warn!("Failed to get client for Executor {executor}")
+                }
            } else {
-                warn!("Failed to get client for Executor {executor}")
+                self.pending_cleanup_jobs
+                    .entry(executor)
+                    .or_default()
+                    .insert(job_id.clone());
            }
        }
    }
@@ -308,6 +320,16 @@ impl ExecutorManager {
         Ok(())
     }
 
+    pub(crate) fn drain_pending_cleanup_jobs(
+        &self,
+        executor_id: &str,
+    ) -> HashSet<String> {
+        self.pending_cleanup_jobs
+            .remove(executor_id)
+            .map(|(_, jobs)| jobs)
+            .unwrap_or_default()
+    }
+
     pub(crate) async fn save_executor_heartbeat(
         &self,
         heartbeat: ExecutorHeartbeat,
@@ -341,13 +363,20 @@ impl ExecutorManager {
             .executor_heartbeats()
             .iter()
             .filter_map(|(exec, heartbeat)| {
-                let active = matches!(
-                    heartbeat
-                        .status
-                        .as_ref()
-                        .and_then(|status| status.status.as_ref()),
-                    Some(executor_status::Status::Active(_))
-                );
+                let active = match heartbeat
+                    .status
+                    .as_ref()
+                    .and_then(|status| status.status.as_ref())
+                {
+                    Some(executor_status::Status::Active(_)) => true,
+                    Some(executor_status::Status::Terminating(_))
+                    | Some(executor_status::Status::Dead(_)) => false,
+                    None => {
+                        // If config is poll-based scheduling, treat executors with no status as active
+                        !self.config.is_push_staged_scheduling()
+                    }
+                    _ => false,
+                };
                 let live = heartbeat.timestamp > last_seen_ts_threshold;
 
                 (active && live).then(|| exec.clone())

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org