This is an automated email from the ASF dual-hosted git repository. milenkovicm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push: new 64ed7879 feat: disable task stage plan binary cache (#1266) 64ed7879 is described below commit 64ed787948667eda2cbf45f5e74866782fd27b13 Author: Marko Milenković <milenkov...@users.noreply.github.com> AuthorDate: Wed May 28 09:48:13 2025 +0100 feat: disable task stage plan binary cache (#1266) - in some cases the stage plan should not be cached because the task plan may change; as there is no easy way to invalidate the cache, caching can optionally be disabled --- ballista/scheduler/Cargo.toml | 3 ++ ballista/scheduler/src/state/task_manager.rs | 81 ++++++++++++++++------------ 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index c0078a67..0b56a9f6 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -37,6 +37,9 @@ required-features = ["build-binary"] [features] build-binary = ["configure_me", "clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] default = ["build-binary"] +# job info can cache stage plans, in some cases where +# task plans can be re-computed, cache behavior may need to be disabled.
+disable-stage-plan-cache = [] graphviz-support = ["dep:graphviz-rust"] keda-scaler = [] prometheus-metrics = ["prometheus", "once_cell"] diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs index c267aa71..6390273b 100644 --- a/ballista/scheduler/src/state/task_manager.rs +++ b/ballista/scheduler/src/state/task_manager.rs @@ -39,7 +39,8 @@ use dashmap::DashMap; use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::logical_plan::AsLogicalPlan; -use datafusion_proto::physical_plan::AsExecutionPlan; +use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec}; +use datafusion_proto::protobuf::PhysicalPlanNode; use log::{debug, error, info, trace, warn}; use rand::{rng, Rng}; use std::collections::{HashMap, HashSet}; @@ -125,6 +126,7 @@ pub struct JobInfoCache { pub execution_graph: Arc<RwLock<ExecutionGraph>>, // Cache for job status pub status: Option<job_status::Status>, + #[cfg(not(feature = "disable-stage-plan-cache"))] // Cache for encoded execution stage plan to avoid duplicated encoding for multiple tasks encoded_stage_plans: HashMap<usize, Vec<u8>>, } @@ -135,9 +137,42 @@ impl JobInfoCache { Self { execution_graph: Arc::new(RwLock::new(graph)), status, + #[cfg(not(feature = "disable-stage-plan-cache"))] encoded_stage_plans: HashMap::new(), } } + #[cfg(not(feature = "disable-stage-plan-cache"))] + fn encode_stage_plan<U: AsExecutionPlan>( + &mut self, + stage_id: usize, + plan: &Arc<dyn ExecutionPlan>, + codec: &dyn PhysicalExtensionCodec, + ) -> Result<Vec<u8>> { + if let Some(plan) = self.encoded_stage_plans.get(&stage_id) { + Ok(plan.clone()) + } else { + let mut plan_buf: Vec<u8> = vec![]; + let plan_proto = U::try_from_physical_plan(plan.clone(), codec)?; + plan_proto.try_encode(&mut plan_buf)?; + self.encoded_stage_plans.insert(stage_id, plan_buf.clone()); + + Ok(plan_buf) + } + } + + #[cfg(feature = "disable-stage-plan-cache")] + fn encode_stage_plan<U: 
AsExecutionPlan>( + &mut self, + _stage_id: usize, + plan: &Arc<dyn ExecutionPlan>, + codec: &dyn PhysicalExtensionCodec, + ) -> Result<Vec<u8>> { + let mut plan_buf: Vec<u8> = vec![]; + let plan_proto = U::try_from_physical_plan(plan.clone(), codec)?; + plan_proto.try_encode(&mut plan_buf)?; + + Ok(plan_buf) + } } #[derive(Clone)] @@ -222,7 +257,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> info!("Submitting execution graph: {:?}", graph); self.state.submit_job(job_id.to_string(), &graph).await?; - graph.revive(); self.active_job_cache .insert(job_id.to_owned(), JobInfoCache::new(graph)); @@ -483,22 +517,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> let stage_id = task.partition.stage_id; if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) { - let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id) { - plan.clone() - } else { - let mut plan_buf: Vec<u8> = vec![]; - let plan_proto = U::try_from_physical_plan( - task.plan, - self.codec.physical_extension_codec(), - )?; - plan_proto.try_encode(&mut plan_buf)?; - - job_info - .encoded_stage_plans - .insert(stage_id, plan_buf.clone()); - - plan_buf - }; + let plan = job_info.encode_stage_plan::<PhysicalPlanNode>( + stage_id, + &task.plan, + self.codec.physical_extension_codec(), + )?; let task_definition = TaskDefinition { task_id: task.task_id as u32, @@ -569,23 +592,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> } if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) { - let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id) - { - plan.clone() - } else { - let mut plan_buf: Vec<u8> = vec![]; - let plan_proto = U::try_from_physical_plan( - task.plan.clone(), - self.codec.physical_extension_codec(), - )?; - plan_proto.try_encode(&mut plan_buf)?; - - job_info - .encoded_stage_plans - .insert(stage_id, plan_buf.clone()); - - plan_buf - 
}; + let plan = job_info.encode_stage_plan::<PhysicalPlanNode>( + stage_id, + &task.plan, + self.codec.physical_extension_codec(), + )?; let launch_time = SystemTime::now() .duration_since(UNIX_EPOCH) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org