This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 64ed7879 feat: disable task stage plan binary cache (#1266)
64ed7879 is described below

commit 64ed787948667eda2cbf45f5e74866782fd27b13
Author: Marko Milenković <milenkov...@users.noreply.github.com>
AuthorDate: Wed May 28 09:48:13 2025 +0100

    feat: disable task stage plan binary cache (#1266)
    
    - in some cases the stage plan should not be cached,
      because the task plan may change; since there is no
      easy way to invalidate the cache, caching can
      optionally be disabled
---
 ballista/scheduler/Cargo.toml                |  3 ++
 ballista/scheduler/src/state/task_manager.rs | 81 ++++++++++++++++------------
 2 files changed, 49 insertions(+), 35 deletions(-)

diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index c0078a67..0b56a9f6 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -37,6 +37,9 @@ required-features = ["build-binary"]
 [features]
 build-binary = ["configure_me", "clap", "tracing-subscriber", 
"tracing-appender", "tracing", "ballista-core/build-binary"]
 default = ["build-binary"]
+# JobInfoCache caches encoded stage plans; where task plans can be
+# re-computed, this cache may need to be disabled.
+disable-stage-plan-cache = []
 graphviz-support = ["dep:graphviz-rust"]
 keda-scaler = []
 prometheus-metrics = ["prometheus", "once_cell"]
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index c267aa71..6390273b 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -39,7 +39,8 @@ use dashmap::DashMap;
 
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::AsExecutionPlan;
+use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
+use datafusion_proto::protobuf::PhysicalPlanNode;
 use log::{debug, error, info, trace, warn};
 use rand::{rng, Rng};
 use std::collections::{HashMap, HashSet};
@@ -125,6 +126,7 @@ pub struct JobInfoCache {
     pub execution_graph: Arc<RwLock<ExecutionGraph>>,
     // Cache for job status
     pub status: Option<job_status::Status>,
+    #[cfg(not(feature = "disable-stage-plan-cache"))]
     // Cache for encoded execution stage plan to avoid duplicated encoding for 
multiple tasks
     encoded_stage_plans: HashMap<usize, Vec<u8>>,
 }
@@ -135,9 +137,42 @@ impl JobInfoCache {
         Self {
             execution_graph: Arc::new(RwLock::new(graph)),
             status,
+            #[cfg(not(feature = "disable-stage-plan-cache"))]
             encoded_stage_plans: HashMap::new(),
         }
     }
+    #[cfg(not(feature = "disable-stage-plan-cache"))]
+    fn encode_stage_plan<U: AsExecutionPlan>(
+        &mut self,
+        stage_id: usize,
+        plan: &Arc<dyn ExecutionPlan>,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<Vec<u8>> {
+        if let Some(plan) = self.encoded_stage_plans.get(&stage_id) {
+            Ok(plan.clone())
+        } else {
+            let mut plan_buf: Vec<u8> = vec![];
+            let plan_proto = U::try_from_physical_plan(plan.clone(), codec)?;
+            plan_proto.try_encode(&mut plan_buf)?;
+            self.encoded_stage_plans.insert(stage_id, plan_buf.clone());
+
+            Ok(plan_buf)
+        }
+    }
+
+    #[cfg(feature = "disable-stage-plan-cache")]
+    fn encode_stage_plan<U: AsExecutionPlan>(
+        &mut self,
+        _stage_id: usize,
+        plan: &Arc<dyn ExecutionPlan>,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<Vec<u8>> {
+        let mut plan_buf: Vec<u8> = vec![];
+        let plan_proto = U::try_from_physical_plan(plan.clone(), codec)?;
+        plan_proto.try_encode(&mut plan_buf)?;
+
+        Ok(plan_buf)
+    }
 }
 
 #[derive(Clone)]
@@ -222,7 +257,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         info!("Submitting execution graph: {:?}", graph);
 
         self.state.submit_job(job_id.to_string(), &graph).await?;
-
         graph.revive();
         self.active_job_cache
             .insert(job_id.to_owned(), JobInfoCache::new(graph));
@@ -483,22 +517,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         let stage_id = task.partition.stage_id;
 
         if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
-            let plan = if let Some(plan) = 
job_info.encoded_stage_plans.get(&stage_id) {
-                plan.clone()
-            } else {
-                let mut plan_buf: Vec<u8> = vec![];
-                let plan_proto = U::try_from_physical_plan(
-                    task.plan,
-                    self.codec.physical_extension_codec(),
-                )?;
-                plan_proto.try_encode(&mut plan_buf)?;
-
-                job_info
-                    .encoded_stage_plans
-                    .insert(stage_id, plan_buf.clone());
-
-                plan_buf
-            };
+            let plan = job_info.encode_stage_plan::<PhysicalPlanNode>(
+                stage_id,
+                &task.plan,
+                self.codec.physical_extension_codec(),
+            )?;
 
             let task_definition = TaskDefinition {
                 task_id: task.task_id as u32,
@@ -569,23 +592,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             }
 
             if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) 
{
-                let plan = if let Some(plan) = 
job_info.encoded_stage_plans.get(&stage_id)
-                {
-                    plan.clone()
-                } else {
-                    let mut plan_buf: Vec<u8> = vec![];
-                    let plan_proto = U::try_from_physical_plan(
-                        task.plan.clone(),
-                        self.codec.physical_extension_codec(),
-                    )?;
-                    plan_proto.try_encode(&mut plan_buf)?;
-
-                    job_info
-                        .encoded_stage_plans
-                        .insert(stage_id, plan_buf.clone());
-
-                    plan_buf
-                };
+                let plan = job_info.encode_stage_plan::<PhysicalPlanNode>(
+                    stage_id,
+                    &task.plan,
+                    self.codec.physical_extension_codec(),
+                )?;
 
                 let launch_time = SystemTime::now()
                     .duration_since(UNIX_EPOCH)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to