This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new ec3f1be84 execution_graph extract to a trait (#1361)
ec3f1be84 is described below

commit ec3f1be840ac72600e4a111cc65970b0e31d6e1d
Author: Marko Milenković <[email protected]>
AuthorDate: Mon Feb 2 10:06:41 2026 +0000

    execution_graph extract to a trait (#1361)
---
 ballista/scheduler/src/cluster/memory.rs           |   33 +-
 ballista/scheduler/src/cluster/mod.rs              |   29 +-
 ballista/scheduler/src/cluster/test_util/mod.rs    |   12 +-
 ballista/scheduler/src/state/execution_graph.rs    | 1135 +++++++++++---------
 .../scheduler/src/state/execution_graph_dot.rs     |   16 +-
 ballista/scheduler/src/state/task_manager.rs       |   39 +-
 ballista/scheduler/src/test_utils.rs               |   39 +-
 7 files changed, 716 insertions(+), 587 deletions(-)

diff --git a/ballista/scheduler/src/cluster/memory.rs 
b/ballista/scheduler/src/cluster/memory.rs
index a1db74ecb..87ef6709f 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -21,7 +21,7 @@ use crate::cluster::{
     bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
     is_skip_consistent_hash,
 };
-use crate::state::execution_graph::ExecutionGraph;
+use crate::state::execution_graph::ExecutionGraphBox;
 use async_trait::async_trait;
 use ballista_core::ConfigProducer;
 use ballista_core::error::{BallistaError, Result};
@@ -347,7 +347,7 @@ impl ClusterState for InMemoryClusterState {
 pub struct InMemoryJobState {
     scheduler: String,
     /// Jobs which have either completed successfully or failed
-    completed_jobs: DashMap<String, (JobStatus, Option<ExecutionGraph>)>,
+    completed_jobs: DashMap<String, (JobStatus, Option<ExecutionGraphBox>)>,
     /// In-memory store of queued jobs. Map from Job ID -> (Job Name, 
queued_at timestamp)
     queued_jobs: DashMap<String, (String, u64)>,
     /// In-memory store of running job statuses. Map from Job ID -> JobStatus
@@ -382,7 +382,7 @@ impl InMemoryJobState {
 
 #[async_trait]
 impl JobState for InMemoryJobState {
-    async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> 
Result<()> {
+    async fn submit_job(&self, job_id: String, graph: &ExecutionGraphBox) -> 
Result<()> {
         if self.queued_jobs.get(&job_id).is_some() {
             self.running_jobs
                 .insert(job_id.clone(), graph.status().clone());
@@ -423,21 +423,24 @@ impl JobState for InMemoryJobState {
         Ok(None)
     }
 
-    async fn get_execution_graph(&self, job_id: &str) -> 
Result<Option<ExecutionGraph>> {
+    async fn get_execution_graph(
+        &self,
+        job_id: &str,
+    ) -> Result<Option<ExecutionGraphBox>> {
         Ok(self
             .completed_jobs
             .get(job_id)
             .as_deref()
-            .and_then(|(_, graph)| graph.clone()))
+            .and_then(|(_, graph)| graph.as_ref().map(|e| e.cloned())))
     }
 
-    async fn try_acquire_job(&self, _job_id: &str) -> 
Result<Option<ExecutionGraph>> {
+    async fn try_acquire_job(&self, _job_id: &str) -> 
Result<Option<ExecutionGraphBox>> {
         // Always return None. The only state stored here are for completed 
jobs
         // which cannot be acquired
         Ok(None)
     }
 
-    async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> 
Result<()> {
+    async fn save_job(&self, job_id: &str, graph: &ExecutionGraphBox) -> 
Result<()> {
         let status = graph.status().clone();
 
         debug!("saving state for job {job_id} with status {:?}", status);
@@ -448,7 +451,7 @@ impl JobState for InMemoryJobState {
             Some(Status::Successful(_)) | Some(Status::Failed(_))
         ) {
             self.completed_jobs
-                .insert(job_id.to_string(), (status.clone(), 
Some(graph.clone())));
+                .insert(job_id.to_string(), (status.clone(), 
Some(graph.cloned())));
             self.running_jobs.remove(job_id);
         } else {
             // otherwise update running job
@@ -576,7 +579,7 @@ mod test {
                 Arc::new(default_session_builder),
                 Arc::new(default_config_producer),
             ),
-            test_aggregation_plan(4).await,
+            Box::new(test_aggregation_plan(4).await),
         )
         .await?;
         test_job_lifecycle(
@@ -585,7 +588,7 @@ mod test {
                 Arc::new(default_session_builder),
                 Arc::new(default_config_producer),
             ),
-            test_two_aggregations_plan(4).await,
+            Box::new(test_two_aggregations_plan(4).await),
         )
         .await?;
         test_job_lifecycle(
@@ -594,7 +597,7 @@ mod test {
                 Arc::new(default_session_builder),
                 Arc::new(default_config_producer),
             ),
-            test_join_plan(4).await,
+            Box::new(test_join_plan(4).await),
         )
         .await?;
 
@@ -609,7 +612,7 @@ mod test {
                 Arc::new(default_session_builder),
                 Arc::new(default_config_producer),
             ),
-            test_aggregation_plan(4).await,
+            Box::new(test_aggregation_plan(4).await),
         )
         .await?;
         test_job_planning_failure(
@@ -618,7 +621,7 @@ mod test {
                 Arc::new(default_session_builder),
                 Arc::new(default_config_producer),
             ),
-            test_two_aggregations_plan(4).await,
+            Box::new(test_two_aggregations_plan(4).await),
         )
         .await?;
         test_job_planning_failure(
@@ -627,7 +630,7 @@ mod test {
                 Arc::new(default_session_builder),
                 Arc::new(default_config_producer),
             ),
-            test_join_plan(4).await,
+            Box::new(test_join_plan(4).await),
         )
         .await?;
 
@@ -654,7 +657,7 @@ mod test {
         });
 
         barrier.wait().await;
-        test_job_lifecycle(state, test_aggregation_plan(4).await).await?;
+        test_job_lifecycle(state, 
Box::new(test_aggregation_plan(4).await)).await?;
         let result = events.await?;
         assert_eq!(2, result.len());
         match result.last().unwrap() {
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index 7b1f4d27a..5d01091cc 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -43,7 +43,9 @@ use crate::cluster::memory::{InMemoryClusterState, 
InMemoryJobState};
 
 use crate::config::{SchedulerConfig, TaskDistributionPolicy};
 use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::{ExecutionGraph, TaskDescription, 
create_task_info};
+use crate::state::execution_graph::{
+    ExecutionGraphBox, TaskDescription, create_task_info,
+};
 use crate::state::task_manager::JobInfoCache;
 
 /// Event broadcasting and subscription for cluster state changes.
@@ -299,7 +301,7 @@ pub trait JobState: Send + Sync {
     /// Submits a new job to the job state.
     ///
     /// The submitter is assumed to own the job.
-    async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> 
Result<()>;
+    async fn submit_job(&self, job_id: String, graph: &ExecutionGraphBox) -> 
Result<()>;
 
     /// Returns the set of all active job IDs.
     async fn get_jobs(&self) -> Result<HashSet<String>>;
@@ -311,12 +313,15 @@ pub trait JobState: Send + Sync {
     ///
     /// The job may not belong to the caller, and the graph may be updated
     /// by another scheduler after this call returns.
-    async fn get_execution_graph(&self, job_id: &str) -> 
Result<Option<ExecutionGraph>>;
+    async fn get_execution_graph(
+        &self,
+        job_id: &str,
+    ) -> Result<Option<ExecutionGraphBox>>;
 
     /// Persists the current state of an owned job.
     ///
     /// Returns an error if the job is not owned by the caller.
-    async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> 
Result<()>;
+    async fn save_job(&self, job_id: &str, graph: &ExecutionGraphBox) -> 
Result<()>;
 
     /// Marks an unscheduled job as failed.
     ///
@@ -330,7 +335,7 @@ pub trait JobState: Send + Sync {
     ///
     /// Returns the execution graph if the job is still running and 
successfully acquired,
     /// otherwise returns None.
-    async fn try_acquire_job(&self, job_id: &str) -> 
Result<Option<ExecutionGraph>>;
+    async fn try_acquire_job(&self, job_id: &str) -> 
Result<Option<ExecutionGraphBox>>;
 
     /// Returns a stream of job state events.
     async fn job_state_events(&self) -> Result<JobStateEventStream>;
@@ -764,7 +769,7 @@ mod test {
         BoundTask, TopologyNode, bind_task_bias, bind_task_consistent_hash,
         bind_task_round_robin,
     };
-    use crate::state::execution_graph::ExecutionGraph;
+    use crate::state::execution_graph::{ExecutionGraph, StaticExecutionGraph};
     use crate::state::task_manager::JobInfoCache;
     use crate::test_utils::{
         mock_completed_task, revive_graph_and_complete_next_stage,
@@ -995,8 +1000,14 @@ mod test {
         let graph_b = mock_graph("job_b", num_partition, 7).await?;
 
         let mut active_jobs = HashMap::new();
-        active_jobs.insert(graph_a.job_id().to_string(), 
JobInfoCache::new(graph_a));
-        active_jobs.insert(graph_b.job_id().to_string(), 
JobInfoCache::new(graph_b));
+        active_jobs.insert(
+            graph_a.job_id().to_string(),
+            JobInfoCache::new(Box::new(graph_a)),
+        );
+        active_jobs.insert(
+            graph_b.job_id().to_string(),
+            JobInfoCache::new(Box::new(graph_b)),
+        );
 
         Ok(active_jobs)
     }
@@ -1005,7 +1016,7 @@ mod test {
         job_id: &str,
         num_target_partitions: usize,
         num_pending_task: usize,
-    ) -> Result<ExecutionGraph> {
+    ) -> Result<StaticExecutionGraph> {
         let mut graph =
             test_aggregation_plan_with_job_id(num_target_partitions, 
job_id).await;
         let executor = ExecutorMetadata {
diff --git a/ballista/scheduler/src/cluster/test_util/mod.rs 
b/ballista/scheduler/src/cluster/test_util/mod.rs
index 625f4afad..d9e960004 100644
--- a/ballista/scheduler/src/cluster/test_util/mod.rs
+++ b/ballista/scheduler/src/cluster/test_util/mod.rs
@@ -17,7 +17,7 @@
 
 use crate::cluster::{JobState, JobStateEvent};
 use crate::scheduler_server::timestamp_millis;
-use crate::state::execution_graph::ExecutionGraph;
+use crate::state::execution_graph::ExecutionGraphBox;
 use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
 use ballista_core::error::Result;
 use ballista_core::serde::protobuf::JobStatus;
@@ -87,7 +87,7 @@ impl<S: JobState> JobStateTest<S> {
     }
 
     /// Submits a job with the given execution graph.
-    pub async fn submit_job(self, graph: &ExecutionGraph) -> Result<Self> {
+    pub async fn submit_job(self, graph: &ExecutionGraphBox) -> Result<Self> {
         self.state
             .submit_job(graph.job_id().to_string(), graph)
             .await?;
@@ -113,7 +113,7 @@ impl<S: JobState> JobStateTest<S> {
     }
 
     /// Updates the job with the given execution graph.
-    pub async fn update_job(self, graph: &ExecutionGraph) -> Result<Self> {
+    pub async fn update_job(self, graph: &ExecutionGraphBox) -> Result<Self> {
         self.state.save_job(graph.job_id(), graph).await?;
         Ok(self)
     }
@@ -172,7 +172,7 @@ impl<S: JobState> JobStateTest<S> {
 /// Tests the complete job lifecycle from queued to successful.
 pub async fn test_job_lifecycle<S: JobState>(
     state: S,
-    mut graph: ExecutionGraph,
+    mut graph: ExecutionGraphBox,
 ) -> Result<()> {
     let test = JobStateTest::new(state).await?;
 
@@ -201,7 +201,7 @@ pub async fn test_job_lifecycle<S: JobState>(
 /// Tests job failure during the planning phase.
 pub async fn test_job_planning_failure<S: JobState>(
     state: S,
-    graph: ExecutionGraph,
+    graph: ExecutionGraphBox,
 ) -> Result<()> {
     let test = JobStateTest::new(state).await?;
 
@@ -216,7 +216,7 @@ pub async fn test_job_planning_failure<S: JobState>(
     Ok(())
 }
 
-fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
+fn drain_tasks(graph: &mut ExecutionGraphBox) -> Result<()> {
     let executor = mock_executor("executor-id1".to_string());
     while let Some(task) = graph.pop_next_task(&executor.id)? {
         let task_status = mock_completed_task(task, &executor.id);
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 8d07ae022..f37b92944 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -52,6 +52,9 @@ pub(crate) use crate::state::execution_stage::{
 };
 use crate::state::task_manager::UpdatedStages;
 
+/// Boxed [ExecutionGraph]
+pub type ExecutionGraphBox = Box<dyn ExecutionGraph + Send + Sync>;
+
 /// Represents the DAG for a distributed query plan.
 ///
 /// A distributed query plan consists of a set of stages which must be 
executed sequentially.
@@ -96,8 +99,141 @@ use crate::state::task_manager::UpdatedStages;
 ///
 /// If a stage has `output_links` is empty then it is the final stage in this 
query, and it should
 /// publish its outputs to the `ExecutionGraph`s `output_locations` 
representing the final query results.
+pub trait ExecutionGraph: Debug {
+    /// Returns the job ID for this execution graph.
+    fn job_id(&self) -> &str;
+
+    /// Returns the job name for this execution graph.
+    fn job_name(&self) -> &str;
+
+    /// Returns the session ID associated with this job.
+    fn session_id(&self) -> &str;
+
+    /// Returns the current job status.
+    fn status(&self) -> &JobStatus;
+
+    /// Returns the timestamp when this job started execution.
+    fn start_time(&self) -> u64;
+
+    /// Returns the timestamp when this job completed execution (0 if still running).
+    fn end_time(&self) -> u64;
+
+    /// Number of completed stages
+    fn completed_stages(&self) -> usize;
+
+    /// An ExecutionGraph is successful if all its stages are successful
+    fn is_successful(&self) -> bool;
+
+    /// Revive the execution graph by converting the resolved stages to 
running stages
+    /// If any stages are converted, return true; else false.
+    fn revive(&mut self) -> bool;
+
+    /// Update task statuses and task metrics in the graph.
+    /// This will also push shuffle partitions to their respective shuffle 
read stages.
+    fn update_task_status(
+        &mut self,
+        executor: &ExecutorMetadata,
+        task_statuses: Vec<TaskStatus>,
+        max_task_failures: usize,
+        max_stage_failures: usize,
+    ) -> Result<Vec<QueryStageSchedulerEvent>>;
+
+    /// Returns all the currently running stage IDs.
+    fn running_stages(&self) -> Vec<usize>;
+
+    /// Returns all currently running tasks along with the executor ID on 
which they are assigned.
+    fn running_tasks(&self) -> Vec<RunningTaskInfo>;
+
+    /// Returns the total number of tasks in this plan that are ready for 
scheduling.
+    fn available_tasks(&self) -> usize;
+
+    /// Fetches a running stage that has available tasks, excluding stages in 
the blacklist.
+    ///
+    /// Returns a mutable reference to the running stage and the task ID 
generator
+    /// if a suitable stage is found.
+    fn fetch_running_stage(
+        &mut self,
+        black_list: &[usize],
+    ) -> Option<(&mut RunningStage, &mut usize)>;
+
+    /// Updates the job status.
+    fn update_status(&mut self, status: JobStatus);
+
+    /// Returns the output partition locations for the final stage results.
+    fn output_locations(&self) -> Vec<PartitionLocation>;
+
+    /// Reset running and successful stages on a given executor
+    /// This will first check the unresolved/resolved/running stages and reset 
the running tasks and successful tasks.
+    /// Then it will check the successful stage and whether there are running 
parent stages need to read shuffle from it.
+    /// If yes, reset the successful tasks and roll back the resolved shuffle 
recursively.
+    ///
+    /// Returns the reset stage ids and running tasks should be killed
+    fn reset_stages_on_lost_executor(
+        &mut self,
+        executor_id: &str,
+    ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)>;
+
+    /// Converts an unresolved stage to resolved state.
+    ///
+    /// Returns true if the stage was successfully resolved, false if the stage
+    /// was not found or not in unresolved state.
+    fn resolve_stage(&mut self, stage_id: usize) -> Result<bool>;
+
+    /// Converts a running stage to successful state.
+    ///
+    /// Returns true if the stage was successfully marked as complete.
+    fn succeed_stage(&mut self, stage_id: usize) -> bool;
+
+    /// Converts a running stage to failed state with the given error message.
+    ///
+    /// Returns true if the stage was found and marked as failed.
+    fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool;
+
+    /// Convert running stage to be unresolved,
+    /// Returns a Vec of RunningTaskInfo for running tasks in this stage.
+    fn rollback_running_stage(
+        &mut self,
+        stage_id: usize,
+        failure_reasons: HashSet<String>,
+    ) -> Result<Vec<RunningTaskInfo>>;
+
+    /// Convert resolved stage to be unresolved
+    fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool>;
+
+    /// Converts a successful stage back to running state for re-execution.
+    ///
+    /// This is used when some outputs from the stage have been lost and tasks
+    /// need to be re-run.
+    fn rerun_successful_stage(&mut self, stage_id: usize) -> bool;
+
+    /// fail job with error message
+    fn fail_job(&mut self, error: String);
+
+    /// Marks the job as successfully completed.
+    ///
+    /// This should only be called after all stages have completed 
successfully.
+    /// Returns an error if the job is not in a successful state.
+    fn succeed_job(&mut self) -> Result<()>;
+
+    /// Exposes the execution stages keyed by stage ID
+    fn stages(&self) -> &HashMap<usize, ExecutionStage>;
+
+    /// Returns the next task to run
+    /// (used for testing only)
+    #[cfg(test)]
+    fn pop_next_task(&mut self, executor_id: &str) -> 
Result<Option<TaskDescription>>;
+
+    /// Returns the total number of stages in this execution graph.
+    fn stage_count(&self) -> usize;
+
+    /// Clones execution graph
+    fn cloned(&self) -> ExecutionGraphBox;
+}
+
+/// [ExecutionGraph] implementation which generates
+/// all stages on job submission time
 #[derive(Clone)]
-pub struct ExecutionGraph {
+pub struct StaticExecutionGraph {
     /// Curator scheduler name. Can be `None` is `ExecutionGraph` is not 
currently curated by any scheduler
     #[allow(dead_code)] // not used at the moment, will be used later
     scheduler_id: Option<String>,
@@ -117,9 +253,7 @@ pub struct ExecutionGraph {
     end_time: u64,
     /// Map from Stage ID -> ExecutionStage
     stages: HashMap<usize, ExecutionStage>,
-    /// Total number fo output partitions
-    #[allow(dead_code)] // not used at the moment, will be used later
-    output_partitions: usize,
+
     /// Locations of this `ExecutionGraph` final output locations
     output_locations: Vec<PartitionLocation>,
     /// Task ID generator, generate unique TID in the execution graph
@@ -149,7 +283,7 @@ pub struct RunningTaskInfo {
     pub executor_id: String,
 }
 
-impl ExecutionGraph {
+impl StaticExecutionGraph {
     /// Creates a new `ExecutionGraph` from a physical execution plan.
     ///
     /// This will use the `DistributedPlanner` to break the plan into stages
@@ -165,7 +299,6 @@ impl ExecutionGraph {
         session_config: Arc<SessionConfig>,
         planner: &mut dyn DistributedPlanner,
     ) -> Result<Self> {
-        let output_partitions = 
plan.properties().output_partitioning().partition_count();
         let shuffle_stages =
             planner.plan_query_stages(job_id, plan, session_config.options())?;
 
@@ -193,7 +326,6 @@ impl ExecutionGraph {
             start_time: started_at,
             end_time: 0,
             stages,
-            output_partitions,
             output_locations: vec![],
             task_id_gen: 0,
             failed_stage_attempts: HashMap::new(),
@@ -201,101 +333,372 @@ impl ExecutionGraph {
         })
     }
 
-    /// Returns the job ID for this execution graph.
-    pub fn job_id(&self) -> &str {
-        self.job_id.as_str()
+    #[cfg(test)]
+    fn next_task_id(&mut self) -> usize {
+        let new_tid = self.task_id_gen;
+        self.task_id_gen += 1;
+        new_tid
     }
 
-    /// Returns the job name for this execution graph.
-    pub fn job_name(&self) -> &str {
-        self.job_name.as_str()
-    }
+    /// Processing stage status update after task status changing
+    fn processing_stages_update(
+        &mut self,
+        updated_stages: UpdatedStages,
+    ) -> Result<Vec<QueryStageSchedulerEvent>> {
+        let job_id = self.job_id().to_owned();
+        let mut has_resolved = false;
+        let mut job_err_msg = "".to_owned();
 
-    /// Returns the session ID associated with this job.
-    pub fn session_id(&self) -> &str {
-        self.session_id.as_str()
-    }
+        for stage_id in updated_stages.resolved_stages {
+            self.resolve_stage(stage_id)?;
+            has_resolved = true;
+        }
 
-    /// Returns the current job status.
-    pub fn status(&self) -> &JobStatus {
-        &self.status
-    }
+        for stage_id in updated_stages.successful_stages {
+            self.succeed_stage(stage_id);
+        }
 
-    /// Returns the timestamp when this job started execution.
-    pub fn start_time(&self) -> u64 {
-        self.start_time
-    }
+        // Fail the stage and also abort the job
+        for (stage_id, err_msg) in &updated_stages.failed_stages {
+            job_err_msg =
+                format!("Job failed due to stage {stage_id} failed: 
{err_msg}\n");
+        }
 
-    /// Returns the timestamp when this job completed (0 if still running).
-    pub fn end_time(&self) -> u64 {
-        self.end_time
-    }
+        let mut events = vec![];
+        // Only handle the rollback logic when there are no failed stages
+        if updated_stages.failed_stages.is_empty() {
+            let mut running_tasks_to_cancel = vec![];
+            for (stage_id, failure_reasons) in 
updated_stages.rollback_running_stages {
+                let tasks = self.rollback_running_stage(stage_id, 
failure_reasons)?;
+                running_tasks_to_cancel.extend(tasks);
+            }
 
-    /// Returns the total number of stages in this execution graph.
-    pub fn stage_count(&self) -> usize {
-        self.stages.len()
-    }
+            for stage_id in updated_stages.resubmit_successful_stages {
+                self.rerun_successful_stage(stage_id);
+            }
 
-    /// Generates and returns the next unique task ID for this execution graph.
-    pub fn next_task_id(&mut self) -> usize {
-        let new_tid = self.task_id_gen;
-        self.task_id_gen += 1;
-        new_tid
-    }
+            if !running_tasks_to_cancel.is_empty() {
+                events.push(QueryStageSchedulerEvent::CancelTasks(
+                    running_tasks_to_cancel,
+                ));
+            }
+        }
 
-    /// Exposes executions stages and stage id's
-    pub fn stages(&self) -> &HashMap<usize, ExecutionStage> {
-        &self.stages
+        if !updated_stages.failed_stages.is_empty() {
+            info!("Job {job_id} is failed");
+            self.fail_job(job_err_msg.clone());
+            events.push(QueryStageSchedulerEvent::JobRunningFailed {
+                job_id,
+                fail_message: job_err_msg,
+                queued_at: self.queued_at,
+                failed_at: timestamp_millis(),
+            });
+        } else if self.is_successful() {
+            // If this ExecutionGraph is successful, finish it
+            info!("Job {job_id} is success, finalizing output partitions");
+            self.succeed_job()?;
+            events.push(QueryStageSchedulerEvent::JobFinished {
+                job_id,
+                queued_at: self.queued_at,
+                completed_at: timestamp_millis(),
+            });
+        } else if has_resolved {
+            events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
+        }
+        Ok(events)
     }
 
-    /// An ExecutionGraph is successful if all its stages are successful
-    pub fn is_successful(&self) -> bool {
-        self.stages
-            .values()
-            .all(|s| matches!(s, ExecutionStage::Successful(_)))
-    }
+    /// Return a Vec of resolvable stage ids
+    fn update_stage_output_links(
+        &mut self,
+        stage_id: usize,
+        is_completed: bool,
+        locations: Vec<PartitionLocation>,
+        output_links: Vec<usize>,
+    ) -> Result<Vec<usize>> {
+        let mut resolved_stages = vec![];
+        let job_id = &self.job_id;
+        if output_links.is_empty() {
+            // If `output_links` is empty, then this is a final stage
+            self.output_locations.extend(locations);
+        } else {
+            for link in output_links.iter() {
+                // If this is an intermediate stage, we need to push its 
`PartitionLocation`s to the parent stage
+                if let Some(linked_stage) = self.stages.get_mut(link) {
+                    if let ExecutionStage::UnResolved(linked_unresolved_stage) 
=
+                        linked_stage
+                    {
+                        linked_unresolved_stage
+                            .add_input_partitions(stage_id, 
locations.clone())?;
 
-    /// Returns true if all stages in this graph have completed successfully.
-    pub fn is_complete(&self) -> bool {
-        self.stages
-            .values()
-            .all(|s| matches!(s, ExecutionStage::Successful(_)))
-    }
+                        // If all tasks for this stage are complete, mark the 
input complete in the parent stage
+                        if is_completed {
+                            linked_unresolved_stage.complete_input(stage_id);
+                        }
 
-    /// Revive the execution graph by converting the resolved stages to 
running stages
-    /// If any stages are converted, return true; else false.
-    pub fn revive(&mut self) -> bool {
-        let running_stages = self
-            .stages
-            .values()
-            .filter_map(|stage| {
-                if let ExecutionStage::Resolved(resolved_stage) = stage {
-                    Some(resolved_stage.to_running())
+                        // If all input partitions are ready, we can resolve 
any UnresolvedShuffleExec in the parent stage plan
+                        if linked_unresolved_stage.resolvable() {
+                            
resolved_stages.push(linked_unresolved_stage.stage_id);
+                        }
+                    } else {
+                        return Err(BallistaError::Internal(format!(
+                            "Error updating job {job_id}: The stage {link} as 
the output link of stage {stage_id}  should be unresolved"
+                        )));
+                    }
                 } else {
-                    None
+                    return Err(BallistaError::Internal(format!(
+                        "Error updating job {job_id}: Invalid output link 
{stage_id} for stage {link}"
+                    )));
                 }
-            })
-            .collect::<Vec<_>>();
-
-        if running_stages.is_empty() {
-            false
-        } else {
-            for running_stage in running_stages {
-                self.stages.insert(
-                    running_stage.stage_id,
-                    ExecutionStage::Running(running_stage),
-                );
             }
-            true
         }
+        Ok(resolved_stages)
     }
 
-    /// Update task statuses and task metrics in the graph.
-    /// This will also push shuffle partitions to their respective shuffle 
read stages.
-    pub fn update_task_status(
-        &mut self,
-        executor: &ExecutorMetadata,
-        task_statuses: Vec<TaskStatus>,
+    fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
+        let mut running_stage_id = self.stages.iter().find_map(|(stage_id, 
stage)| {
+            if black_list.contains(stage_id) {
+                None
+            } else if let ExecutionStage::Running(stage) = stage {
+                if stage.available_tasks() > 0 {
+                    Some(*stage_id)
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        });
+
+        // If no available tasks found in the running stage,
+        // try to find a resolved stage and convert it to the running stage
+        if running_stage_id.is_none() {
+            if self.revive() {
+                running_stage_id = self.get_running_stage_id(black_list);
+            } else {
+                running_stage_id = None;
+            }
+        }
+
+        running_stage_id
+    }
+
+    fn reset_stages_internal(
+        &mut self,
+        executor_id: &str,
+    ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
+        let job_id = self.job_id.clone();
+        // collect the input stages that need to resubmit
+        let mut resubmit_inputs: HashSet<usize> = HashSet::new();
+
+        let mut reset_running_stage = HashSet::new();
+        let mut rollback_resolved_stages = HashSet::new();
+        let mut rollback_running_stages = HashSet::new();
+        let mut resubmit_successful_stages = HashSet::new();
+
+        let mut empty_inputs: HashMap<usize, StageOutput> = HashMap::new();
+        // check the unresolved, resolved and running stages
+        self.stages
+            .iter_mut()
+            .for_each(|(stage_id, stage)| {
+                let stage_inputs = match stage {
+                    ExecutionStage::UnResolved(stage) => {
+                        &mut stage.inputs
+                    }
+                    ExecutionStage::Resolved(stage) => {
+                        &mut stage.inputs
+                    }
+                    ExecutionStage::Running(stage) => {
+                        let reset = stage.reset_tasks(executor_id);
+                        if reset > 0 {
+                            warn!(
+                        "Reset {reset} tasks for running job/stage 
{job_id}/{stage_id} on lost Executor {executor_id}"
+                        );
+                            reset_running_stage.insert(*stage_id);
+                        }
+                        &mut stage.inputs
+                    }
+                    _ => &mut empty_inputs
+                };
+
+                // For each stage input, check whether there are input 
locations match that executor
+                // and calculate the resubmit input stages if the input stages 
are successful.
+                let mut rollback_stage = false;
+                stage_inputs.iter_mut().for_each(|(input_stage_id, 
stage_output)| {
+                    let mut match_found = false;
+                    stage_output.partition_locations.iter_mut().for_each(
+                        |(_partition, locs)| {
+                            let before_len = locs.len();
+                            locs.retain(|loc| loc.executor_meta.id != 
executor_id);
+                            if locs.len() < before_len {
+                                match_found = true;
+                            }
+                        },
+                    );
+                    if match_found {
+                        stage_output.complete = false;
+                        rollback_stage = true;
+                        resubmit_inputs.insert(*input_stage_id);
+                    }
+                });
+
+                if rollback_stage {
+                    match stage {
+                        ExecutionStage::Resolved(_) => {
+                            rollback_resolved_stages.insert(*stage_id);
+                            warn!(
+                            "Roll back resolved job/stage {job_id}/{stage_id} 
and change ShuffleReaderExec back to UnresolvedShuffleExec");
+                        }
+                        ExecutionStage::Running(_) => {
+                            rollback_running_stages.insert(*stage_id);
+                            warn!(
+                            "Roll back running job/stage {job_id}/{stage_id} 
and change ShuffleReaderExec back to UnresolvedShuffleExec");
+                        }
+                        _ => {}
+                    }
+                }
+            });
+
+        // check and reset the successful stages
+        if !resubmit_inputs.is_empty() {
+            self.stages
+                .iter_mut()
+                .filter(|(stage_id, _stage)| 
resubmit_inputs.contains(stage_id))
+                .filter_map(|(_stage_id, stage)| {
+                    if let ExecutionStage::Successful(success) = stage {
+                        Some(success)
+                    } else {
+                        None
+                    }
+                })
+                .for_each(|stage| {
+                    let reset = stage.reset_tasks(executor_id);
+                    if reset > 0 {
+                        resubmit_successful_stages.insert(stage.stage_id);
+                        warn!(
+                            "Reset {} tasks for successful job/stage {}/{} on 
lost Executor {}",
+                            reset, job_id, stage.stage_id, executor_id
+                        )
+                    }
+                });
+        }
+
+        for stage_id in rollback_resolved_stages.iter() {
+            self.rollback_resolved_stage(*stage_id)?;
+        }
+
+        let mut all_running_tasks = vec![];
+        for stage_id in rollback_running_stages.iter() {
+            let tasks = self.rollback_running_stage(
+                *stage_id,
+                HashSet::from([executor_id.to_owned()]),
+            )?;
+            all_running_tasks.extend(tasks);
+        }
+
+        for stage_id in resubmit_successful_stages.iter() {
+            self.rerun_successful_stage(*stage_id);
+        }
+
+        let mut reset_stage = HashSet::new();
+        reset_stage.extend(reset_running_stage);
+        reset_stage.extend(rollback_resolved_stages);
+        reset_stage.extend(rollback_running_stages);
+        reset_stage.extend(resubmit_successful_stages);
+        Ok((reset_stage, all_running_tasks))
+    }
+
+    /// Clear the stage failure count for this stage if the stage is finally 
success
+    fn clear_stage_failure(&mut self, stage_id: usize) {
+        self.failed_stage_attempts.remove(&stage_id);
+    }
+}
+
+impl ExecutionGraph for StaticExecutionGraph {
+    fn cloned(&self) -> ExecutionGraphBox {
+        Box::new(self.clone())
+    }
+
+    fn job_id(&self) -> &str {
+        self.job_id.as_str()
+    }
+
+    fn job_name(&self) -> &str {
+        self.job_name.as_str()
+    }
+
+    fn session_id(&self) -> &str {
+        self.session_id.as_str()
+    }
+
+    fn status(&self) -> &JobStatus {
+        &self.status
+    }
+
+    fn start_time(&self) -> u64 {
+        self.start_time
+    }
+
+    fn end_time(&self) -> u64 {
+        self.end_time
+    }
+
+    fn completed_stages(&self) -> usize {
+        let mut completed_stages = 0;
+        for stage in self.stages.values() {
+            if let ExecutionStage::Successful(_) = stage {
+                completed_stages += 1;
+            }
+        }
+        completed_stages
+    }
+    /// An ExecutionGraph is successful if all its stages are successful
+    fn is_successful(&self) -> bool {
+        self.stages
+            .values()
+            .all(|s| matches!(s, ExecutionStage::Successful(_)))
+    }
+
+    // pub fn is_complete(&self) -> bool {
+    //     self.stages
+    //         .values()
+    //         .all(|s| matches!(s, ExecutionStage::Successful(_)))
+    // }
+
+    /// Revive the execution graph by converting the resolved stages to 
running stages
+    /// If any stages are converted, return true; else false.
+    fn revive(&mut self) -> bool {
+        let running_stages = self
+            .stages
+            .values()
+            .filter_map(|stage| {
+                if let ExecutionStage::Resolved(resolved_stage) = stage {
+                    Some(resolved_stage.to_running())
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        if running_stages.is_empty() {
+            false
+        } else {
+            for running_stage in running_stages {
+                self.stages.insert(
+                    running_stage.stage_id,
+                    ExecutionStage::Running(running_stage),
+                );
+            }
+            true
+        }
+    }
+
+    /// Update task statuses and task metrics in the graph.
+    /// This will also push shuffle partitions to their respective shuffle 
read stages.
+    fn update_task_status(
+        &mut self,
+        executor: &ExecutorMetadata,
+        task_statuses: Vec<TaskStatus>,
         max_task_failures: usize,
         max_stage_failures: usize,
     ) -> Result<Vec<QueryStageSchedulerEvent>> {
@@ -687,123 +1090,8 @@ impl ExecutionGraph {
         })
     }
 
-    /// Processing stage status update after task status changing
-    fn processing_stages_update(
-        &mut self,
-        updated_stages: UpdatedStages,
-    ) -> Result<Vec<QueryStageSchedulerEvent>> {
-        let job_id = self.job_id().to_owned();
-        let mut has_resolved = false;
-        let mut job_err_msg = "".to_owned();
-
-        for stage_id in updated_stages.resolved_stages {
-            self.resolve_stage(stage_id)?;
-            has_resolved = true;
-        }
-
-        for stage_id in updated_stages.successful_stages {
-            self.succeed_stage(stage_id);
-        }
-
-        // Fail the stage and also abort the job
-        for (stage_id, err_msg) in &updated_stages.failed_stages {
-            job_err_msg =
-                format!("Job failed due to stage {stage_id} failed: 
{err_msg}\n");
-        }
-
-        let mut events = vec![];
-        // Only handle the rollback logic when there are no failed stages
-        if updated_stages.failed_stages.is_empty() {
-            let mut running_tasks_to_cancel = vec![];
-            for (stage_id, failure_reasons) in 
updated_stages.rollback_running_stages {
-                let tasks = self.rollback_running_stage(stage_id, 
failure_reasons)?;
-                running_tasks_to_cancel.extend(tasks);
-            }
-
-            for stage_id in updated_stages.resubmit_successful_stages {
-                self.rerun_successful_stage(stage_id);
-            }
-
-            if !running_tasks_to_cancel.is_empty() {
-                events.push(QueryStageSchedulerEvent::CancelTasks(
-                    running_tasks_to_cancel,
-                ));
-            }
-        }
-
-        if !updated_stages.failed_stages.is_empty() {
-            info!("Job {job_id} is failed");
-            self.fail_job(job_err_msg.clone());
-            events.push(QueryStageSchedulerEvent::JobRunningFailed {
-                job_id,
-                fail_message: job_err_msg,
-                queued_at: self.queued_at,
-                failed_at: timestamp_millis(),
-            });
-        } else if self.is_successful() {
-            // If this ExecutionGraph is successful, finish it
-            info!("Job {job_id} is success, finalizing output partitions");
-            self.succeed_job()?;
-            events.push(QueryStageSchedulerEvent::JobFinished {
-                job_id,
-                queued_at: self.queued_at,
-                completed_at: timestamp_millis(),
-            });
-        } else if has_resolved {
-            events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
-        }
-        Ok(events)
-    }
-
-    /// Return a Vec of resolvable stage ids
-    fn update_stage_output_links(
-        &mut self,
-        stage_id: usize,
-        is_completed: bool,
-        locations: Vec<PartitionLocation>,
-        output_links: Vec<usize>,
-    ) -> Result<Vec<usize>> {
-        let mut resolved_stages = vec![];
-        let job_id = &self.job_id;
-        if output_links.is_empty() {
-            // If `output_links` is empty, then this is a final stage
-            self.output_locations.extend(locations);
-        } else {
-            for link in output_links.iter() {
-                // If this is an intermediate stage, we need to push its 
`PartitionLocation`s to the parent stage
-                if let Some(linked_stage) = self.stages.get_mut(link) {
-                    if let ExecutionStage::UnResolved(linked_unresolved_stage) 
=
-                        linked_stage
-                    {
-                        linked_unresolved_stage
-                            .add_input_partitions(stage_id, 
locations.clone())?;
-
-                        // If all tasks for this stage are complete, mark the 
input complete in the parent stage
-                        if is_completed {
-                            linked_unresolved_stage.complete_input(stage_id);
-                        }
-
-                        // If all input partitions are ready, we can resolve 
any UnresolvedShuffleExec in the parent stage plan
-                        if linked_unresolved_stage.resolvable() {
-                            
resolved_stages.push(linked_unresolved_stage.stage_id);
-                        }
-                    } else {
-                        return Err(BallistaError::Internal(format!(
-                            "Error updating job {job_id}: The stage {link} as 
the output link of stage {stage_id}  should be unresolved"
-                        )));
-                    }
-                } else {
-                    return Err(BallistaError::Internal(format!(
-                        "Error updating job {job_id}: Invalid output link 
{stage_id} for stage {link}"
-                    )));
-                }
-            }
-        }
-        Ok(resolved_stages)
-    }
-
-    /// Returns all the currently running stage IDs.
-    pub fn running_stages(&self) -> Vec<usize> {
+    /// Return all the currently running stage ids
+    fn running_stages(&self) -> Vec<usize> {
         self.stages
             .iter()
             .filter_map(|(stage_id, stage)| {
@@ -816,8 +1104,8 @@ impl ExecutionGraph {
             .collect::<Vec<_>>()
     }
 
-    /// Returns all currently running tasks along with the executor ID on 
which they are assigned.
-    pub fn running_tasks(&self) -> Vec<RunningTaskInfo> {
+    /// Return all currently running tasks along with the executor ID on which 
they are assigned
+    fn running_tasks(&self) -> Vec<RunningTaskInfo> {
         self.stages
             .iter()
             .flat_map(|(_, stage)| {
@@ -842,8 +1130,8 @@ impl ExecutionGraph {
             .collect::<Vec<RunningTaskInfo>>()
     }
 
-    /// Returns the total number of tasks in this plan that are ready for 
scheduling.
-    pub fn available_tasks(&self) -> usize {
+    /// Total number of tasks in this plan that are ready for scheduling
+    fn available_tasks(&self) -> usize {
         self.stages
             .values()
             .map(|stage| {
@@ -856,119 +1144,7 @@ impl ExecutionGraph {
             .sum()
     }
 
-    /// Get next task that can be assigned to the given executor.
-    /// This method should only be called when the resulting task is 
immediately
-    /// being launched as the status will be set to Running and it will not be
-    /// available to the scheduler.
-    /// If the task is not launched the status must be reset to allow the task 
to
-    /// be scheduled elsewhere.
-    pub fn pop_next_task(
-        &mut self,
-        executor_id: &str,
-    ) -> Result<Option<TaskDescription>> {
-        if matches!(
-            self.status,
-            JobStatus {
-                status: Some(job_status::Status::Failed(_)),
-                ..
-            }
-        ) {
-            warn!("Call pop_next_task on failed Job");
-            return Ok(None);
-        }
-
-        let job_id = self.job_id.clone();
-        let session_id = self.session_id.clone();
-
-        let find_candidate = self.stages.iter().any(|(_stage_id, stage)| {
-            if let ExecutionStage::Running(stage) = stage {
-                stage.available_tasks() > 0
-            } else {
-                false
-            }
-        });
-        let next_task_id = if find_candidate {
-            Some(self.next_task_id())
-        } else {
-            None
-        };
-
-        let mut next_task = self.stages.iter_mut().find(|(_stage_id, stage)| {
-            if let ExecutionStage::Running(stage) = stage {
-                stage.available_tasks() > 0
-            } else {
-                false
-            }
-        }).map(|(stage_id, stage)| {
-            if let ExecutionStage::Running(stage) = stage {
-                let (partition_id, _) = stage
-                    .task_infos
-                    .iter()
-                    .enumerate()
-                    .find(|(_partition, info)| info.is_none())
-                    .ok_or_else(|| {
-                        BallistaError::Internal(format!("Error getting next 
task for job {job_id}: Stage {stage_id} is ready but has no pending tasks"))
-                    })?;
-
-                let partition = PartitionId {
-                    job_id,
-                    stage_id: *stage_id,
-                    partition_id,
-                };
-
-                let task_id = next_task_id.unwrap();
-                let task_attempt = stage.task_failure_numbers[partition_id];
-                let task_info = TaskInfo {
-                    task_id,
-                    scheduled_time: SystemTime::now()
-                        .duration_since(UNIX_EPOCH)
-                        .unwrap()
-                        .as_millis(),
-                    // Those times will be updated when the task finish
-                    launch_time: 0,
-                    start_exec_time: 0,
-                    end_exec_time: 0,
-                    finish_time: 0,
-                    task_status: task_status::Status::Running(RunningTask {
-                        executor_id: executor_id.to_owned()
-                    }),
-                };
-
-                // Set the task info to Running for new task
-                stage.task_infos[partition_id] = Some(task_info);
-
-                Ok(TaskDescription {
-                    session_id,
-                    partition,
-                    stage_attempt_num: stage.stage_attempt_num,
-                    task_id,
-                    task_attempt,
-                    plan: stage.plan.clone(),
-                    session_config: self.session_config.clone()
-                })
-            } else {
-                Err(BallistaError::General(format!("Stage {stage_id} is not a 
running stage")))
-            }
-        }).transpose()?;
-
-        // If no available tasks found in the running stage,
-        // try to find a resolved stage and convert it to the running stage
-        if next_task.is_none() {
-            if self.revive() {
-                next_task = self.pop_next_task(executor_id)?;
-            } else {
-                next_task = None;
-            }
-        }
-
-        Ok(next_task)
-    }
-
-    /// Fetches a running stage that has available tasks, excluding stages in 
the blacklist.
-    ///
-    /// Returns a mutable reference to the running stage and the task ID 
generator
-    /// if a suitable stage is found.
-    pub fn fetch_running_stage(
+    fn fetch_running_stage(
         &mut self,
         black_list: &[usize],
     ) -> Option<(&mut RunningStage, &mut usize)> {
@@ -998,41 +1174,11 @@ impl ExecutionGraph {
         }
     }
 
-    fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
-        let mut running_stage_id = self.stages.iter().find_map(|(stage_id, 
stage)| {
-            if black_list.contains(stage_id) {
-                None
-            } else if let ExecutionStage::Running(stage) = stage {
-                if stage.available_tasks() > 0 {
-                    Some(*stage_id)
-                } else {
-                    None
-                }
-            } else {
-                None
-            }
-        });
-
-        // If no available tasks found in the running stage,
-        // try to find a resolved stage and convert it to the running stage
-        if running_stage_id.is_none() {
-            if self.revive() {
-                running_stage_id = self.get_running_stage_id(black_list);
-            } else {
-                running_stage_id = None;
-            }
-        }
-
-        running_stage_id
-    }
-
-    /// Updates the job status.
-    pub fn update_status(&mut self, status: JobStatus) {
+    fn update_status(&mut self, status: JobStatus) {
         self.status = status;
     }
 
-    /// Returns the output partition locations for the final stage results.
-    pub fn output_locations(&self) -> Vec<PartitionLocation> {
+    fn output_locations(&self) -> Vec<PartitionLocation> {
         self.output_locations.clone()
     }
 
@@ -1042,7 +1188,7 @@ impl ExecutionGraph {
     /// If yes, reset the successful tasks and roll back the resolved shuffle 
recursively.
     ///
     /// Returns the reset stage ids and running tasks should be killed
-    pub fn reset_stages_on_lost_executor(
+    fn reset_stages_on_lost_executor(
         &mut self,
         executor_id: &str,
     ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
@@ -1059,136 +1205,8 @@ impl ExecutionGraph {
         }
     }
 
-    fn reset_stages_internal(
-        &mut self,
-        executor_id: &str,
-    ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
-        let job_id = self.job_id.clone();
-        // collect the input stages that need to resubmit
-        let mut resubmit_inputs: HashSet<usize> = HashSet::new();
-
-        let mut reset_running_stage = HashSet::new();
-        let mut rollback_resolved_stages = HashSet::new();
-        let mut rollback_running_stages = HashSet::new();
-        let mut resubmit_successful_stages = HashSet::new();
-
-        let mut empty_inputs: HashMap<usize, StageOutput> = HashMap::new();
-        // check the unresolved, resolved and running stages
-        self.stages
-            .iter_mut()
-            .for_each(|(stage_id, stage)| {
-                let stage_inputs = match stage {
-                    ExecutionStage::UnResolved(stage) => {
-                        &mut stage.inputs
-                    }
-                    ExecutionStage::Resolved(stage) => {
-                        &mut stage.inputs
-                    }
-                    ExecutionStage::Running(stage) => {
-                        let reset = stage.reset_tasks(executor_id);
-                        if reset > 0 {
-                            warn!(
-                        "Reset {reset} tasks for running job/stage 
{job_id}/{stage_id} on lost Executor {executor_id}"
-                        );
-                            reset_running_stage.insert(*stage_id);
-                        }
-                        &mut stage.inputs
-                    }
-                    _ => &mut empty_inputs
-                };
-
-                // For each stage input, check whether there are input 
locations match that executor
-                // and calculate the resubmit input stages if the input stages 
are successful.
-                let mut rollback_stage = false;
-                stage_inputs.iter_mut().for_each(|(input_stage_id, 
stage_output)| {
-                    let mut match_found = false;
-                    stage_output.partition_locations.iter_mut().for_each(
-                        |(_partition, locs)| {
-                            let before_len = locs.len();
-                            locs.retain(|loc| loc.executor_meta.id != 
executor_id);
-                            if locs.len() < before_len {
-                                match_found = true;
-                            }
-                        },
-                    );
-                    if match_found {
-                        stage_output.complete = false;
-                        rollback_stage = true;
-                        resubmit_inputs.insert(*input_stage_id);
-                    }
-                });
-
-                if rollback_stage {
-                    match stage {
-                        ExecutionStage::Resolved(_) => {
-                            rollback_resolved_stages.insert(*stage_id);
-                            warn!(
-                            "Roll back resolved job/stage {job_id}/{stage_id} 
and change ShuffleReaderExec back to UnresolvedShuffleExec");
-                        }
-                        ExecutionStage::Running(_) => {
-                            rollback_running_stages.insert(*stage_id);
-                            warn!(
-                            "Roll back running job/stage {job_id}/{stage_id} 
and change ShuffleReaderExec back to UnresolvedShuffleExec");
-                        }
-                        _ => {}
-                    }
-                }
-            });
-
-        // check and reset the successful stages
-        if !resubmit_inputs.is_empty() {
-            self.stages
-                .iter_mut()
-                .filter(|(stage_id, _stage)| 
resubmit_inputs.contains(stage_id))
-                .filter_map(|(_stage_id, stage)| {
-                    if let ExecutionStage::Successful(success) = stage {
-                        Some(success)
-                    } else {
-                        None
-                    }
-                })
-                .for_each(|stage| {
-                    let reset = stage.reset_tasks(executor_id);
-                    if reset > 0 {
-                        resubmit_successful_stages.insert(stage.stage_id);
-                        warn!(
-                            "Reset {} tasks for successful job/stage {}/{} on 
lost Executor {}",
-                            reset, job_id, stage.stage_id, executor_id
-                        )
-                    }
-                });
-        }
-
-        for stage_id in rollback_resolved_stages.iter() {
-            self.rollback_resolved_stage(*stage_id)?;
-        }
-
-        let mut all_running_tasks = vec![];
-        for stage_id in rollback_running_stages.iter() {
-            let tasks = self.rollback_running_stage(
-                *stage_id,
-                HashSet::from([executor_id.to_owned()]),
-            )?;
-            all_running_tasks.extend(tasks);
-        }
-
-        for stage_id in resubmit_successful_stages.iter() {
-            self.rerun_successful_stage(*stage_id);
-        }
-
-        let mut reset_stage = HashSet::new();
-        reset_stage.extend(reset_running_stage);
-        reset_stage.extend(rollback_resolved_stages);
-        reset_stage.extend(rollback_running_stages);
-        reset_stage.extend(resubmit_successful_stages);
-        Ok((reset_stage, all_running_tasks))
-    }
-
-    /// Converts an unresolved stage to resolved state.
-    ///
-    /// Returns true if the stage was successfully resolved, false if the stage
-    /// was not found or not in unresolved state.
-    pub fn resolve_stage(&mut self, stage_id: usize) -> Result<bool> {
+    /// Convert unresolved stage to be resolved
+    fn resolve_stage(&mut self, stage_id: usize) -> Result<bool> {
         if let Some(ExecutionStage::UnResolved(stage)) = 
self.stages.remove(&stage_id) {
             self.stages.insert(
                 stage_id,
@@ -1207,10 +1225,8 @@ impl ExecutionGraph {
         }
     }
 
-    /// Converts a running stage to successful state.
-    ///
-    /// Returns true if the stage was successfully marked as complete.
-    pub fn succeed_stage(&mut self, stage_id: usize) -> bool {
+    /// Convert running stage to be successful
+    fn succeed_stage(&mut self, stage_id: usize) -> bool {
         if let Some(ExecutionStage::Running(stage)) = 
self.stages.remove(&stage_id) {
             self.stages
                 .insert(stage_id, 
ExecutionStage::Successful(stage.to_successful()));
@@ -1226,10 +1242,8 @@ impl ExecutionGraph {
         }
     }
 
-    /// Converts a running stage to failed state with the given error message.
-    ///
-    /// Returns true if the stage was found and marked as failed.
-    pub fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool {
+    /// Convert running stage to be failed
+    fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool {
         if let Some(ExecutionStage::Running(stage)) = 
self.stages.remove(&stage_id) {
             self.stages
                 .insert(stage_id, 
ExecutionStage::Failed(stage.to_failed(err_msg)));
@@ -1246,7 +1260,7 @@ impl ExecutionGraph {
 
     /// Convert running stage to be unresolved,
     /// Returns a Vec of RunningTaskInfo for running tasks in this stage.
-    pub fn rollback_running_stage(
+    fn rollback_running_stage(
         &mut self,
         stage_id: usize,
         failure_reasons: HashSet<String>,
@@ -1281,7 +1295,7 @@ impl ExecutionGraph {
     }
 
     /// Convert resolved stage to be unresolved
-    pub fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool> 
{
+    fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool> {
         if let Some(ExecutionStage::Resolved(stage)) = 
self.stages.remove(&stage_id) {
             self.stages
                 .insert(stage_id, 
ExecutionStage::UnResolved(stage.to_unresolved()?));
@@ -1296,11 +1310,8 @@ impl ExecutionGraph {
         }
     }
 
-    /// Converts a successful stage back to running state for re-execution.
-    ///
-    /// This is used when some outputs from the stage have been lost and tasks
-    /// need to be re-run.
-    pub fn rerun_successful_stage(&mut self, stage_id: usize) -> bool {
+    /// Convert successful stage to be running
+    fn rerun_successful_stage(&mut self, stage_id: usize) -> bool {
         if let Some(ExecutionStage::Successful(stage)) = 
self.stages.remove(&stage_id) {
             self.stages
                 .insert(stage_id, ExecutionStage::Running(stage.to_running()));
@@ -1316,7 +1327,7 @@ impl ExecutionGraph {
     }
 
     /// fail job with error message
-    pub fn fail_job(&mut self, error: String) {
+    fn fail_job(&mut self, error: String) {
         self.status = JobStatus {
             job_id: self.job_id.clone(),
             job_name: self.job_name.clone(),
@@ -1329,11 +1340,8 @@ impl ExecutionGraph {
         };
     }
 
-    /// Marks the job as successfully completed.
-    ///
-    /// This should only be called after all stages have completed 
successfully.
-    /// Returns an error if the job is not in a successful state.
-    pub fn succeed_job(&mut self) -> Result<()> {
+    /// Mark the job success
+    fn succeed_job(&mut self) -> Result<()> {
         if !self.is_successful() {
             return Err(BallistaError::Internal(format!(
                 "Attempt to finalize an incomplete job {}",
@@ -1367,13 +1375,122 @@ impl ExecutionGraph {
         Ok(())
     }
 
-    /// Clear the stage failure count for this stage if the stage is finally 
success
-    fn clear_stage_failure(&mut self, stage_id: usize) {
-        self.failed_stage_attempts.remove(&stage_id);
+    fn stages(&self) -> &HashMap<usize, ExecutionStage> {
+        &self.stages
+    }
+
+    fn stage_count(&self) -> usize {
+        self.stages.len()
+    }
+
+    /// Get next task that can be assigned to the given executor.
+    /// This method should only be called when the resulting task is 
immediately
+    /// being launched as the status will be set to Running and it will not be
+    /// available to the scheduler.
+    /// If the task is not launched the status must be reset to allow the task 
to
+    /// be scheduled elsewhere.
+    #[cfg(test)]
+    fn pop_next_task(&mut self, executor_id: &str) -> 
Result<Option<TaskDescription>> {
+        if matches!(
+            self.status,
+            JobStatus {
+                status: Some(job_status::Status::Failed(_)),
+                ..
+            }
+        ) {
+            warn!("Call pop_next_task on failed Job");
+            return Ok(None);
+        }
+
+        let job_id = self.job_id.clone();
+        let session_id = self.session_id.clone();
+
+        let find_candidate = self.stages.iter().any(|(_stage_id, stage)| {
+            if let ExecutionStage::Running(stage) = stage {
+                stage.available_tasks() > 0
+            } else {
+                false
+            }
+        });
+        let next_task_id = if find_candidate {
+            Some(self.next_task_id())
+        } else {
+            None
+        };
+
+        let mut next_task = self.stages.iter_mut().find(|(_stage_id, stage)| {
+            if let ExecutionStage::Running(stage) = stage {
+                stage.available_tasks() > 0
+            } else {
+                false
+            }
+        }).map(|(stage_id, stage)| {
+            if let ExecutionStage::Running(stage) = stage {
+                let (partition_id, _) = stage
+                    .task_infos
+                    .iter()
+                    .enumerate()
+                    .find(|(_partition, info)| info.is_none())
+                    .ok_or_else(|| {
+                        BallistaError::Internal(format!("Error getting next 
task for job {job_id}: Stage {stage_id} is ready but has no pending tasks"))
+                    })?;
+
+                let partition = PartitionId {
+                    job_id,
+                    stage_id: *stage_id,
+                    partition_id,
+                };
+
+                let task_id = next_task_id.unwrap();
+                let task_attempt = stage.task_failure_numbers[partition_id];
+                let task_info = TaskInfo {
+                    task_id,
+                    scheduled_time: SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_millis(),
+                    // Those times will be updated when the task finish
+                    launch_time: 0,
+                    start_exec_time: 0,
+                    end_exec_time: 0,
+                    finish_time: 0,
+                    task_status: task_status::Status::Running(RunningTask {
+                        executor_id: executor_id.to_owned()
+                    }),
+                };
+
+                // Set the task info to Running for new task
+                stage.task_infos[partition_id] = Some(task_info);
+
+                Ok(TaskDescription {
+                    session_id,
+                    partition,
+                    stage_attempt_num: stage.stage_attempt_num,
+                    task_id,
+                    task_attempt,
+                    plan: stage.plan.clone(),
+                    session_config: self.session_config.clone()
+                })
+            } else {
+                Err(BallistaError::General(format!("Stage {stage_id} is not a 
running stage")))
+            }
+        }).transpose()?;
+
+        // If no available tasks found in the running stage,
+        // try to find a resolved stage and convert it to the running stage
+        if next_task.is_none() {
+            if self.revive() {
+                next_task = self.pop_next_task(executor_id)?;
+            } else {
+                next_task = None;
+            }
+        }
+
+        Ok(next_task)
     }
 }
 
-impl Debug for ExecutionGraph {
+impl Debug for StaticExecutionGraph {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let stages = self
             .stages
@@ -1704,8 +1821,6 @@ mod test {
 
         let outputs = agg_graph.output_locations();
 
-        assert_eq!(outputs.len(), agg_graph.output_partitions);
-
         for location in outputs {
             assert_eq!(location.executor_meta.host, "localhost2".to_owned());
         }
@@ -1965,7 +2080,7 @@ mod test {
 
         assert!(
             matches!(
-                agg_graph.status,
+                agg_graph.status(),
                 JobStatus {
                     status: Some(job_status::Status::Failed(_)),
                     ..
@@ -2242,7 +2357,7 @@ mod test {
         drain_tasks(&mut agg_graph)?;
         assert!(!agg_graph.is_successful(), "Expect to fail the agg plan");
 
-        let failure_reason = format!("{:?}", agg_graph.status);
+        let failure_reason = format!("{:?}", agg_graph.status());
        assert!(failure_reason.contains("Job failed due to stage 2 failed: Stage 2 has failed 4 times, most recent failure reason"));
         assert!(failure_reason.contains("FetchPartitionError"));
 
@@ -2723,7 +2838,7 @@ mod test {
     //     todo!()
     // }
 
-    fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
+    fn drain_tasks(graph: &mut dyn ExecutionGraph) -> Result<()> {
         let executor = mock_executor("executor-id1".to_string());
         while let Some(task) = graph.pop_next_task(&executor.id)? {
             let task_status = mock_completed_task(task, &executor.id);
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs
index 26261107b..e08fd663f 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -45,19 +45,19 @@ use std::sync::Arc;
 
 /// Utility for producing dot diagrams from execution graphs
 pub struct ExecutionGraphDot<'a> {
-    graph: &'a ExecutionGraph,
+    graph: &'a dyn ExecutionGraph,
 }
 
 impl<'a> ExecutionGraphDot<'a> {
     /// Create a DOT graph from the provided ExecutionGraph
-    pub fn generate(graph: &'a ExecutionGraph) -> Result<String, fmt::Error> {
+    pub fn generate(graph: &'a dyn ExecutionGraph) -> Result<String, fmt::Error> {
         let mut dot = Self { graph };
         dot._generate()
     }
 
     /// Create a DOT graph for one query stage from the provided ExecutionGraph
     pub fn generate_for_query_stage(
-        graph: &ExecutionGraph,
+        graph: &dyn ExecutionGraph,
         stage_id: usize,
     ) -> Result<String, fmt::Error> {
         if let Some(stage) = graph.stages().get(&stage_id) {
@@ -410,7 +410,7 @@ fn get_file_scan(scan: &FileScanConfig) -> String {
 #[cfg(test)]
 mod tests {
     use crate::planner::DefaultDistributedPlanner;
-    use crate::state::execution_graph::ExecutionGraph;
+    use crate::state::execution_graph::StaticExecutionGraph;
     use crate::state::execution_graph_dot::ExecutionGraphDot;
     use ballista_core::error::{BallistaError, Result};
     use ballista_core::extension::SessionConfigExt;
@@ -580,7 +580,7 @@ filter_expr="]
         Ok(())
     }
 
-    async fn test_graph() -> Result<ExecutionGraph> {
+    async fn test_graph() -> Result<StaticExecutionGraph> {
         let mut config = SessionConfig::new()
             .with_target_partitions(48)
             .with_batch_size(4096);
@@ -603,7 +603,7 @@ filter_expr="]
         let plan = df.into_optimized_plan()?;
         let plan = ctx.state().create_physical_plan(&plan).await?;
         let mut planner = DefaultDistributedPlanner::new();
-        ExecutionGraph::new(
+        StaticExecutionGraph::new(
             "scheduler_id",
             "job_id",
             "job_name",
@@ -617,7 +617,7 @@ filter_expr="]
 
     // With the improvement of https://github.com/apache/arrow-datafusion/pull/4122,
     // Redundant RepartitionExec can be removed so that the stage number will be reduced
-    async fn test_graph_optimized() -> Result<ExecutionGraph> {
+    async fn test_graph_optimized() -> Result<StaticExecutionGraph> {
         let mut config = SessionConfig::new()
             .with_target_partitions(48)
             .with_batch_size(4096);
@@ -639,7 +639,7 @@ filter_expr="]
         let plan = df.into_optimized_plan()?;
         let plan = ctx.state().create_physical_plan(&plan).await?;
         let mut planner = DefaultDistributedPlanner::new();
-        ExecutionGraph::new(
+        StaticExecutionGraph::new(
             "scheduler_id",
             "job_id",
             "job_name",
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index b0281dc36..60e6e3dd4 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -19,7 +19,7 @@ use crate::planner::DefaultDistributedPlanner;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
 use crate::state::execution_graph::{
-    ExecutionGraph, ExecutionStage, RunningTaskInfo, TaskDescription,
+    ExecutionGraphBox, RunningTaskInfo, StaticExecutionGraph, TaskDescription,
 };
 use crate::state::executor_manager::ExecutorManager;
 
@@ -143,7 +143,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 #[derive(Clone)]
 pub struct JobInfoCache {
     /// The execution graph for this job, protected by a read-write lock.
-    pub execution_graph: Arc<RwLock<ExecutionGraph>>,
+    pub execution_graph: Arc<RwLock<ExecutionGraphBox>>,
     /// Cached job status for quick access.
     pub status: Option<job_status::Status>,
     #[cfg(not(feature = "disable-stage-plan-cache"))]
@@ -153,7 +153,7 @@ pub struct JobInfoCache {
 
 impl JobInfoCache {
     /// Creates a new `JobInfoCache` from an execution graph.
-    pub fn new(graph: ExecutionGraph) -> Self {
+    pub fn new(graph: ExecutionGraphBox) -> Self {
         let status = graph.status().status.clone();
         Self {
             execution_graph: Arc::new(RwLock::new(graph)),
@@ -275,7 +275,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         session_config: Arc<SessionConfig>,
     ) -> Result<()> {
         let mut planner = DefaultDistributedPlanner::new();
-        let mut graph = ExecutionGraph::new(
+        let mut graph = Box::new(StaticExecutionGraph::new(
             &self.scheduler_id,
             job_id,
             job_name,
@@ -284,7 +284,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             queued_at,
             session_config,
             &mut planner,
-        )?;
+        )?) as ExecutionGraphBox;
         info!("Submitting execution graph: {graph:?}");
 
         self.state.submit_job(job_id.to_string(), &graph).await?;
@@ -350,15 +350,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     pub(crate) async fn get_job_execution_graph(
         &self,
         job_id: &str,
-    ) -> Result<Option<Arc<ExecutionGraph>>> {
+    ) -> Result<Option<ExecutionGraphBox>> {
         if let Some(cached) = self.get_active_execution_graph(job_id) {
             let guard = cached.read().await;
 
-            Ok(Some(Arc::new(guard.deref().clone())))
+            Ok(Some(guard.deref().cloned()))
         } else {
             let graph = self.state.get_execution_graph(job_id).await?;
 
-            Ok(graph.map(Arc::new))
+            Ok(graph)
         }
     }
 
@@ -416,7 +416,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         debug!("Moving job {job_id} from Active to Success");
 
         if let Some(graph) = self.remove_active_execution_graph(job_id) {
-            let graph = graph.read().await.clone();
+            let graph = graph.read().await;
             if graph.is_successful() {
                 self.state.save_job(job_id, &graph).await?;
             } else {
@@ -516,15 +516,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     pub async fn executor_lost(&self, executor_id: &str) -> 
Result<Vec<RunningTaskInfo>> {
         // Collect all the running task need to cancel when there are running 
stages rolled back.
         let mut running_tasks_to_cancel: Vec<RunningTaskInfo> = vec![];
-        // Collect graphs we update so we can update them in storage
-        let updated_graphs: DashMap<String, ExecutionGraph> = DashMap::new();
+
         {
             for pairs in self.active_job_cache.iter() {
-                let (job_id, job_info) = pairs.pair();
+                let (_job_id, job_info) = pairs.pair();
                 let mut graph = job_info.execution_graph.write().await;
                 let reset = graph.reset_stages_on_lost_executor(executor_id)?;
                 if !reset.0.is_empty() {
-                    updated_graphs.insert(job_id.to_owned(), graph.clone());
                     running_tasks_to_cancel.extend(reset.1);
                 }
             }
@@ -684,7 +682,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     pub(crate) fn get_active_execution_graph(
         &self,
         job_id: &str,
-    ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+    ) -> Option<Arc<RwLock<ExecutionGraphBox>>> {
         self.active_job_cache
             .get(job_id)
             .as_deref()
@@ -695,7 +693,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     pub(crate) fn remove_active_execution_graph(
         &self,
         job_id: &str,
-    ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+    ) -> Option<Arc<RwLock<ExecutionGraphBox>>> {
         self.active_job_cache
             .remove(job_id)
             .map(|value| value.1.execution_graph)
@@ -748,14 +746,9 @@ pub struct JobOverview {
     pub completed_stages: usize,
 }
 
-impl From<&ExecutionGraph> for JobOverview {
-    fn from(value: &ExecutionGraph) -> Self {
-        let mut completed_stages = 0;
-        for stage in value.stages().values() {
-            if let ExecutionStage::Successful(_) = stage {
-                completed_stages += 1;
-            }
-        }
+impl From<&ExecutionGraphBox> for JobOverview {
+    fn from(value: &ExecutionGraphBox) -> Self {
+        let completed_stages = value.completed_stages();
 
         Self {
             job_id: value.job_id().to_string(),
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index b4a940020..7ac6f83a9 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -57,7 +57,9 @@ use datafusion::test_util::scan_empty_with_partitions;
 use crate::cluster::BallistaCluster;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
-use crate::state::execution_graph::{ExecutionGraph, ExecutionStage, TaskDescription};
+use crate::state::execution_graph::{
+    ExecutionGraph, ExecutionStage, StaticExecutionGraph, TaskDescription,
+};
 use ballista_core::utils::{default_config_producer, default_session_builder};
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use parking_lot::Mutex;
@@ -812,14 +814,16 @@ pub fn assert_failed_event(job_id: &str, collector: 
&TestMetricsCollector) {
 }
 
 /// Revives the execution graph and completes all tasks in the next stage.
-pub fn revive_graph_and_complete_next_stage(graph: &mut ExecutionGraph) -> 
Result<usize> {
+pub fn revive_graph_and_complete_next_stage(
+    graph: &mut dyn ExecutionGraph,
+) -> Result<usize> {
     let executor = mock_executor("executor-id1".to_string());
     revive_graph_and_complete_next_stage_with_executor(graph, &executor)
 }
 
 /// Revives the execution graph and completes all tasks in the next stage 
using the given executor.
 pub fn revive_graph_and_complete_next_stage_with_executor(
-    graph: &mut ExecutionGraph,
+    graph: &mut dyn ExecutionGraph,
     executor: &ExecutorMetadata,
 ) -> Result<usize> {
     graph.revive();
@@ -855,7 +859,7 @@ pub fn revive_graph_and_complete_next_stage_with_executor(
 }
 
 /// Creates a test execution graph with a simple aggregation plan.
-pub async fn test_aggregation_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_aggregation_plan(partition: usize) -> StaticExecutionGraph {
     test_aggregation_plan_with_job_id(partition, "job").await
 }
 
@@ -863,7 +867,7 @@ pub async fn test_aggregation_plan(partition: usize) -> 
ExecutionGraph {
 pub async fn test_aggregation_plan_with_job_id(
     partition: usize,
     job_id: &str,
-) -> ExecutionGraph {
+) -> StaticExecutionGraph {
     let config = SessionConfig::new().with_target_partitions(partition);
     let ctx = Arc::new(SessionContext::new_with_config(config));
     let session_state = ctx.state();
@@ -893,7 +897,8 @@ pub async fn test_aggregation_plan_with_job_id(
         DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
     );
     let mut planner = DefaultDistributedPlanner::new();
-    ExecutionGraph::new(
+
+    StaticExecutionGraph::new(
         "localhost:50050",
         job_id,
         "",
@@ -907,7 +912,7 @@ pub async fn test_aggregation_plan_with_job_id(
 }
 
 /// Creates a test execution graph with two nested aggregations.
-pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_two_aggregations_plan(partition: usize) -> 
StaticExecutionGraph {
     let config = SessionConfig::new().with_target_partitions(partition);
     let ctx = Arc::new(SessionContext::new_with_config(config));
     let session_state = ctx.state();
@@ -940,7 +945,8 @@ pub async fn test_two_aggregations_plan(partition: usize) 
-> ExecutionGraph {
         DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
     );
     let mut planner = DefaultDistributedPlanner::new();
-    ExecutionGraph::new(
+
+    StaticExecutionGraph::new(
         "localhost:50050",
         "job",
         "",
@@ -954,7 +960,7 @@ pub async fn test_two_aggregations_plan(partition: usize) 
-> ExecutionGraph {
 }
 
 /// Creates a test execution graph with a coalesce (limit) operation.
-pub async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_coalesce_plan(partition: usize) -> StaticExecutionGraph {
     let config = SessionConfig::new().with_target_partitions(partition);
     let ctx = Arc::new(SessionContext::new_with_config(config));
     let session_state = ctx.state();
@@ -979,7 +985,8 @@ pub async fn test_coalesce_plan(partition: usize) -> 
ExecutionGraph {
         .await
         .unwrap();
     let mut planner = DefaultDistributedPlanner::new();
-    ExecutionGraph::new(
+
+    StaticExecutionGraph::new(
         "localhost:50050",
         "job",
         "",
@@ -993,7 +1000,7 @@ pub async fn test_coalesce_plan(partition: usize) -> 
ExecutionGraph {
 }
 
 /// Creates a test execution graph with a join operation.
-pub async fn test_join_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_join_plan(partition: usize) -> StaticExecutionGraph {
     let mut config = SessionConfig::new().with_target_partitions(partition);
     config
         .options_mut()
@@ -1039,7 +1046,7 @@ pub async fn test_join_plan(partition: usize) -> 
ExecutionGraph {
         DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
     );
     let mut planner = DefaultDistributedPlanner::new();
-    let graph = ExecutionGraph::new(
+    let graph = StaticExecutionGraph::new(
         "localhost:50050",
         "job",
         "",
@@ -1057,7 +1064,7 @@ pub async fn test_join_plan(partition: usize) -> 
ExecutionGraph {
 }
 
 /// Creates a test execution graph with a UNION ALL operation.
-pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_union_all_plan(partition: usize) -> StaticExecutionGraph {
     let config = SessionConfig::new().with_target_partitions(partition);
     let ctx = Arc::new(SessionContext::new_with_config(config));
     let session_state = ctx.state();
@@ -1081,7 +1088,7 @@ pub async fn test_union_all_plan(partition: usize) -> 
ExecutionGraph {
         DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
     );
     let mut planner = DefaultDistributedPlanner::new();
-    let graph = ExecutionGraph::new(
+    let graph = StaticExecutionGraph::new(
         "localhost:50050",
         "job",
         "",
@@ -1099,7 +1106,7 @@ pub async fn test_union_all_plan(partition: usize) -> 
ExecutionGraph {
 }
 
 /// Creates a test execution graph with a UNION (distinct) operation.
-pub async fn test_union_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_union_plan(partition: usize) -> StaticExecutionGraph {
     let config = SessionConfig::new().with_target_partitions(partition);
     let ctx = Arc::new(SessionContext::new_with_config(config));
     let session_state = ctx.state();
@@ -1123,7 +1130,7 @@ pub async fn test_union_plan(partition: usize) -> 
ExecutionGraph {
         DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
     );
     let mut planner = DefaultDistributedPlanner::new();
-    let graph = ExecutionGraph::new(
+    let graph = StaticExecutionGraph::new(
         "localhost:50050",
         "job",
         "",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to