This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new b0c792d2 Introduce CuratorTaskManager for make an active job be 
curated by only one scheduler (#153)
b0c792d2 is described below

commit b0c792d2ba8a1f76db21c974e02ffd08784829a1
Author: yahoNanJing <[email protected]>
AuthorDate: Sun Aug 28 01:00:01 2022 +0800

    Introduce CuratorTaskManager for make an active job be curated by only one 
scheduler (#153)
    
    * Add scheduler name for scheduler identification
    
    * Introduce scheduler id for execution graph as its curator
    
    * Introduce state machine for the execution stage
    
    * Introduce stage change event to execution graph
    
    * Introduce cache for active execution graphs in the curator scheduler
    
    * Fix PR review
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/proto/ballista.proto            |   72 +-
 ballista/rust/scheduler/scheduler_config_spec.toml |    6 +
 ballista/rust/scheduler/src/display.rs             |   32 +
 ballista/rust/scheduler/src/main.rs                |   11 +-
 .../rust/scheduler/src/scheduler_server/event.rs   |    6 +-
 .../scheduler/src/scheduler_server/event_loop.rs   |    9 +-
 .../rust/scheduler/src/scheduler_server/grpc.rs    |   17 +-
 .../rust/scheduler/src/scheduler_server/mod.rs     |  121 +-
 .../src/scheduler_server/query_stage_scheduler.rs  |   22 +-
 ballista/rust/scheduler/src/standalone.rs          |    2 +-
 .../rust/scheduler/src/state/execution_graph.rs    | 1198 +++++++++-----------
 .../src/state/execution_graph/execution_stage.rs   |  928 +++++++++++++++
 ballista/rust/scheduler/src/state/mod.rs           |   21 +-
 ballista/rust/scheduler/src/state/task_manager.rs  |  391 +++----
 14 files changed, 1832 insertions(+), 1004 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index f0c0a3aa..f3b703c6 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -408,37 +408,71 @@ enum JoinSide{
 
///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Scheduling
 
///////////////////////////////////////////////////////////////////////////////////////////////////
-message TaskInputPartitions {
-  uint32 partition = 1;
-  repeated PartitionLocation partition_location = 2;
+message ExecutionGraph {
+  string job_id = 1;
+  string session_id = 2;
+  JobStatus status = 3;
+  repeated ExecutionGraphStage stages = 4;
+  uint64 output_partitions = 5;
+  repeated PartitionLocation output_locations = 6;
+  string scheduler_id = 7;
 }
 
-message GraphStageInput {
-  uint32 stage_id = 1;
-  repeated TaskInputPartitions partition_locations = 2;
-  bool complete = 3;
+message ExecutionGraphStage {
+  oneof StageType {
+      UnResolvedStage unresolved_stage = 1;
+      ResolvedStage resolved_stage = 2;
+      CompletedStage completed_stage = 3;
+      FailedStage failed_stage = 4;
+  }
 }
 
+message UnResolvedStage {
+  uint64 stage_id = 1;
+  PhysicalHashRepartition output_partitioning = 2;
+  repeated uint32 output_links = 3;
+  repeated  GraphStageInput inputs = 4;
+  bytes plan = 5;
+}
 
-message ExecutionGraphStage {
+message ResolvedStage {
   uint64 stage_id = 1;
   uint32 partitions = 2;
   PhysicalHashRepartition output_partitioning = 3;
-  repeated  GraphStageInput inputs = 4;
+  repeated uint32 output_links = 4;
+  bytes plan = 5;
+}
+
+message CompletedStage {
+  uint64 stage_id = 1;
+  uint32 partitions = 2;
+  PhysicalHashRepartition output_partitioning = 3;
+  repeated uint32 output_links = 4;
   bytes plan = 5;
   repeated TaskStatus task_statuses = 6;
-  repeated uint32 output_links = 7;
-  bool resolved = 8;
-  repeated OperatorMetricsSet stage_metrics = 9;
+  repeated OperatorMetricsSet stage_metrics = 7;
 }
 
-message ExecutionGraph {
-  string job_id = 1;
-  string session_id = 2;
-  JobStatus status = 3;
-  repeated ExecutionGraphStage stages = 4;
-  uint64 output_partitions = 5;
-  repeated PartitionLocation output_locations = 6;
+message FailedStage {
+  uint64 stage_id = 1;
+  uint32 partitions = 2;
+  PhysicalHashRepartition output_partitioning = 3;
+  repeated uint32 output_links = 4;
+  bytes plan = 5;
+  repeated TaskStatus task_statuses = 6;
+  repeated OperatorMetricsSet stage_metrics = 7;
+  string error_message = 8;
+}
+
+message GraphStageInput {
+  uint32 stage_id = 1;
+  repeated TaskInputPartitions partition_locations = 2;
+  bool complete = 3;
+}
+
+message TaskInputPartitions {
+  uint32 partition = 1;
+  repeated PartitionLocation partition_location = 2;
 }
 
 message KeyValuePair {
diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml 
b/ballista/rust/scheduler/scheduler_config_spec.toml
index c3be0f9d..1f89562d 100644
--- a/ballista/rust/scheduler/scheduler_config_spec.toml
+++ b/ballista/rust/scheduler/scheduler_config_spec.toml
@@ -52,6 +52,12 @@ type = "String"
 default = "std::string::String::from(\"0.0.0.0\")"
 doc = "Local host name or IP address to bind to. Default: 0.0.0.0"
 
+[[param]]
+name = "external_host"
+type = "String"
+doc = "Host name or IP address so that executors can connect to this 
scheduler. Default: localhost"
+default = "std::string::String::from(\"localhost\")"
+
 [[param]]
 abbr = "p"
 name = "bind_port"
diff --git a/ballista/rust/scheduler/src/display.rs 
b/ballista/rust/scheduler/src/display.rs
index e4557a6b..23753889 100644
--- a/ballista/rust/scheduler/src/display.rs
+++ b/ballista/rust/scheduler/src/display.rs
@@ -19,13 +19,45 @@
 //! [`crate::physical_plan::displayable`] for examples of how to
 //! format
 
+use ballista_core::utils::collect_plan_metrics;
 use datafusion::logical_plan::{StringifiedPlan, ToStringifiedPlan};
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
     accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
 };
+use log::{error, info};
 use std::fmt;
 
+pub fn print_stage_metrics(
+    job_id: &str,
+    stage_id: usize,
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+) {
+    // The plan_metrics collected here is a snapshot clone from the plan 
metrics.
+    // They are all empty now and need to combine with the stage metrics in 
the ExecutionStages
+    let mut plan_metrics = collect_plan_metrics(plan);
+    if plan_metrics.len() == stage_metrics.len() {
+        plan_metrics.iter_mut().zip(stage_metrics).for_each(
+            |(plan_metric, stage_metric)| {
+                stage_metric
+                    .iter()
+                    .for_each(|s| plan_metric.push(s.clone()));
+            },
+        );
+
+        info!(
+            "=== [{}/{}] Stage finished, physical plan with metrics ===\n{}\n",
+            job_id,
+            stage_id,
+            DisplayableBallistaExecutionPlan::new(plan, &plan_metrics).indent()
+        );
+    } else {
+        error!("Fail to combine stage metrics to plan for stage [{}/{}],  plan 
metrics array size {} does not equal
+                to the stage metrics array size {}", job_id, stage_id, 
plan_metrics.len(), stage_metrics.len());
+    }
+}
+
 /// Wraps an `ExecutionPlan` to display this plan with metrics 
collected/aggregated.
 /// The metrics must be collected in the same order as how we visit and 
display the plan.
 pub struct DisplayableBallistaExecutionPlan<'a> {
diff --git a/ballista/rust/scheduler/src/main.rs 
b/ballista/rust/scheduler/src/main.rs
index 5a7a9db1..d393e4eb 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -65,8 +65,8 @@ use config::prelude::*;
 use datafusion::execution::context::default_session_builder;
 
 async fn start_server(
+    scheduler_name: String,
     config_backend: Arc<dyn StateBackendClient>,
-    namespace: String,
     addr: SocketAddr,
     policy: TaskSchedulingPolicy,
 ) -> Result<()> {
@@ -82,15 +82,15 @@ async fn start_server(
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
         match policy {
             TaskSchedulingPolicy::PushStaged => 
SchedulerServer::new_with_policy(
+                scheduler_name,
                 config_backend.clone(),
-                namespace.clone(),
                 policy,
                 BallistaCodec::default(),
                 default_session_builder,
             ),
             _ => SchedulerServer::new(
+                scheduler_name,
                 config_backend.clone(),
-                namespace.clone(),
                 BallistaCodec::default(),
             ),
         };
@@ -158,12 +158,13 @@ async fn main() -> Result<()> {
 
     let special_mod_log_level = opt.log_level_setting;
     let namespace = opt.namespace;
+    let external_host = opt.external_host;
     let bind_host = opt.bind_host;
     let port = opt.bind_port;
     let log_dir = opt.log_dir;
     let print_thread_info = opt.print_thread_info;
 
-    let scheduler_name = format!("scheduler_{}_{}_{}", namespace, bind_host, 
port);
+    let scheduler_name = format!("scheduler_{}_{}_{}", namespace, 
external_host, port);
     let log_file = tracing_appender::rolling::daily(log_dir, &scheduler_name);
 
     tracing_subscriber::fmt()
@@ -219,6 +220,6 @@ async fn main() -> Result<()> {
     };
 
     let policy: TaskSchedulingPolicy = opt.scheduler_policy;
-    start_server(client, namespace, addr, policy).await?;
+    start_server(scheduler_name, client, addr, policy).await?;
     Ok(())
 }
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs 
b/ballista/rust/scheduler/src/scheduler_server/event.rs
index bcfa3e3a..0e43bfe2 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -36,6 +36,10 @@ pub enum QueryStageSchedulerEvent {
         plan: Box<LogicalPlan>,
     },
     JobSubmitted(String),
+    // For a job which failed during planning
+    JobPlanningFailed(String, String),
     JobFinished(String),
-    JobFailed(String, String),
+    // For a job fails with its execution graph setting failed
+    JobRunningFailed(String),
+    JobUpdated(String),
 }
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs 
b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index b8e8d9fc..d60d145e 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -189,9 +189,8 @@ mod test {
     async fn test_offer_free_reservations() -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new(
+            Arc::new(SchedulerState::new_with_default_scheduler_name(
                 state_storage,
-                "default".to_string(),
                 default_session_builder,
                 BallistaCodec::default(),
             ));
@@ -227,9 +226,8 @@ mod test {
             .build()?;
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new(
+            Arc::new(SchedulerState::new_with_default_scheduler_name(
                 state_storage,
-                "default".to_string(),
                 default_session_builder,
                 BallistaCodec::default(),
             ));
@@ -287,9 +285,8 @@ mod test {
             .build()?;
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new(
+            Arc::new(SchedulerState::new_with_default_scheduler_name(
                 state_storage,
-                "default".to_string(),
                 default_session_builder,
                 BallistaCodec::default(),
             ));
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs 
b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index af788dc4..80a9defc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -391,7 +391,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                     .and_then(|m| {
                         m.try_into_logical_plan(
                             session_ctx.deref(),
-                            self.codec.logical_extension_codec(),
+                            self.state.codec.logical_extension_codec(),
                         )
                     })
                     .map_err(|e| {
@@ -535,16 +535,15 @@ mod test {
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let namespace = "default";
         let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
+                "localhost:50050".to_owned(),
                 state_storage.clone(),
-                namespace.to_owned(),
                 BallistaCodec::default(),
             );
         let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
-            optional_host: 
Some(OptionalHost::Host("http://host:8080".to_owned())),
+            optional_host: 
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
             port: 0,
             grpc_port: 0,
             specification: Some(ExecutorSpecification { task_slots: 2 
}.into()),
@@ -562,9 +561,8 @@ mod test {
         // no response task since we told the scheduler we didn't want to 
accept one
         assert!(response.task.is_none());
         let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
-            SchedulerState::new(
+            SchedulerState::new_with_default_scheduler_name(
                 state_storage.clone(),
-                namespace.to_string(),
                 default_session_builder,
                 BallistaCodec::default(),
             );
@@ -580,7 +578,7 @@ mod test {
         assert_eq!(stored_executor.grpc_port, 0);
         assert_eq!(stored_executor.port, 0);
         assert_eq!(stored_executor.specification.task_slots, 2);
-        assert_eq!(stored_executor.host, "http://host:8080".to_owned());
+        assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
 
         let request: Request<PollWorkParams> = Request::new(PollWorkParams {
             metadata: Some(exec_meta.clone()),
@@ -596,9 +594,8 @@ mod test {
         // still no response task since there are no tasks in the scheduler
         assert!(response.task.is_none());
         let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
-            SchedulerState::new(
+            SchedulerState::new_with_default_scheduler_name(
                 state_storage.clone(),
-                namespace.to_string(),
                 default_session_builder,
                 BallistaCodec::default(),
             );
@@ -614,7 +611,7 @@ mod test {
         assert_eq!(stored_executor.grpc_port, 0);
         assert_eq!(stored_executor.port, 0);
         assert_eq!(stored_executor.specification.task_slots, 2);
-        assert_eq!(stored_executor.host, "http://host:8080".to_owned());
+        assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
 
         Ok(())
     }
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs 
b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index a672c597..df376c4c 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -53,23 +53,23 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> 
SessionState;
 
 #[derive(Clone)]
 pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> {
+    pub scheduler_name: String,
     pub(crate) state: Arc<SchedulerState<T, U>>,
     pub start_time: u128,
     policy: TaskSchedulingPolicy,
     event_loop: Option<EventLoop<SchedulerServerEvent>>,
     pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
-    codec: BallistaCodec<T, U>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
SchedulerServer<T, U> {
     pub fn new(
+        scheduler_name: String,
         config: Arc<dyn StateBackendClient>,
-        namespace: String,
         codec: BallistaCodec<T, U>,
     ) -> Self {
         SchedulerServer::new_with_policy(
+            scheduler_name,
             config,
-            namespace,
             TaskSchedulingPolicy::PullStaged,
             codec,
             default_session_builder,
@@ -77,14 +77,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     }
 
     pub fn new_with_builder(
+        scheduler_name: String,
         config: Arc<dyn StateBackendClient>,
-        namespace: String,
         codec: BallistaCodec<T, U>,
         session_builder: SessionBuilder,
     ) -> Self {
         SchedulerServer::new_with_policy(
+            scheduler_name,
             config,
-            namespace,
             TaskSchedulingPolicy::PullStaged,
             codec,
             session_builder,
@@ -92,32 +92,47 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     }
 
     pub fn new_with_policy(
+        scheduler_name: String,
         config: Arc<dyn StateBackendClient>,
-        namespace: String,
         policy: TaskSchedulingPolicy,
         codec: BallistaCodec<T, U>,
         session_builder: SessionBuilder,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(
             config,
-            namespace,
             session_builder,
-            codec.clone(),
+            codec,
+            scheduler_name.clone(),
         ));
 
-        let event_loop = if matches!(policy, TaskSchedulingPolicy::PushStaged) 
{
-            let event_action: Arc<SchedulerServerEventAction<T, U>> =
-                Arc::new(SchedulerServerEventAction::new(state.clone()));
-            let event_loop = EventLoop::new("scheduler".to_owned(), 10000, 
event_action);
-            Some(event_loop)
+        let event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>> =
+            if matches!(policy, TaskSchedulingPolicy::PushStaged) {
+                Some(Arc::new(SchedulerServerEventAction::new(state.clone())))
+            } else {
+                None
+            };
+        SchedulerServer::new_with_event_action(scheduler_name, state, 
event_action)
+    }
+
+    fn new_with_event_action(
+        scheduler_name: String,
+        state: Arc<SchedulerState<T, U>>,
+        event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>>,
+    ) -> Self {
+        let event_loop = event_action.map(|event_action| {
+            EventLoop::new("scheduler".to_owned(), 10000, event_action)
+        });
+        let policy = if event_loop.is_some() {
+            TaskSchedulingPolicy::PushStaged
         } else {
-            None
+            TaskSchedulingPolicy::PullStaged
         };
         let query_stage_scheduler =
             Arc::new(QueryStageScheduler::new(state.clone(), None));
         let query_stage_event_loop =
             EventLoop::new("query_stage".to_owned(), 10000, 
query_stage_scheduler);
         Self {
+            scheduler_name,
             state,
             start_time: SystemTime::now()
                 .duration_since(UNIX_EPOCH)
@@ -126,40 +141,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             policy,
             event_loop,
             query_stage_event_loop,
-            codec,
-            // session_builder,
-        }
-    }
-
-    pub fn new_with_event_action(
-        config: Arc<dyn StateBackendClient>,
-        namespace: String,
-        codec: BallistaCodec<T, U>,
-        session_builder: SessionBuilder,
-        event_action: Arc<dyn EventAction<SchedulerServerEvent>>,
-    ) -> Self {
-        let state = Arc::new(SchedulerState::new(
-            config,
-            namespace,
-            session_builder,
-            codec.clone(),
-        ));
-
-        let event_loop = EventLoop::new("scheduler".to_owned(), 10000, 
event_action);
-        let query_stage_scheduler =
-            Arc::new(QueryStageScheduler::new(state.clone(), None));
-        let query_stage_event_loop =
-            EventLoop::new("query_stage".to_owned(), 10000, 
query_stage_scheduler);
-        Self {
-            state,
-            start_time: SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_millis(),
-            policy: TaskSchedulingPolicy::PushStaged,
-            event_loop: Some(event_loop),
-            query_stage_event_loop,
-            codec,
         }
     }
 
@@ -287,6 +268,7 @@ mod test {
     use crate::state::backend::standalone::StandaloneClient;
 
     use crate::state::executor_manager::ExecutorReservation;
+    use crate::state::SchedulerState;
     use crate::test_utils::{
         await_condition, ExplodingTableProvider, SchedulerEventObserver,
     };
@@ -345,15 +327,18 @@ mod test {
             .await
             .expect("submitting plan");
 
-        loop {
-            // Refresh the ExecutionGraph
-            let mut graph = scheduler
-                .state
-                .task_manager
-                .get_execution_graph(job_id)
-                .await?;
-
-            if let Some(task) = graph.pop_next_task("executor-1")? {
+        // Refresh the ExecutionGraph
+        while let Some(graph) = scheduler
+            .state
+            .task_manager
+            .get_active_execution_graph(job_id)
+            .await
+        {
+            let task = {
+                let mut graph = graph.write().await;
+                graph.pop_next_task("executor-1")?
+            };
+            if let Some(task) = task {
                 let mut partitions: Vec<ShuffleWritePartition> = vec![];
 
                 let num_partitions = task
@@ -396,9 +381,11 @@ mod test {
         let final_graph = scheduler
             .state
             .task_manager
-            .get_execution_graph(job_id)
-            .await?;
+            .get_active_execution_graph(job_id)
+            .await
+            .expect("Fail to find graph in the cache");
 
+        let final_graph = final_graph.read().await;
         assert!(final_graph.complete());
         assert_eq!(final_graph.output_locations().len(), 4);
 
@@ -756,8 +743,8 @@ mod test {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new_with_policy(
+                "localhost:50050".to_owned(),
                 state_storage.clone(),
-                "default".to_owned(),
                 policy,
                 BallistaCodec::default(),
                 default_session_builder,
@@ -771,14 +758,16 @@ mod test {
         event_action: Arc<dyn EventAction<SchedulerServerEvent>>,
     ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-
+        let state = Arc::new(SchedulerState::new_with_default_scheduler_name(
+            state_storage,
+            default_session_builder,
+            BallistaCodec::default(),
+        ));
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new_with_event_action(
-                state_storage.clone(),
-                "default".to_owned(),
-                BallistaCodec::default(),
-                default_session_builder,
-                event_action,
+                "localhost:50050".to_owned(),
+                state,
+                Some(event_action),
             );
         scheduler.init().await?;
 
diff --git 
a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 8ae059f8..1c403ea6 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -88,8 +88,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                             .await
                     {
                         let msg = format!("Error planning job {}: {:?}", 
job_id, e);
-                        error!("{}", msg);
-                        QueryStageSchedulerEvent::JobFailed(job_id, msg)
+                        error!("{}", &msg);
+                        QueryStageSchedulerEvent::JobPlanningFailed(job_id, 
msg)
                     } else {
                         QueryStageSchedulerEvent::JobSubmitted(job_id)
                     };
@@ -138,17 +138,25 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                     }
                 }
             }
-            QueryStageSchedulerEvent::JobFinished(job_id) => {
-                info!("Job {} complete", job_id);
-                self.state.task_manager.complete_job(&job_id).await?;
-            }
-            QueryStageSchedulerEvent::JobFailed(job_id, fail_message) => {
+            QueryStageSchedulerEvent::JobPlanningFailed(job_id, fail_message) 
=> {
                 error!("Job {} failed: {}", job_id, fail_message);
                 self.state
                     .task_manager
                     .fail_job(&job_id, fail_message)
                     .await?;
             }
+            QueryStageSchedulerEvent::JobFinished(job_id) => {
+                info!("Job {} complete", job_id);
+                self.state.task_manager.complete_job(&job_id).await?;
+            }
+            QueryStageSchedulerEvent::JobRunningFailed(job_id) => {
+                error!("Job {} running failed", job_id);
+                self.state.task_manager.fail_running_job(&job_id).await?;
+            }
+            QueryStageSchedulerEvent::JobUpdated(job_id) => {
+                error!("Job {} Updated", job_id);
+                self.state.task_manager.update_job(&job_id).await?;
+            }
         }
 
         Ok(())
diff --git a/ballista/rust/scheduler/src/standalone.rs 
b/ballista/rust/scheduler/src/standalone.rs
index 0de81b85..4abc70a4 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -35,8 +35,8 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> 
{
 
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
         SchedulerServer::new(
+            "localhost:50050".to_owned(),
             Arc::new(client),
-            "ballista".to_string(),
             BallistaCodec::default(),
         );
     scheduler_server.init().await?;
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs 
b/ballista/rust/scheduler/src/state/execution_graph.rs
index b831ffe9..596272d6 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -15,61 +15,40 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::display::DisplayableBallistaExecutionPlan;
-use crate::planner::DistributedPlanner;
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{
+    accept, ExecutionPlan, ExecutionPlanVisitor, Partitioning,
+};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{error, info, warn};
+
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
-
 use ballista_core::serde::protobuf::{
-    self, CompletedJob, JobStatus, OperatorMetricsSet, QueuedJob, TaskStatus,
+    self, execution_graph_stage::StageType, CompletedJob, JobStatus, QueuedJob,
+    TaskStatus,
 };
 use ballista_core::serde::protobuf::{job_status, FailedJob, 
ShuffleWritePartition};
 use ballista_core::serde::protobuf::{task_status, RunningTask};
 use ballista_core::serde::scheduler::{
     ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
 };
-use datafusion::physical_plan::{
-    accept, ExecutionPlan, ExecutionPlanVisitor, Metric, Partitioning,
-};
-use log::{debug, info};
-use std::collections::HashMap;
-use std::convert::TryInto;
-use std::fmt::{Debug, Formatter};
-
-use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
-use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use ballista_core::utils::collect_plan_metrics;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use std::sync::Arc;
 
-/// Represents the basic unit of work for the Ballista executor. Will execute
-/// one partition of one stage on one task slot.
-#[derive(Clone)]
-pub struct Task {
-    pub session_id: String,
-    pub partition: PartitionId,
-    pub plan: Arc<dyn ExecutionPlan>,
-    pub output_partitioning: Option<Partitioning>,
-}
+use crate::display::print_stage_metrics;
+use crate::planner::DistributedPlanner;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::state::execution_graph::execution_stage::{
+    CompletedStage, ExecutionStage, FailedStage, ResolvedStage, 
UnresolvedStage,
+};
 
-impl Debug for Task {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
-        write!(
-            f,
-            "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
-            self.session_id,
-            self.partition.job_id,
-            self.partition.stage_id,
-            self.partition.partition_id,
-            plan
-        )
-    }
-}
+mod execution_stage;
 
 /// Represents the DAG for a distributed query plan.
 ///
@@ -96,17 +75,13 @@ impl Debug for Task {
 ///
 ///
 /// ExecutionGraph[job_id=job, session_id=session, available_tasks=1, 
complete=false]
-/// Stage[id=2, partitions=4, children=1, completed_tasks=0, resolved=false, 
scheduled_tasks=0, available_tasks=0]
+/// =========UnResolvedStage[id=2, children=1]=========
 /// Inputs{1: StageOutput { partition_locations: {}, complete: false }}
-///
 /// ShuffleWriterExec: None
 ///   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], 
aggr=[SUM(?table?.gmv)]
 ///     CoalesceBatchesExec: target_batch_size=4096
 ///       UnresolvedShuffleExec
-///
-/// Stage[id=1, partitions=1, children=0, completed_tasks=0, resolved=true, 
scheduled_tasks=0, available_tasks=1]
-/// Inputs{}
-///
+/// =========ResolvedStage[id=1, partitions=1]=========
 /// ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
 ///   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
 ///     TableScan: some_table
@@ -120,6 +95,8 @@ impl Debug for Task {
 /// publish its outputs to the `ExecutionGraph`s `output_locations` 
representing the final query results.
 #[derive(Clone)]
 pub struct ExecutionGraph {
+    /// Curator scheduler name
+    scheduler_id: String,
     /// ID for this job
     job_id: String,
     /// Session ID for this job
@@ -136,6 +113,7 @@ pub struct ExecutionGraph {
 
 impl ExecutionGraph {
     pub fn new(
+        scheduler_id: &str,
         job_id: &str,
         session_id: &str,
         plan: Arc<dyn ExecutionPlan>,
@@ -150,6 +128,7 @@ impl ExecutionGraph {
         let stages = builder.build(shuffle_stages)?;
 
         Ok(Self {
+            scheduler_id: scheduler_id.to_string(),
             job_id: job_id.to_string(),
             session_id: session_id.to_string(),
             status: JobStatus {
@@ -175,7 +154,37 @@ impl ExecutionGraph {
 
     /// An ExecutionGraph is complete if all its stages are complete
     pub fn complete(&self) -> bool {
-        self.stages.values().all(|s| s.complete())
+        self.stages
+            .values()
+            .all(|s| matches!(s, ExecutionStage::Completed(_)))
+    }
+
+    /// Revive the execution graph by converting the resolved stages to 
running stages
+    /// If any stages are converted, return true; else false.
+    pub fn revive(&mut self) -> bool {
+        let running_stages = self
+            .stages
+            .values()
+            .filter_map(|stage| {
+                if let ExecutionStage::Resolved(resolved_stage) = stage {
+                    Some(resolved_stage.to_running())
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        if running_stages.is_empty() {
+            false
+        } else {
+            for running_stage in running_stages {
+                self.stages.insert(
+                    running_stage.stage_id,
+                    ExecutionStage::Running(running_stage),
+                );
+            }
+            true
+        }
     }
 
     /// Update task statuses and task metrics in the graph.
@@ -183,138 +192,175 @@ impl ExecutionGraph {
     pub fn update_task_status(
         &mut self,
         executor: &ExecutorMetadata,
-        statuses: Vec<TaskStatus>,
-    ) -> Result<()> {
-        for status in statuses.into_iter() {
-            if let TaskStatus {
-                task_id:
-                    Some(protobuf::PartitionId {
-                        job_id,
-                        stage_id,
-                        partition_id,
-                    }),
-                metrics: operator_metrics,
-                status: Some(task_status),
-            } = status
-            {
-                if job_id != self.job_id() {
+        task_statuses: Vec<TaskStatus>,
+    ) -> Result<Option<QueryStageSchedulerEvent>> {
+        let job_id = self.job_id().to_owned();
+        // First of all, classify the statuses by stages
+        let mut job_task_statuses: HashMap<usize, Vec<TaskStatus>> = 
HashMap::new();
+        for task_status in task_statuses {
+            if let Some(task_id) = task_status.task_id.as_ref() {
+                if task_id.job_id != job_id {
                     return Err(BallistaError::Internal(format!(
                         "Error updating job {}: Invalid task status job ID {}",
-                        self.job_id(),
-                        job_id
+                        job_id, task_id.job_id
                     )));
                 }
+                let stage_task_statuses = job_task_statuses
+                    .entry(task_id.stage_id as usize)
+                    .or_insert_with(Vec::new);
+                stage_task_statuses.push(task_status);
+            } else {
+                error!("There's no task id when updating status");
+            }
+        }
 
-                let stage_id = stage_id as usize;
-                let partition = partition_id as usize;
-                if let Some(stage) = self.stages.get_mut(&stage_id) {
-                    stage.update_task_status(partition, task_status.clone());
-                    let stage_plan = stage.plan.clone();
-                    let stage_complete = stage.complete();
-
-                    // TODO Should be able to reschedule this task.
-                    if let task_status::Status::Failed(failed_task) = 
task_status {
-                        self.status = JobStatus {
-                            status: Some(job_status::Status::Failed(FailedJob {
-                                error: format!(
-                                    "Task {}/{}/{} failed: {}",
-                                    job_id, stage_id, partition_id, 
failed_task.error
-                                ),
-                            })),
-                        };
-                        return Ok(());
-                    } else if let 
task_status::Status::Completed(completed_task) =
-                        task_status
-                    {
-                        // update task metrics for completed task
-                        stage.update_task_metrics(partition, 
operator_metrics)?;
-
-                        // if this stage is completed, we want to combine the 
stage metrics to plan's metric set and print out the plan
-                        if stage_complete && 
stage.stage_metrics.as_ref().is_some() {
-                            // The plan_metrics collected here is a snapshot 
clone from the plan metrics.
-                            // They are all empty now and need to combine with 
the stage metrics in the ExecutionStages
-                            let mut plan_metrics =
-                                collect_plan_metrics(stage_plan.as_ref());
-                            let stage_metrics = stage
-                                .stage_metrics
-                                .as_ref()
-                                .expect("stage metrics should not be None.");
-                            if plan_metrics.len() != stage_metrics.len() {
-                                return 
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for 
stage {},  plan metrics array size {} does not equal \
-                to the stage metrics array size {}", stage_id, 
plan_metrics.len(), stage_metrics.len())));
+        // Revive before updating due to some updates not saved
+        // It will be refined later
+        self.revive();
+
+        let mut events = vec![];
+        for (stage_id, stage_task_statuses) in job_task_statuses {
+            if let Some(stage) = self.stages.get_mut(&stage_id) {
+                if let ExecutionStage::Running(running_stage) = stage {
+                    let mut locations = vec![];
+                    for task_status in stage_task_statuses.into_iter() {
+                        if let TaskStatus {
+                            task_id:
+                                Some(protobuf::PartitionId {
+                                    job_id,
+                                    stage_id,
+                                    partition_id,
+                                }),
+                            metrics: operator_metrics,
+                            status: Some(status),
+                        } = task_status
+                        {
+                            let stage_id = stage_id as usize;
+                            let partition_id = partition_id as usize;
+
+                            running_stage
+                                .update_task_status(partition_id, 
status.clone());
+
+                            // TODO Should be able to reschedule this task.
+                            if let task_status::Status::Failed(failed_task) = 
status {
+                                events.push(StageEvent::StageFailed(
+                                    stage_id,
+                                    format!(
+                                        "Task {}/{}/{} failed: {}",
+                                        job_id, stage_id, partition_id, 
failed_task.error
+                                    ),
+                                ));
+                                break;
+                            } else if let 
task_status::Status::Completed(completed_task) =
+                                status
+                            {
+                                // update task metrics for completed task
+                                running_stage.update_task_metrics(
+                                    partition_id,
+                                    operator_metrics,
+                                )?;
+
+                                locations.append(&mut partition_to_location(
+                                    &job_id,
+                                    stage_id,
+                                    executor,
+                                    completed_task.partitions,
+                                ));
+                            } else {
+                                warn!("The task {}/{}/{} with status {:?} is 
invalid for updating", job_id, stage_id, partition_id, status);
                             }
-                            
plan_metrics.iter_mut().zip(stage_metrics).for_each(
-                                |(plan_metric, stage_metric)| {
-                                    stage_metric
-                                        .iter()
-                                        .for_each(|s| 
plan_metric.push(s.clone()));
-                                },
-                            );
-
-                            info!(
-                                "=== [{}/{}/{}] Stage finished, physical plan 
with metrics ===\n{}\n",
-                                job_id,
+                        }
+                    }
+                    let is_completed = running_stage.is_completed();
+                    if is_completed {
+                        events.push(StageEvent::StageCompleted(stage_id));
+                        // if this stage is completed, we want to combine the 
stage metrics to plan's metric set and print out the plan
+                        if let Some(stage_metrics) = 
running_stage.stage_metrics.as_ref()
+                        {
+                            print_stage_metrics(
+                                &job_id,
                                 stage_id,
-                                partition,
-                                
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(), 
plan_metrics.as_ref()).indent()
+                                running_stage.plan.as_ref(),
+                                stage_metrics,
                             );
                         }
-
-                        let locations = partition_to_location(
-                            self.job_id.as_str(),
-                            stage_id,
-                            executor,
-                            completed_task.partitions,
-                        );
-
-                        let output_links = stage.output_links.clone();
-                        if output_links.is_empty() {
-                            // If `output_links` is empty, then this is a 
final stage
-                            self.output_locations.extend(locations);
-                        } else {
-                            for link in output_links.into_iter() {
-                                // If this is an intermediate stage, we need 
to push its `PartitionLocation`s to the parent stage
-                                if let Some(linked_stage) = 
self.stages.get_mut(&link) {
-                                    linked_stage.add_input_partitions(
-                                        stage_id,
-                                        partition,
-                                        locations.clone(),
-                                    )?;
-
-                                    // If all tasks for this stage are 
complete, mark the input complete in the parent stage
-                                    if stage_complete {
-                                        linked_stage.complete_input(stage_id);
-                                    }
-
-                                    // If all input partitions are ready, we 
can resolve any UnresolvedShuffleExec in the parent stage plan
-                                    if linked_stage.resolvable() {
-                                        linked_stage.resolve_shuffles()?;
-                                    }
-                                } else {
-                                    return 
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link 
{} for stage {}", job_id, stage_id, link)));
-                                }
-                            }
-                        }
                     }
+
+                    let output_links = running_stage.output_links.clone();
+                    events.append(&mut self.update_stage_output_links(
+                        stage_id,
+                        is_completed,
+                        locations,
+                        output_links,
+                    )?);
                 } else {
                     return Err(BallistaError::Internal(format!(
-                        "Invalid stage ID {} for job {}",
+                        "Stage {}/{} is not in running when updating the 
status of tasks {:?}",
+                        job_id,
                         stage_id,
-                        self.job_id()
+                        stage_task_statuses.into_iter().map(|task_status| 
task_status.task_id.map(|task_id| task_id.partition_id)).collect::<Vec<_>>(),
                     )));
                 }
+            } else {
+                return Err(BallistaError::Internal(format!(
+                    "Invalid stage ID {} for job {}",
+                    stage_id, job_id
+                )));
             }
         }
 
-        Ok(())
+        self.processing_stage_events(events)
     }
 
-    /// Total number of tasks in this plan that are ready for scheduling
-    pub fn available_tasks(&self) -> usize {
-        self.stages
-            .iter()
-            .map(|(_, stage)| stage.available_tasks())
-            .sum()
+    fn update_stage_output_links(
+        &mut self,
+        stage_id: usize,
+        is_completed: bool,
+        locations: Vec<PartitionLocation>,
+        output_links: Vec<usize>,
+    ) -> Result<Vec<StageEvent>> {
+        let mut ret = vec![];
+        let job_id = &self.job_id;
+        if output_links.is_empty() {
+            // If `output_links` is empty, then this is a final stage
+            self.output_locations.extend(locations);
+        } else {
+            for link in output_links.iter() {
+                // If this is an intermediate stage, we need to push its 
`PartitionLocation`s to the parent stage
+                if let Some(linked_stage) = self.stages.get_mut(link) {
+                    if let ExecutionStage::UnResolved(linked_unresolved_stage) 
=
+                        linked_stage
+                    {
+                        linked_unresolved_stage
+                            .add_input_partitions(stage_id, 
locations.clone())?;
+
+                        // If all tasks for this stage are complete, mark the 
input complete in the parent stage
+                        if is_completed {
+                            linked_unresolved_stage.complete_input(stage_id);
+                        }
+
+                        // If all input partitions are ready, we can resolve 
any UnresolvedShuffleExec in the parent stage plan
+                        if linked_unresolved_stage.resolvable() {
+                            ret.push(StageEvent::StageResolved(
+                                linked_unresolved_stage.stage_id,
+                            ));
+                        }
+                    } else {
+                        return Err(BallistaError::Internal(format!(
+                            "Error updating job {}: The stage {} as the output 
link of stage {}  should be unresolved",
+                            job_id, link, stage_id
+                        )));
+                    }
+                } else {
+                    return Err(BallistaError::Internal(format!(
+                        "Error updating job {}: Invalid output link {} for 
stage {}",
+                        job_id, stage_id, link
+                    )));
+                }
+            }
+        }
+
+        Ok(ret)
     }
 
     /// Return all currently running tasks along with the executor ID on which 
they are assigned
@@ -322,24 +368,42 @@ impl ExecutionGraph {
         self.stages
             .iter()
             .flat_map(|(_, stage)| {
-                stage
-                    .running_tasks()
-                    .iter()
-                    .map(|(stage_id, partition, executor_id)| {
-                        (
-                            PartitionId {
-                                job_id: self.job_id.clone(),
-                                stage_id: *stage_id,
-                                partition_id: *partition,
-                            },
-                            executor_id.clone(),
-                        )
-                    })
-                    .collect::<Vec<(PartitionId, String)>>()
+                if let ExecutionStage::Running(stage) = stage {
+                    stage
+                        .running_tasks()
+                        .into_iter()
+                        .map(|(stage_id, partition_id, executor_id)| {
+                            (
+                                PartitionId {
+                                    job_id: self.job_id.clone(),
+                                    stage_id,
+                                    partition_id,
+                                },
+                                executor_id,
+                            )
+                        })
+                        .collect::<Vec<(PartitionId, String)>>()
+                } else {
+                    vec![]
+                }
             })
             .collect::<Vec<(PartitionId, String)>>()
     }
 
+    /// Total number of tasks in this plan that are ready for scheduling
+    pub fn available_tasks(&self) -> usize {
+        self.stages
+            .iter()
+            .map(|(_, stage)| {
+                if let ExecutionStage::Running(stage) = stage {
+                    stage.available_tasks()
+                } else {
+                    0
+                }
+            })
+            .sum()
+    }
+
     /// Get next task that can be assigned to the given executor.
     /// This method should only be called when the resulting task is 
immediately
     /// being launched as the status will be set to Running and it will not be
@@ -349,39 +413,183 @@ impl ExecutionGraph {
     pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<Task>> 
{
         let job_id = self.job_id.clone();
         let session_id = self.session_id.clone();
-        self.stages.iter_mut().find(|(_stage_id, stage)| {
-            stage.resolved() && stage.available_tasks() > 0
+        let mut next_task = self.stages.iter_mut().find(|(_stage_id, stage)| {
+            if let ExecutionStage::Running(stage) = stage {
+                stage.available_tasks() > 0
+            } else {
+                false
+            }
         }).map(|(stage_id, stage)| {
-            let (partition_id,_) = stage
-                .task_statuses
-                .iter()
-                .enumerate()
-                .find(|(_partition,status)| status.is_none())
-                .ok_or_else(|| {
-                    BallistaError::Internal(format!("Error getting next task 
for job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
-                })?;
-
-            let partition = PartitionId {
-                job_id,
-                stage_id: *stage_id,
-                partition_id
-            };
+            if let ExecutionStage::Running(stage) = stage {
+                let (partition_id, _) = stage
+                    .task_statuses
+                    .iter()
+                    .enumerate()
+                    .find(|(_partition, status)| status.is_none())
+                    .ok_or_else(|| {
+                        BallistaError::Internal(format!("Error getting next 
task for job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
+                    })?;
+
+                let partition = PartitionId {
+                    job_id,
+                    stage_id: *stage_id,
+                    partition_id,
+                };
+
+                // Set the status to Running
+                stage.task_statuses[partition_id] = 
Some(task_status::Status::Running(RunningTask {
+                    executor_id: executor_id.to_owned()
+                }));
+
+                Ok(Task {
+                    session_id,
+                    partition,
+                    plan: stage.plan.clone(),
+                    output_partitioning: stage.output_partitioning.clone(),
+                })
+            } else {
+                Err(BallistaError::General(format!("Stage {} is not a running 
stage", stage_id)))
+            }
+        }).transpose()?;
+
+        // If no available tasks found in the running stage,
+        // try to find a resolved stage and convert it to the running stage
+        if next_task.is_none() {
+            if self.revive() {
+                next_task = self.pop_next_task(executor_id)?;
+            } else {
+                next_task = None;
+            }
+        }
 
-            // Set the status to Running
-            stage.task_statuses[partition_id] = 
Some(task_status::Status::Running(RunningTask {
-                executor_id: executor_id.to_owned()
-            }));
+        Ok(next_task)
+    }
 
-            Ok(Task {
-                session_id,
-                partition,
-                plan: stage.plan.clone(),
-                output_partitioning: stage.output_partitioning.clone()
-            })
-        }).transpose()
+    pub fn update_status(&mut self, status: JobStatus) {
+        self.status = status;
+    }
+
+    /// Reset the status for the given task. This should be called is a task 
failed to
+    /// launch and it needs to be returned to the set of available tasks and be
+    /// re-scheduled.
+    pub fn reset_task_status(&mut self, task: Task) {
+        let stage_id = task.partition.stage_id;
+        let partition = task.partition.partition_id;
+
+        if let Some(ExecutionStage::Running(stage)) = 
self.stages.get_mut(&stage_id) {
+            stage.task_statuses[partition] = None;
+        }
+    }
+
+    pub fn output_locations(&self) -> Vec<PartitionLocation> {
+        self.output_locations.clone()
+    }
+
+    /// Processing stage events for stage state changing
+    fn processing_stage_events(
+        &mut self,
+        events: Vec<StageEvent>,
+    ) -> Result<Option<QueryStageSchedulerEvent>> {
+        let mut has_resolved = false;
+        let mut job_err_msg = "".to_owned();
+        for event in events {
+            match event {
+                StageEvent::StageResolved(stage_id) => {
+                    self.resolve_stage(stage_id)?;
+                    has_resolved = true;
+                }
+                StageEvent::StageCompleted(stage_id) => {
+                    self.complete_stage(stage_id);
+                }
+                StageEvent::StageFailed(stage_id, err_msg) => {
+                    job_err_msg = format!("{}{}\n", job_err_msg, &err_msg);
+                    self.fail_stage(stage_id, err_msg);
+                }
+            }
+        }
+
+        let event = if !job_err_msg.is_empty() {
+            // If this ExecutionGraph is complete, fail it
+            info!("Job {} is failed", self.job_id());
+            self.fail_job(job_err_msg);
+
+            Some(QueryStageSchedulerEvent::JobRunningFailed(
+                self.job_id.clone(),
+            ))
+        } else if self.complete() {
+            // If this ExecutionGraph is complete, finalize it
+            info!(
+                "Job {} is complete, finalizing output partitions",
+                self.job_id()
+            );
+            self.complete_job()?;
+            Some(QueryStageSchedulerEvent::JobFinished(self.job_id.clone()))
+        } else if has_resolved {
+            Some(QueryStageSchedulerEvent::JobUpdated(self.job_id.clone()))
+        } else {
+            None
+        };
+
+        Ok(event)
+    }
+
+    /// Convert unresolved stage to be resolved
+    fn resolve_stage(&mut self, stage_id: usize) -> Result<bool> {
+        if let Some(ExecutionStage::UnResolved(stage)) = 
self.stages.remove(&stage_id) {
+            self.stages
+                .insert(stage_id, 
ExecutionStage::Resolved(stage.to_resolved()?));
+            Ok(true)
+        } else {
+            warn!(
+                "Fail to find a unresolved stage {}/{} to resolve",
+                self.job_id(),
+                stage_id
+            );
+            Ok(false)
+        }
+    }
+
+    /// Convert running stage to be completed
+    fn complete_stage(&mut self, stage_id: usize) -> bool {
+        if let Some(ExecutionStage::Running(stage)) = 
self.stages.remove(&stage_id) {
+            self.stages
+                .insert(stage_id, 
ExecutionStage::Completed(stage.to_completed()));
+            true
+        } else {
+            warn!(
+                "Fail to find a running stage {}/{} to complete",
+                self.job_id(),
+                stage_id
+            );
+            false
+        }
+    }
+
+    /// Convert running stage to be failed
+    fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool {
+        if let Some(ExecutionStage::Running(stage)) = 
self.stages.remove(&stage_id) {
+            self.stages
+                .insert(stage_id, 
ExecutionStage::Failed(stage.to_failed(err_msg)));
+            true
+        } else {
+            warn!(
+                "Fail to find a running stage {}/{} to fail",
+                self.job_id(),
+                stage_id
+            );
+            false
+        }
+    }
+
+    /// fail job with error message
+    pub fn fail_job(&mut self, error: String) {
+        self.status = JobStatus {
+            status: Some(job_status::Status::Failed(FailedJob { error })),
+        };
     }
 
-    pub fn finalize(&mut self) -> Result<()> {
+    /// finalize job as completed
+    fn complete_job(&mut self) -> Result<()> {
         if !self.complete() {
             return Err(BallistaError::Internal(format!(
                 "Attempt to finalize an incomplete job {}",
@@ -404,26 +612,6 @@ impl ExecutionGraph {
         Ok(())
     }
 
-    pub fn update_status(&mut self, status: JobStatus) {
-        self.status = status;
-    }
-
-    /// Reset the status for the given task. This should be called is a task 
failed to
-    /// launch and it needs to be returned to the set of available tasks and be
-    /// re-scheduled.
-    pub fn reset_task_status(&mut self, task: Task) {
-        let stage_id = task.partition.stage_id;
-        let partition = task.partition.partition_id;
-
-        if let Some(stage) = self.stages.get_mut(&stage_id) {
-            stage.task_statuses[partition] = None;
-        }
-    }
-
-    pub fn output_locations(&self) -> Vec<PartitionLocation> {
-        self.output_locations.clone()
-    }
-
     pub(crate) async fn decode_execution_graph<
         T: 'static + AsLogicalPlan,
         U: 'static + AsExecutionPlan,
@@ -433,87 +621,33 @@ impl ExecutionGraph {
         session_ctx: &SessionContext,
     ) -> Result<ExecutionGraph> {
         let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
-        for stage in proto.stages {
-            let plan_proto = U::try_decode(stage.plan.as_slice())?;
-            let plan = plan_proto.try_into_physical_plan(
-                session_ctx,
-                session_ctx.runtime_env().as_ref(),
-                codec.physical_extension_codec(),
-            )?;
-
-            let stage_id = stage.stage_id as usize;
-            let partitions: usize = stage.partitions as usize;
-
-            let mut task_statuses: Vec<Option<task_status::Status>> =
-                vec![None; partitions];
-
-            for status in stage.task_statuses {
-                if let Some(task_id) = status.task_id.as_ref() {
-                    task_statuses[task_id.partition_id as usize] = 
status.status
+        for graph_stage in proto.stages {
+            let stage_type = graph_stage.stage_type.expect("Unexpected empty 
stage");
+
+            let execution_stage = match stage_type {
+                StageType::UnresolvedStage(stage) => {
+                    let stage: UnresolvedStage =
+                        UnresolvedStage::decode(stage, codec, session_ctx)?;
+                    (stage.stage_id, ExecutionStage::UnResolved(stage))
+                }
+                StageType::ResolvedStage(stage) => {
+                    let stage: ResolvedStage =
+                        ResolvedStage::decode(stage, codec, session_ctx)?;
+                    (stage.stage_id, ExecutionStage::Resolved(stage))
+                }
+                StageType::CompletedStage(stage) => {
+                    let stage: CompletedStage =
+                        CompletedStage::decode(stage, codec, session_ctx)?;
+                    (stage.stage_id, ExecutionStage::Completed(stage))
+                }
+                StageType::FailedStage(stage) => {
+                    let stage: FailedStage =
+                        FailedStage::decode(stage, codec, session_ctx)?;
+                    (stage.stage_id, ExecutionStage::Failed(stage))
                 }
-            }
-
-            let output_partitioning: Option<Partitioning> =
-                parse_protobuf_hash_partitioning(
-                    stage.output_partitioning.as_ref(),
-                    session_ctx,
-                    plan.schema().as_ref(),
-                )?;
-
-            let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-
-            for input in stage.inputs {
-                let stage_id = input.stage_id as usize;
-
-                let outputs = input
-                    .partition_locations
-                    .into_iter()
-                    .map(|loc| {
-                        let partition = loc.partition as usize;
-                        let locations = loc
-                            .partition_location
-                            .into_iter()
-                            .map(|l| l.try_into())
-                            .collect::<Result<Vec<_>>>()?;
-                        Ok((partition, locations))
-                    })
-                    .collect::<Result<HashMap<usize, 
Vec<PartitionLocation>>>>()?;
-
-                inputs.insert(
-                    stage_id,
-                    StageOutput {
-                        partition_locations: outputs,
-                        complete: input.complete,
-                    },
-                );
-            }
-            let stage_metrics = if stage.stage_metrics.is_empty() {
-                None
-            } else {
-                let ms = stage
-                    .stage_metrics
-                    .into_iter()
-                    .map(|m| m.try_into())
-                    .collect::<Result<Vec<_>>>()?;
-                Some(ms)
             };
 
-            let execution_stage = ExecutionStage {
-                stage_id: stage.stage_id as usize,
-                partitions,
-                output_partitioning,
-                inputs,
-                plan,
-                task_statuses,
-                output_links: stage
-                    .output_links
-                    .into_iter()
-                    .map(|l| l as usize)
-                    .collect(),
-                resolved: stage.resolved,
-                stage_metrics,
-            };
-            stages.insert(stage_id, execution_stage);
+            stages.insert(execution_stage.0, execution_stage.1);
         }
 
         let output_locations: Vec<PartitionLocation> = proto
@@ -523,6 +657,7 @@ impl ExecutionGraph {
             .collect::<Result<Vec<_>>>()?;
 
         Ok(ExecutionGraph {
+            scheduler_id: proto.scheduler_id,
             job_id: proto.job_id,
             session_id: proto.session_id,
             status: proto.status.ok_or_else(|| {
@@ -536,6 +671,8 @@ impl ExecutionGraph {
         })
     }
 
+    /// Running stages will not be persisted so that will not be encoded.
+    /// Running stages will be convert back to the resolved stages to be 
encoded and persisted
     pub(crate) fn encode_execution_graph<
         T: 'static + AsLogicalPlan,
         U: 'static + AsExecutionPlan,
@@ -547,76 +684,27 @@ impl ExecutionGraph {
 
         let stages = graph
             .stages
-            .into_iter()
-            .map(|(stage_id, stage)| {
-                let mut plan: Vec<u8> = vec![];
-
-                U::try_from_physical_plan(stage.plan, 
codec.physical_extension_codec())
-                    .and_then(|proto| proto.try_encode(&mut plan))?;
-
-                let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
-
-                for (stage, output) in stage.inputs.into_iter() {
-                    inputs.push(protobuf::GraphStageInput {
-                        stage_id: stage as u32,
-                        partition_locations: output
-                            .partition_locations
-                            .into_iter()
-                            .map(|(partition, locations)| {
-                                Ok(protobuf::TaskInputPartitions {
-                                    partition: partition as u32,
-                                    partition_location: locations
-                                        .into_iter()
-                                        .map(|l| l.try_into())
-                                        .collect::<Result<Vec<_>>>()?,
-                                })
-                            })
-                            .collect::<Result<Vec<_>>>()?,
-                        complete: output.complete,
-                    });
-                }
-
-                let task_statuses: Vec<protobuf::TaskStatus> = stage
-                    .task_statuses
-                    .into_iter()
-                    .enumerate()
-                    .filter_map(|(partition, status)| {
-                        status.map(|status| protobuf::TaskStatus {
-                            task_id: Some(protobuf::PartitionId {
-                                job_id: job_id.clone(),
-                                stage_id: stage_id as u32,
-                                partition_id: partition as u32,
-                            }),
-                            // task metrics should not persist.
-                            metrics: vec![],
-                            status: Some(status),
-                        })
-                    })
-                    .collect();
-
-                let output_partitioning =
-                    
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
-
-                let stage_metrics = stage
-                    .stage_metrics
-                    .unwrap_or_default()
-                    .into_iter()
-                    .map(|m| m.try_into())
-                    .collect::<Result<Vec<_>>>()?;
+            .into_values()
+            .map(|stage| {
+                let stage_type = match stage {
+                    ExecutionStage::UnResolved(stage) => {
+                        
StageType::UnresolvedStage(UnresolvedStage::encode(stage, codec)?)
+                    }
+                    ExecutionStage::Resolved(stage) => {
+                        StageType::ResolvedStage(ResolvedStage::encode(stage, 
codec)?)
+                    }
+                    ExecutionStage::Running(stage) => StageType::ResolvedStage(
+                        ResolvedStage::encode(stage.to_resolved(), codec)?,
+                    ),
+                    ExecutionStage::Completed(stage) => 
StageType::CompletedStage(
+                        CompletedStage::encode(job_id.clone(), stage, codec)?,
+                    ),
+                    ExecutionStage::Failed(stage) => StageType::FailedStage(
+                        FailedStage::encode(job_id.clone(), stage, codec)?,
+                    ),
+                };
                 Ok(protobuf::ExecutionGraphStage {
-                    stage_id: stage_id as u64,
-                    partitions: stage.partitions as u32,
-                    output_partitioning,
-                    inputs,
-                    plan,
-                    task_statuses,
-                    output_links: stage
-                        .output_links
-                        .into_iter()
-                        .map(|l| l as u32)
-                        .collect(),
-                    resolved: stage.resolved,
-                    stage_metrics,
+                    stage_type: Some(stage_type),
                 })
             })
             .collect::<Result<Vec<_>>>()?;
@@ -634,6 +722,7 @@ impl ExecutionGraph {
             stages,
             output_partitions: graph.output_partitions as u64,
             output_locations,
+            scheduler_id: graph.scheduler_id,
         })
     }
 }
@@ -645,294 +734,9 @@ impl Debug for ExecutionGraph {
             .iter()
             .map(|(_, stage)| format!("{:?}", stage))
             .collect::<Vec<String>>()
-            .join("\n");
-        write!(f, "ExecutionGraph[job_id={}, session_id={}, 
available_tasks={}, complete={}]\n{}", self.job_id, self.session_id, 
self.available_tasks(), self.complete(), stages)
-    }
-}
-
-/// This data structure collects the partition locations for an 
`ExecutionStage`.
-/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child 
stages.
-/// When all tasks for the child stage are complete, it will mark the 
`StageOutput`
-#[derive(Clone, Debug, Default)]
-struct StageOutput {
-    /// Map from partition -> partition locations
-    partition_locations: HashMap<usize, Vec<PartitionLocation>>,
-    /// Flag indicating whether all tasks are complete
-    complete: bool,
-}
-
-impl StageOutput {
-    pub fn new() -> Self {
-        Self {
-            partition_locations: HashMap::new(),
-            complete: false,
-        }
-    }
-
-    /// Add a `PartitionLocation` to the `StageOutput`
-    pub fn add_partition(&mut self, partition_location: PartitionLocation) {
-        if let Some(parts) = self
-            .partition_locations
-            .get_mut(&partition_location.partition_id.partition_id)
-        {
-            parts.push(partition_location)
-        } else {
-            self.partition_locations.insert(
-                partition_location.partition_id.partition_id,
-                vec![partition_location],
-            );
-        }
-    }
-
-    pub fn is_complete(&self) -> bool {
-        self.complete
-    }
-}
-
-/// A stage in the ExecutionGraph.
-///
-/// This represents a set of tasks (one per each `partition`) which can
-/// be executed concurrently.
-#[derive(Clone)]
-struct ExecutionStage {
-    /// Stage ID
-    stage_id: usize,
-    /// Total number of output partitions for this stage.
-    /// This stage will produce on task for partition.
-    partitions: usize,
-    /// Output partitioning for this stage.
-    output_partitioning: Option<Partitioning>,
-    /// Represents the outputs from this stage's child stages.
-    /// This stage can only be resolved an executed once all child stages are 
completed.
-    inputs: HashMap<usize, StageOutput>,
-    // `ExecutionPlan` for this stage
-    plan: Arc<dyn ExecutionPlan>,
-    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
-    task_statuses: Vec<Option<task_status::Status>>,
-    /// Stage ID of the stage that will take this stages outputs as inputs.
-    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
-    output_links: Vec<usize>,
-    /// Flag indicating whether all input partitions have been resolved and 
the plan
-    /// has UnresovledShuffleExec operators resolved to ShuffleReadExec 
operators.
-    resolved: bool,
-    /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
-    stage_metrics: Option<Vec<MetricsSet>>,
-}
-
-impl Debug for ExecutionStage {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
-        let scheduled_tasks = self.task_statuses.iter().filter(|t| 
t.is_some()).count();
-
-        write!(
-            f,
-            "Stage[id={}, partitions={:?}, children={}, completed_tasks={}, 
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
-            self.stage_id,
-            self.partitions,
-            self.inputs.len(),
-            self.completed_tasks(),
-            self.resolved,
-            scheduled_tasks,
-            self.available_tasks(),
-            self.inputs,
-            plan
-        )
-    }
-}
-
-impl ExecutionStage {
-    pub fn new(
-        stage_id: usize,
-        plan: Arc<dyn ExecutionPlan>,
-        output_partitioning: Option<Partitioning>,
-        output_links: Vec<usize>,
-        child_stages: Vec<usize>,
-    ) -> Self {
-        let num_tasks = plan.output_partitioning().partition_count();
-
-        let resolved = child_stages.is_empty();
-
-        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-
-        for input_stage_id in &child_stages {
-            inputs.insert(*input_stage_id, StageOutput::new());
-        }
-
-        Self {
-            stage_id,
-            partitions: num_tasks,
-            output_partitioning,
-            inputs,
-            plan,
-            task_statuses: vec![None; num_tasks],
-            output_links,
-            resolved,
-            stage_metrics: None,
-        }
-    }
-
-    /// Returns a vector of currently running tasks in this stage
-    pub fn running_tasks(&self) -> Vec<(usize, usize, String)> {
-        if self.resolved {
-            self.task_statuses
-                .iter()
-                .enumerate()
-                .filter_map(|(partition, status)| match status {
-                    Some(task_status::Status::Running(RunningTask { 
executor_id })) => {
-                        Some((self.stage_id, partition, executor_id.clone()))
-                    }
-                    _ => None,
-                })
-                .collect()
-        } else {
-            vec![]
-        }
-    }
-
-    /// Returns true if all inputs are complete and we can resolve all
-    /// UnresolvedShuffleExec operators to ShuffleReadExec
-    pub fn resolvable(&self) -> bool {
-        self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
-    }
-
-    /// Returns `true` if all tasks for this stage are complete
-    pub fn complete(&self) -> bool {
-        self.task_statuses
-            .iter()
-            .all(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
-    }
-
-    /// Returns the number of tasks
-    pub fn completed_tasks(&self) -> usize {
-        self.task_statuses
-            .iter()
-            .filter(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
-            .count()
-    }
-
-    /// Marks the input stage ID as complete.
-    pub fn complete_input(&mut self, stage_id: usize) {
-        if let Some(input) = self.inputs.get_mut(&stage_id) {
-            input.complete = true;
-        }
-    }
-
-    /// Returns true if the stage plan has all UnresolvedShuffleExec operators 
resolved to
-    /// ShuffleReadExec
-    pub fn resolved(&self) -> bool {
-        self.resolved
-    }
-
-    /// Returns the number of tasks in this stage which are available for 
scheduling.
-    /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
-    /// return the number of tasks where the task status is not yet set.
-    pub fn available_tasks(&self) -> usize {
-        if self.resolved {
-            self.task_statuses.iter().filter(|s| s.is_none()).count()
-        } else {
-            0
-        }
-    }
-
-    /// Resolve any UnresolvedShuffleExec operators within this stage's plan
-    pub fn resolve_shuffles(&mut self) -> Result<()> {
-        println!("Resolving shuffles\n{:?}", self);
-        if self.resolved {
-            // If this stage has no input shuffles, then it is already resolved
-            Ok(())
-        } else {
-            let input_locations = self
-                .inputs
-                .iter()
-                .map(|(stage, outputs)| (*stage, 
outputs.partition_locations.clone()))
-                .collect();
-            // Otherwise, rewrite the plan to replace UnresolvedShuffleExec 
with ShuffleReadExec
-            let new_plan = crate::planner::remove_unresolved_shuffles(
-                self.plan.clone(),
-                &input_locations,
-            )?;
-            self.plan = new_plan;
-            self.resolved = true;
-            Ok(())
-        }
-    }
-
-    /// Update the status for task partition
-    pub fn update_task_status(&mut self, partition: usize, status: 
task_status::Status) {
-        debug!("Updating task status for partition {}", partition);
-        self.task_statuses[partition] = Some(status);
-    }
-
-    /// update and combine the task metrics to the stage metrics
-    pub fn update_task_metrics(
-        &mut self,
-        partition: usize,
-        metrics: Vec<OperatorMetricsSet>,
-    ) -> Result<()> {
-        if let Some(combined_metrics) = &mut self.stage_metrics {
-            if metrics.len() != combined_metrics.len() {
-                return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
-                with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
-            }
-            let metrics_values_array = metrics
-                .into_iter()
-                .map(|ms| {
-                    ms.metrics
-                        .into_iter()
-                        .map(|m| m.try_into())
-                        .collect::<Result<Vec<_>>>()
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            let new_metrics_set = combined_metrics
-                .iter_mut()
-                .zip(metrics_values_array)
-                .map(|(first, second)| {
-                    Self::combine_metrics_set(first, second, partition)
-                })
-                .collect();
-            self.stage_metrics = Some(new_metrics_set)
-        } else {
-            let new_metrics_set = metrics
-                .into_iter()
-                .map(|ms| ms.try_into())
-                .collect::<Result<Vec<_>>>()?;
-            if !new_metrics_set.is_empty() {
-                self.stage_metrics = Some(new_metrics_set)
-            }
-        }
-        Ok(())
-    }
-
-    pub fn combine_metrics_set(
-        first: &mut MetricsSet,
-        second: Vec<MetricValue>,
-        partition: usize,
-    ) -> MetricsSet {
-        for metric_value in second {
-            // TODO recheck the lable logic
-            let new_metric = Arc::new(Metric::new(metric_value, 
Some(partition)));
-            first.push(new_metric);
-        }
-        first.aggregate_by_partition()
-    }
-
-    /// Add input partitions published from an input stage.
-    pub fn add_input_partitions(
-        &mut self,
-        stage_id: usize,
-        _partition_id: usize,
-        locations: Vec<PartitionLocation>,
-    ) -> Result<()> {
-        if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
-            for partition in locations {
-                stage_inputs.add_partition(partition);
-            }
-        } else {
-            return Err(BallistaError::Internal(format!("Error adding input 
partitions to stage {}, {} is not a valid child stage ID", self.stage_id, 
stage_id)));
-        }
-
-        Ok(())
+            .join("");
+        write!(f, "ExecutionGraph[job_id={}, session_id={}, 
available_tasks={}, complete={}]\n{}",
+               self.job_id, self.session_id, self.available_tasks(), 
self.complete(), stages)
     }
 }
 
@@ -980,16 +784,23 @@ impl ExecutionStageBuilder {
                 .remove(&stage_id)
                 .unwrap_or_default();
 
-            execution_stages.insert(
-                stage_id,
-                ExecutionStage::new(
+            let stage = if child_stages.is_empty() {
+                ExecutionStage::Resolved(ResolvedStage::new(
+                    stage_id,
+                    stage,
+                    partitioning,
+                    output_links,
+                ))
+            } else {
+                ExecutionStage::UnResolved(UnresolvedStage::new(
                     stage_id,
                     stage,
                     partitioning,
                     output_links,
                     child_stages,
-                ),
-            );
+                ))
+            };
+            execution_stages.insert(stage_id, stage);
         }
 
         Ok(execution_stages)
@@ -1032,6 +843,38 @@ impl ExecutionPlanVisitor for ExecutionStageBuilder {
     }
 }
 
+#[derive(Clone)]
+pub enum StageEvent {
+    StageResolved(usize),
+    StageCompleted(usize),
+    StageFailed(usize, String),
+}
+
+/// Represents the basic unit of work for the Ballista executor. Will execute
+/// one partition of one stage on one task slot.
+#[derive(Clone)]
+pub struct Task {
+    pub session_id: String,
+    pub partition: PartitionId,
+    pub plan: Arc<dyn ExecutionPlan>,
+    pub output_partitioning: Option<Partitioning>,
+}
+
+impl Debug for Task {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        write!(
+            f,
+            "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
+            self.session_id,
+            self.partition.job_id,
+            self.partition.stage_id,
+            self.partition.partition_id,
+            plan
+        )
+    }
+}
+
 fn partition_to_location(
     job_id: &str,
     stage_id: usize,
@@ -1059,19 +902,20 @@ fn partition_to_location(
 
 #[cfg(test)]
 mod test {
-    use crate::state::execution_graph::ExecutionGraph;
-    use ballista_core::error::Result;
-    use ballista_core::serde::protobuf::{self, job_status, task_status};
-    use ballista_core::serde::scheduler::{ExecutorMetadata, 
ExecutorSpecification};
+    use std::sync::Arc;
+
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum, Expr};
-
     use datafusion::logical_plan::JoinType;
     use datafusion::physical_plan::display::DisplayableExecutionPlan;
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion::test_util::scan_empty;
 
-    use std::sync::Arc;
+    use ballista_core::error::Result;
+    use ballista_core::serde::protobuf::{self, job_status, task_status};
+    use ballista_core::serde::scheduler::{ExecutorMetadata, 
ExecutorSpecification};
+
+    use crate::state::execution_graph::ExecutionGraph;
 
     #[tokio::test]
     async fn test_drain_tasks() -> Result<()> {
@@ -1124,7 +968,6 @@ mod test {
         let mut agg_graph = test_aggregation_plan(4).await;
 
         drain_tasks(&mut agg_graph)?;
-        agg_graph.finalize()?;
 
         let status = agg_graph.status();
 
@@ -1214,7 +1057,7 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        ExecutionGraph::new("job", "session", plan).unwrap()
+        ExecutionGraph::new("localhost:50050", "job", "session", plan).unwrap()
     }
 
     async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
@@ -1237,7 +1080,7 @@ mod test {
 
         let plan = ctx.create_physical_plan(&optimized_plan).await.unwrap();
 
-        ExecutionGraph::new("job", "session", plan).unwrap()
+        ExecutionGraph::new("localhost:50050", "job", "session", plan).unwrap()
     }
 
     async fn test_join_plan(partition: usize) -> ExecutionGraph {
@@ -1278,7 +1121,8 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        let graph = ExecutionGraph::new("job", "session", plan).unwrap();
+        let graph =
+            ExecutionGraph::new("localhost:50050", "job", "session", 
plan).unwrap();
 
         println!("{:?}", graph);
 
@@ -1302,7 +1146,8 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        let graph = ExecutionGraph::new("job", "session", plan).unwrap();
+        let graph =
+            ExecutionGraph::new("localhost:50050", "job", "session", 
plan).unwrap();
 
         println!("{:?}", graph);
 
@@ -1326,7 +1171,8 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        let graph = ExecutionGraph::new("job", "session", plan).unwrap();
+        let graph =
+            ExecutionGraph::new("localhost:50050", "job", "session", 
plan).unwrap();
 
         println!("{:?}", graph);
 
diff --git 
a/ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs
new file mode 100644
index 00000000..3e3aee00
--- /dev/null
+++ b/ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs
@@ -0,0 +1,928 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph,
+/// represents a set of tasks (one per each `partition`) which can be executed 
concurrently.
+/// For a stage, there are five states. And the state machine is as follows:
+///
+/// UnResolvedStage           FailedStage
+///       ↓            ↙           ↑
+///  ResolvedStage     →     RunningStage
+///                                ↓
+///                         CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+    UnResolved(UnresolvedStage),
+    Resolved(ResolvedStage),
+    Running(RunningStage),
+    Completed(CompletedStage),
+    Failed(FailedStage),
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ExecutionStage::UnResolved(unresolved_stage) => 
unresolved_stage.fmt(f),
+            ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f),
+            ExecutionStage::Running(running_stage) => running_stage.fmt(f),
+            ExecutionStage::Completed(completed_stage) => 
completed_stage.fmt(f),
+            ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f),
+        }
+    }
+}
+
+/// For a stage whose input stages are not all completed, we say it's a 
unresolved stage
+#[derive(Clone)]
+pub(super) struct UnresolvedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// Represents the outputs from this stage's child stages.
+    /// This stage can only be resolved an executed once all child stages are 
completed.
+    pub(super) inputs: HashMap<usize, StageOutput>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// For a stage, if it has no inputs or all of its input stages are completed,
+/// then we call it as a resolved stage
+#[derive(Clone)]
+pub(super) struct ResolvedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce on task for partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// Different from the resolved stage, a running stage will
+/// 1. save the execution plan as encoded one to avoid serialization cost for 
creating task definition
+/// 2. manage the task statuses
+/// 3. manage the stage-level combined metrics
+/// Running stages will only be maintained in memory and will not saved to the 
backend storage
+#[derive(Clone)]
+pub(super) struct RunningStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce on task for partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
+    pub(super) task_statuses: Vec<Option<task_status::Status>>,
+    /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
+    pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+}
+
+/// If a stage finishes successfully, its task statuses and metrics will be 
finalized
+#[derive(Clone)]
+pub(super) struct CompletedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce on task for partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task.
+    pub(super) task_statuses: Vec<task_status::Status>,
+    /// Combined metrics of the already finished tasks in the stage.
+    pub(super) stage_metrics: Vec<MetricsSet>,
+}
+
+/// If a stage fails, it will be with an error message
+#[derive(Clone)]
+pub(super) struct FailedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce on task for partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
+    pub(super) task_statuses: Vec<Option<task_status::Status>>,
+    /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
+    pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+    /// Error message
+    pub(super) error_message: String,
+}
+
+impl UnresolvedStage {
+    pub(super) fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        output_partitioning: Option<Partitioning>,
+        output_links: Vec<usize>,
+        child_stages: Vec<usize>,
+    ) -> Self {
+        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+        for input_stage_id in child_stages {
+            inputs.insert(input_stage_id, StageOutput::new());
+        }
+
+        Self {
+            stage_id,
+            output_partitioning,
+            output_links,
+            inputs,
+            plan,
+        }
+    }
+
+    /// Add input partitions published from an input stage.
+    pub(super) fn add_input_partitions(
+        &mut self,
+        stage_id: usize,
+        locations: Vec<PartitionLocation>,
+    ) -> Result<()> {
+        if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
+            for partition in locations {
+                stage_inputs.add_partition(partition);
+            }
+        } else {
+            return Err(BallistaError::Internal(format!("Error adding input 
partitions to stage {}, {} is not a valid child stage ID", self.stage_id, 
stage_id)));
+        }
+
+        Ok(())
+    }
+
+    /// Marks the input stage ID as complete.
+    pub(super) fn complete_input(&mut self, stage_id: usize) {
+        if let Some(input) = self.inputs.get_mut(&stage_id) {
+            input.complete = true;
+        }
+    }
+
+    /// Returns true if all inputs are complete and we can resolve all
+    /// UnresolvedShuffleExec operators to ShuffleReadExec
+    pub(super) fn resolvable(&self) -> bool {
+        self.inputs.iter().all(|(_, input)| input.is_complete())
+    }
+
+    /// Change to the resolved state
+    pub(super) fn to_resolved(&self) -> Result<ResolvedStage> {
+        let input_locations = self
+            .inputs
+            .iter()
+            .map(|(stage, input)| (*stage, input.partition_locations.clone()))
+            .collect();
+        let plan = crate::planner::remove_unresolved_shuffles(
+            self.plan.clone(),
+            &input_locations,
+        )?;
+        Ok(ResolvedStage::new(
+            self.stage_id,
+            plan,
+            self.output_partitioning.clone(),
+            self.output_links.clone(),
+        ))
+    }
+
+    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        stage: protobuf::UnResolvedStage,
+        codec: &BallistaCodec<T, U>,
+        session_ctx: &SessionContext,
+    ) -> Result<UnresolvedStage> {
+        let plan_proto = U::try_decode(&stage.plan)?;
+        let plan = plan_proto.try_into_physical_plan(
+            session_ctx,
+            session_ctx.runtime_env().as_ref(),
+            codec.physical_extension_codec(),
+        )?;
+
+        let output_partitioning: Option<Partitioning> = 
parse_protobuf_hash_partitioning(
+            stage.output_partitioning.as_ref(),
+            session_ctx,
+            plan.schema().as_ref(),
+        )?;
+
+        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+        for input in stage.inputs {
+            let stage_id = input.stage_id as usize;
+
+            let outputs = input
+                .partition_locations
+                .into_iter()
+                .map(|loc| {
+                    let partition = loc.partition as usize;
+                    let locations = loc
+                        .partition_location
+                        .into_iter()
+                        .map(|l| l.try_into())
+                        .collect::<Result<Vec<_>>>()?;
+                    Ok((partition, locations))
+                })
+                .collect::<Result<HashMap<usize, Vec<PartitionLocation>>>>()?;
+
+            inputs.insert(
+                stage_id,
+                StageOutput {
+                    partition_locations: outputs,
+                    complete: input.complete,
+                },
+            );
+        }
+
+        Ok(UnresolvedStage {
+            stage_id: stage.stage_id as usize,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
+            plan,
+            inputs,
+        })
+    }
+
+    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        stage: UnresolvedStage,
+        codec: &BallistaCodec<T, U>,
+    ) -> Result<protobuf::UnResolvedStage> {
+        let mut plan: Vec<u8> = vec![];
+        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+            .and_then(|proto| proto.try_encode(&mut plan))?;
+
+        let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
+        for (stage_id, output) in stage.inputs.into_iter() {
+            inputs.push(protobuf::GraphStageInput {
+                stage_id: stage_id as u32,
+                partition_locations: output
+                    .partition_locations
+                    .into_iter()
+                    .map(|(partition, locations)| {
+                        Ok(protobuf::TaskInputPartitions {
+                            partition: partition as u32,
+                            partition_location: locations
+                                .into_iter()
+                                .map(|l| l.try_into())
+                                .collect::<Result<Vec<_>>>()?,
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?,
+                complete: output.complete,
+            });
+        }
+
+        let output_partitioning =
+            hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+        Ok(protobuf::UnResolvedStage {
+            stage_id: stage.stage_id as u64,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
+            inputs,
+            plan,
+        })
+    }
+}
+
+impl Debug for UnresolvedStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+        write!(
+            f,
+            "=========UnResolvedStage[id={}, 
children={}]=========\nInputs{:?}\n{}",
+            self.stage_id,
+            self.inputs.len(),
+            self.inputs,
+            plan
+        )
+    }
+}
+
+impl ResolvedStage {
+    pub(super) fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        output_partitioning: Option<Partitioning>,
+        output_links: Vec<usize>,
+    ) -> Self {
+        let partitions = plan.output_partitioning().partition_count();
+
+        Self {
+            stage_id,
+            partitions,
+            output_partitioning,
+            output_links,
+            plan,
+        }
+    }
+
+    /// Change to the running state
+    pub(super) fn to_running(&self) -> RunningStage {
+        RunningStage::new(
+            self.stage_id,
+            self.plan.clone(),
+            self.partitions,
+            self.output_partitioning.clone(),
+            self.output_links.clone(),
+        )
+    }
+
+    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        stage: protobuf::ResolvedStage,
+        codec: &BallistaCodec<T, U>,
+        session_ctx: &SessionContext,
+    ) -> Result<ResolvedStage> {
+        let plan_proto = U::try_decode(&stage.plan)?;
+        let plan = plan_proto.try_into_physical_plan(
+            session_ctx,
+            session_ctx.runtime_env().as_ref(),
+            codec.physical_extension_codec(),
+        )?;
+
+        let output_partitioning: Option<Partitioning> = 
parse_protobuf_hash_partitioning(
+            stage.output_partitioning.as_ref(),
+            session_ctx,
+            plan.schema().as_ref(),
+        )?;
+
+        Ok(ResolvedStage {
+            stage_id: stage.stage_id as usize,
+            partitions: stage.partitions as usize,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
+            plan,
+        })
+    }
+
+    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        stage: ResolvedStage,
+        codec: &BallistaCodec<T, U>,
+    ) -> Result<protobuf::ResolvedStage> {
+        let mut plan: Vec<u8> = vec![];
+        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+            .and_then(|proto| proto.try_encode(&mut plan))?;
+
+        let output_partitioning =
+            hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+        Ok(protobuf::ResolvedStage {
+            stage_id: stage.stage_id as u64,
+            partitions: stage.partitions as u32,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
+            plan,
+        })
+    }
+}
+
+impl Debug for ResolvedStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+        write!(
+            f,
+            "=========ResolvedStage[id={}, partitions={}]=========\n{}",
+            self.stage_id, self.partitions, plan
+        )
+    }
+}
+
+impl RunningStage {
+    pub(super) fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        partitions: usize,
+        output_partitioning: Option<Partitioning>,
+        output_links: Vec<usize>,
+    ) -> Self {
+        Self {
+            stage_id,
+            partitions,
+            output_partitioning,
+            output_links,
+            plan,
+            task_statuses: vec![None; partitions],
+            stage_metrics: None,
+        }
+    }
+
+    pub(super) fn to_completed(&self) -> CompletedStage {
+        let task_statuses = self
+            .task_statuses
+            .iter()
+            .enumerate()
+            .map(|(task_id, status)| {
+                status.clone().unwrap_or_else(|| {
+                    panic!(
+                        "The status of task {}/{} should not be none",
+                        self.stage_id, task_id
+                    )
+                })
+            })
+            .collect();
+        let stage_metrics = self.stage_metrics.clone().unwrap_or_else(|| {
+            warn!("The metrics for stage {} should not be none", 
self.stage_id);
+            vec![]
+        });
+        CompletedStage {
+            stage_id: self.stage_id,
+            partitions: self.partitions,
+            output_partitioning: self.output_partitioning.clone(),
+            output_links: self.output_links.clone(),
+            plan: self.plan.clone(),
+            task_statuses,
+            stage_metrics,
+        }
+    }
+
+    pub(super) fn to_failed(&self, error_message: String) -> FailedStage {
+        FailedStage {
+            stage_id: self.stage_id,
+            partitions: self.partitions,
+            output_partitioning: self.output_partitioning.clone(),
+            output_links: self.output_links.clone(),
+            plan: self.plan.clone(),
+            task_statuses: self.task_statuses.clone(),
+            stage_metrics: self.stage_metrics.clone(),
+            error_message,
+        }
+    }
+
+    pub(super) fn to_resolved(&self) -> ResolvedStage {
+        ResolvedStage::new(
+            self.stage_id,
+            self.plan.clone(),
+            self.output_partitioning.clone(),
+            self.output_links.clone(),
+        )
+    }
+
+    /// Returns `true` if all tasks for this stage are complete
+    pub(super) fn is_completed(&self) -> bool {
+        self.task_statuses
+            .iter()
+            .all(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+    }
+
+    /// Returns the number of completed tasks
+    pub(super) fn completed_tasks(&self) -> usize {
+        self.task_statuses
+            .iter()
+            .filter(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+            .count()
+    }
+
+    /// Returns the number of scheduled tasks
+    pub(super) fn scheduled_tasks(&self) -> usize {
+        self.task_statuses.iter().filter(|s| s.is_some()).count()
+    }
+
+    /// Returns a vector of currently running tasks in this stage
+    pub(super) fn running_tasks(&self) -> Vec<(usize, usize, String)> {
+        self.task_statuses
+            .iter()
+            .enumerate()
+            .filter_map(|(partition, status)| match status {
+                Some(task_status::Status::Running(RunningTask { executor_id 
})) => {
+                    Some((self.stage_id, partition, executor_id.clone()))
+                }
+                _ => None,
+            })
+            .collect()
+    }
+
+    /// Returns the number of tasks in this stage which are available for 
scheduling.
+    /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
+    /// return the number of tasks where the task status is not yet set.
+    pub(super) fn available_tasks(&self) -> usize {
+        self.task_statuses.iter().filter(|s| s.is_none()).count()
+    }
+
+    /// Update the status for task partition
+    pub(super) fn update_task_status(
+        &mut self,
+        partition_id: usize,
+        status: task_status::Status,
+    ) {
+        debug!("Updating task status for partition {}", partition_id);
+        self.task_statuses[partition_id] = Some(status);
+    }
+
+    /// update and combine the task metrics to the stage metrics
+    pub(super) fn update_task_metrics(
+        &mut self,
+        partition: usize,
+        metrics: Vec<OperatorMetricsSet>,
+    ) -> Result<()> {
+        if let Some(combined_metrics) = &mut self.stage_metrics {
+            if metrics.len() != combined_metrics.len() {
+                return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
+                with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+            }
+            let metrics_values_array = metrics
+                .into_iter()
+                .map(|ms| {
+                    ms.metrics
+                        .into_iter()
+                        .map(|m| m.try_into())
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let new_metrics_set = combined_metrics
+                .iter_mut()
+                .zip(metrics_values_array)
+                .map(|(first, second)| {
+                    Self::combine_metrics_set(first, second, partition)
+                })
+                .collect();
+            self.stage_metrics = Some(new_metrics_set)
+        } else {
+            let new_metrics_set = metrics
+                .into_iter()
+                .map(|ms| ms.try_into())
+                .collect::<Result<Vec<_>>>()?;
+            if !new_metrics_set.is_empty() {
+                self.stage_metrics = Some(new_metrics_set)
+            }
+        }
+        Ok(())
+    }
+
+    pub(super) fn combine_metrics_set(
+        first: &mut MetricsSet,
+        second: Vec<MetricValue>,
+        partition: usize,
+    ) -> MetricsSet {
+        for metric_value in second {
+            // TODO recheck the lable logic
+            let new_metric = Arc::new(Metric::new(metric_value, 
Some(partition)));
+            first.push(new_metric);
+        }
+        first.aggregate_by_partition()
+    }
+}
+
+impl Debug for RunningStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+        write!(
+            f,
+            "=========RunningStage[id={}, partitions={}, completed_tasks={}, 
scheduled_tasks={}, available_tasks={}]=========\n{}",
+            self.stage_id,
+            self.partitions,
+            self.completed_tasks(),
+            self.scheduled_tasks(),
+            self.available_tasks(),
+            plan
+        )
+    }
+}
+
+impl CompletedStage {
+    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        stage: protobuf::CompletedStage,
+        codec: &BallistaCodec<T, U>,
+        session_ctx: &SessionContext,
+    ) -> Result<CompletedStage> {
+        let plan_proto = U::try_decode(&stage.plan)?;
+        let plan = plan_proto.try_into_physical_plan(
+            session_ctx,
+            session_ctx.runtime_env().as_ref(),
+            codec.physical_extension_codec(),
+        )?;
+
+        let output_partitioning: Option<Partitioning> = 
parse_protobuf_hash_partitioning(
+            stage.output_partitioning.as_ref(),
+            session_ctx,
+            plan.schema().as_ref(),
+        )?;
+
+        let task_statuses = stage
+            .task_statuses
+            .into_iter()
+            .enumerate()
+            .map(|(task_id, status)| {
+                status.status.unwrap_or_else(|| {
+                    panic!("Status for task {} should not be none", task_id)
+                })
+            })
+            .collect();
+
+        let stage_metrics = stage
+            .stage_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(CompletedStage {
+            stage_id: stage.stage_id as usize,
+            partitions: stage.partitions as usize,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
+            plan,
+            task_statuses,
+            stage_metrics,
+        })
+    }
+
+    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        job_id: String,
+        stage: CompletedStage,
+        codec: &BallistaCodec<T, U>,
+    ) -> Result<protobuf::CompletedStage> {
+        let stage_id = stage.stage_id;
+
+        let mut plan: Vec<u8> = vec![];
+        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+            .and_then(|proto| proto.try_encode(&mut plan))?;
+
+        let output_partitioning =
+            hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+        let task_statuses: Vec<protobuf::TaskStatus> = stage
+            .task_statuses
+            .into_iter()
+            .enumerate()
+            .map(|(partition, status)| {
+                protobuf::TaskStatus {
+                    task_id: Some(protobuf::PartitionId {
+                        job_id: job_id.clone(),
+                        stage_id: stage_id as u32,
+                        partition_id: partition as u32,
+                    }),
+                    // task metrics should not persist.
+                    metrics: vec![],
+                    status: Some(status),
+                }
+            })
+            .collect();
+
+        let stage_metrics = stage
+            .stage_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(protobuf::CompletedStage {
+            stage_id: stage_id as u64,
+            partitions: stage.partitions as u32,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
+            plan,
+            task_statuses,
+            stage_metrics,
+        })
+    }
+}
+
+impl Debug for CompletedStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableBallistaExecutionPlan::new(
+            self.plan.as_ref(),
+            &self.stage_metrics,
+        )
+        .indent();
+
+        write!(
+            f,
+            "=========CompletedStage[id={}, partitions={}]=========\n{}",
+            self.stage_id, self.partitions, plan
+        )
+    }
+}
+
+impl FailedStage {
+    /// Returns the number of completed tasks
+    pub(super) fn completed_tasks(&self) -> usize {
+        self.task_statuses
+            .iter()
+            .filter(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+            .count()
+    }
+
+    /// Returns the number of scheduled tasks
+    pub(super) fn scheduled_tasks(&self) -> usize {
+        self.task_statuses.iter().filter(|s| s.is_some()).count()
+    }
+
+    /// Returns the number of tasks in this stage which are available for 
scheduling.
+    /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
+    /// return the number of tasks where the task status is not yet set.
+    pub(super) fn available_tasks(&self) -> usize {
+        self.task_statuses.iter().filter(|s| s.is_none()).count()
+    }
+
+    pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        stage: protobuf::FailedStage,
+        codec: &BallistaCodec<T, U>,
+        session_ctx: &SessionContext,
+    ) -> Result<FailedStage> {
+        let plan_proto = U::try_decode(&stage.plan)?;
+        let plan = plan_proto.try_into_physical_plan(
+            session_ctx,
+            session_ctx.runtime_env().as_ref(),
+            codec.physical_extension_codec(),
+        )?;
+
+        let output_partitioning: Option<Partitioning> = 
parse_protobuf_hash_partitioning(
+            stage.output_partitioning.as_ref(),
+            session_ctx,
+            plan.schema().as_ref(),
+        )?;
+
+        let mut task_statuses: Vec<Option<task_status::Status>> =
+            vec![None; stage.partitions as usize];
+        for status in stage.task_statuses {
+            if let Some(task_id) = status.task_id.as_ref() {
+                task_statuses[task_id.partition_id as usize] = status.status
+            }
+        }
+
+        let stage_metrics = if stage.stage_metrics.is_empty() {
+            None
+        } else {
+            let ms = stage
+                .stage_metrics
+                .into_iter()
+                .map(|m| m.try_into())
+                .collect::<Result<Vec<_>>>()?;
+            Some(ms)
+        };
+
+        Ok(FailedStage {
+            stage_id: stage.stage_id as usize,
+            partitions: stage.partitions as usize,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
usize).collect(),
+            plan,
+            task_statuses,
+            stage_metrics,
+            error_message: stage.error_message,
+        })
+    }
+
+    pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
+        job_id: String,
+        stage: FailedStage,
+        codec: &BallistaCodec<T, U>,
+    ) -> Result<protobuf::FailedStage> {
+        let stage_id = stage.stage_id;
+
+        let mut plan: Vec<u8> = vec![];
+        U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+            .and_then(|proto| proto.try_encode(&mut plan))?;
+
+        let output_partitioning =
+            hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+        let task_statuses: Vec<protobuf::TaskStatus> = stage
+            .task_statuses
+            .into_iter()
+            .enumerate()
+            .filter_map(|(partition, status)| {
+                status.map(|status| protobuf::TaskStatus {
+                    task_id: Some(protobuf::PartitionId {
+                        job_id: job_id.clone(),
+                        stage_id: stage_id as u32,
+                        partition_id: partition as u32,
+                    }),
+                    // task metrics should not persist.
+                    metrics: vec![],
+                    status: Some(status),
+                })
+            })
+            .collect();
+
+        let stage_metrics = stage
+            .stage_metrics
+            .unwrap_or_default()
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(protobuf::FailedStage {
+            stage_id: stage_id as u64,
+            partitions: stage.partitions as u32,
+            output_partitioning,
+            output_links: stage.output_links.into_iter().map(|l| l as 
u32).collect(),
+            plan,
+            task_statuses,
+            stage_metrics,
+            error_message: stage.error_message,
+        })
+    }
+}
+
+impl Debug for FailedStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+        write!(
+            f,
+            "=========FailedStage[id={}, partitions={}, completed_tasks={}, 
scheduled_tasks={}, available_tasks={}, error_message={}]=========\n{}",
+            self.stage_id,
+            self.partitions,
+            self.completed_tasks(),
+            self.scheduled_tasks(),
+            self.available_tasks(),
+            self.error_message,
+            plan
+        )
+    }
+}
+
+/// This data structure collects the partition locations for an 
`ExecutionStage`.
+/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child 
stages.
+/// When all tasks for the child stage are complete, it will mark the 
`StageOutput`
+#[derive(Clone, Debug, Default)]
+pub(super) struct StageOutput {
+    /// Map from partition -> partition locations
+    partition_locations: HashMap<usize, Vec<PartitionLocation>>,
+    /// Flag indicating whether all tasks are complete
+    complete: bool,
+}
+
+impl StageOutput {
+    pub(super) fn new() -> Self {
+        Self {
+            partition_locations: HashMap::new(),
+            complete: false,
+        }
+    }
+
+    /// Add a `PartitionLocation` to the `StageOutput`
+    pub(super) fn add_partition(&mut self, partition_location: 
PartitionLocation) {
+        if let Some(parts) = self
+            .partition_locations
+            .get_mut(&partition_location.partition_id.partition_id)
+        {
+            parts.push(partition_location)
+        } else {
+            self.partition_locations.insert(
+                partition_location.partition_id.partition_id,
+                vec![partition_location],
+            );
+        }
+    }
+
+    pub(super) fn is_complete(&self) -> bool {
+        self.complete
+    }
+}
diff --git a/ballista/rust/scheduler/src/state/mod.rs 
b/ballista/rust/scheduler/src/state/mod.rs
index 12546a8e..5668c6b1 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -82,15 +82,29 @@ pub(super) struct SchedulerState<T: 'static + 
AsLogicalPlan, U: 'static + AsExec
     pub executor_manager: ExecutorManager,
     pub task_manager: TaskManager<T, U>,
     pub session_manager: SessionManager,
-    _codec: BallistaCodec<T, U>,
+    pub codec: BallistaCodec<T, U>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
SchedulerState<T, U> {
+    #[cfg(test)]
+    pub fn new_with_default_scheduler_name(
+        config_client: Arc<dyn StateBackendClient>,
+        session_builder: SessionBuilder,
+        codec: BallistaCodec<T, U>,
+    ) -> Self {
+        SchedulerState::new(
+            config_client,
+            session_builder,
+            codec,
+            "localhost:50050".to_owned(),
+        )
+    }
+
     pub fn new(
         config_client: Arc<dyn StateBackendClient>,
-        _namespace: String,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
+        scheduler_name: String,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(config_client.clone()),
@@ -98,9 +112,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
                 config_client.clone(),
                 session_builder,
                 codec.clone(),
+                scheduler_name,
             ),
             session_manager: SessionManager::new(config_client, 
session_builder),
-            _codec: codec,
+            codec,
         }
     }
 
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs 
b/ballista/rust/scheduler/src/state/task_manager.rs
index 58c49a3b..480f9369 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -40,13 +40,14 @@ use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{debug, error, info, warn};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::default::Default;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 use tonic::transport::Channel;
 
 type ExecutorClients = Arc<RwLock<HashMap<String, 
ExecutorGrpcClient<Channel>>>>;
+type ExecutionGraphCache = Arc<RwLock<HashMap<String, 
Arc<RwLock<ExecutionGraph>>>>>;
 
 #[derive(Clone)]
 pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> {
@@ -55,6 +56,9 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static 
+ AsExecutionPlan>
     clients: ExecutorClients,
     session_builder: SessionBuilder,
     codec: BallistaCodec<T, U>,
+    scheduler_id: String,
+    // Cache for active execution graphs curated by this scheduler
+    active_job_cache: ExecutionGraphCache,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, 
U> {
@@ -62,38 +66,53 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         state: Arc<dyn StateBackendClient>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
+        scheduler_id: String,
     ) -> Self {
         Self {
             state,
             clients: Default::default(),
             session_builder,
             codec,
+            scheduler_id,
+            active_job_cache: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 
     /// Generate an ExecutionGraph for the job and save it to the persistent 
state.
+    /// By default, this job will be curated by the scheduler which receives 
it.
+    /// Then we will also save it to the active execution graph
     pub async fn submit_job(
         &self,
         job_id: &str,
         session_id: &str,
         plan: Arc<dyn ExecutionPlan>,
     ) -> Result<()> {
-        let graph = ExecutionGraph::new(job_id, session_id, plan)?;
+        let mut graph =
+            ExecutionGraph::new(&self.scheduler_id, job_id, session_id, plan)?;
         info!("Submitting execution graph: {:?}", graph);
         self.state
             .put(
                 Keyspace::ActiveJobs,
                 job_id.to_owned(),
-                self.encode_execution_graph(graph)?,
+                self.encode_execution_graph(graph.clone())?,
             )
             .await?;
 
+        graph.revive();
+
+        let mut active_graph_cache = self.active_job_cache.write().await;
+        active_graph_cache.insert(job_id.to_owned(), 
Arc::new(RwLock::new(graph)));
+
         Ok(())
     }
 
-    /// Get the status of of a job. First look in Active/Completed jobs, and 
then in Failed jobs
+    /// Get the status of of a job. First look in the active cache.
+    /// If no one found, then in the Active/Completed jobs, and then in Failed 
jobs
     pub async fn get_job_status(&self, job_id: &str) -> 
Result<Option<JobStatus>> {
-        if let Ok(graph) = self.get_execution_graph(job_id).await {
+        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+            let status = graph.read().await.status();
+            Ok(Some(status))
+        } else if let Ok(graph) = self.get_execution_graph(job_id).await {
             Ok(Some(graph.status()))
         } else {
             let value = self.state.get(Keyspace::FailedJobs, job_id).await?;
@@ -107,123 +126,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         }
     }
 
-    /// Generate a new random Job ID
-    pub fn generate_job_id(&self) -> String {
-        let mut rng = thread_rng();
-        std::iter::repeat(())
-            .map(|()| rng.sample(Alphanumeric))
-            .map(char::from)
-            .take(7)
-            .collect()
-    }
-
-    /// Atomically update given task statuses in the respective job and return 
a tuple containing:
+    /// Update given task statuses in the respective job and return a tuple 
containing:
     /// 1. A list of QueryStageSchedulerEvent to publish.
     /// 2. A list of reservations that can now be offered.
-    ///
-    /// When a task is updated, there may or may not be more tasks pending for 
its job. If there are more
-    /// tasks pending then we want to reschedule one of those tasks on the 
same task slot. In that case
-    /// we will set the `job_id` on the `ExecutorReservation` so the scheduler 
attempts to assign tasks from
-    /// the same job. Note that when the scheduler attempts to fill the 
reservation, there is no guarantee
-    /// that the available task is still available.
     pub(crate) async fn update_task_statuses(
         &self,
         executor: &ExecutorMetadata,
         task_status: Vec<TaskStatus>,
     ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
-        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
-
-        with_lock(lock, async {
-            let mut events: Vec<QueryStageSchedulerEvent> = vec![];
-            let mut reservation: Vec<ExecutorReservation> = vec![];
+        let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
+        for status in task_status {
+            debug!("Task Update\n{:?}", status);
+            if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) 
{
+                let job_task_statuses =
+                    job_updates.entry(job_id.clone()).or_insert_with(Vec::new);
+                job_task_statuses.push(status);
+            } else {
+                warn!("Received task with no job ID");
+            }
+        }
 
-            let mut job_updates: HashMap<String, Vec<TaskStatus>> = 
HashMap::new();
+        let mut events: Vec<QueryStageSchedulerEvent> = vec![];
+        let mut total_num_tasks = 0;
+        for (job_id, statuses) in job_updates {
+            let num_tasks = statuses.len();
+            debug!("Updating {} tasks in job {}", num_tasks, job_id);
 
-            for status in task_status {
-                debug!("Task Update\n{:?}", status);
-                if let Some(job_id) = status.task_id.as_ref().map(|id| 
&id.job_id) {
-                    if let Some(statuses) = job_updates.get_mut(job_id) {
-                        statuses.push(status)
-                    } else {
-                        job_updates.insert(job_id.clone(), vec![status]);
-                    }
-                } else {
-                    warn!("Received task with no job ID");
-                }
-            }
+            total_num_tasks += num_tasks;
 
-            let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
-            for (job_id, statuses) in job_updates {
-                let num_tasks = statuses.len();
-                debug!("Updating {} tasks in job {}", num_tasks, job_id);
-
-                let mut graph = self.get_execution_graph(&job_id).await?;
-
-                graph.update_task_status(executor, statuses)?;
-
-                if graph.complete() {
-                    // If this ExecutionGraph is complete, finalize it
-                    info!(
-                        "Job {} is complete, finalizing output partitions",
-                        graph.job_id()
-                    );
-                    graph.finalize()?;
-                    
events.push(QueryStageSchedulerEvent::JobFinished(job_id.clone()));
-
-                    for _ in 0..num_tasks {
-                        reservation
-                            
.push(ExecutorReservation::new_free(executor.id.to_owned()));
-                    }
-                } else if let Some(job_status::Status::Failed(failure)) =
-                    graph.status().status
-                {
-                    events.push(QueryStageSchedulerEvent::JobFailed(
-                        job_id.clone(),
-                        failure.error,
-                    ));
-
-                    for _ in 0..num_tasks {
-                        reservation
-                            
.push(ExecutorReservation::new_free(executor.id.to_owned()));
-                    }
-                } else {
-                    // Otherwise keep the task slots reserved for this job
-                    for _ in 0..num_tasks {
-                        reservation.push(ExecutorReservation::new_assigned(
-                            executor.id.to_owned(),
-                            job_id.clone(),
-                        ));
-                    }
-                }
+            let graph = self.get_active_execution_graph(&job_id).await;
+            let job_event = if let Some(graph) = graph {
+                let mut graph = graph.write().await;
+                graph.update_task_status(executor, statuses)?
+            } else {
+                // TODO Deal with curator changed case
+                error!("Fail to find job {} in the active cache and it may not 
be curated by this scheduler", job_id);
+                None
+            };
 
-                txn_ops.push((
-                    Keyspace::ActiveJobs,
-                    job_id.clone(),
-                    self.encode_execution_graph(graph)?,
-                ));
+            if let Some(event) = job_event {
+                events.push(event);
             }
+        }
 
-            self.state.put_txn(txn_ops).await?;
-
-            Ok((events, reservation))
-        })
-        .await
+        let reservation = (0..total_num_tasks)
+            .into_iter()
+            .map(|_| ExecutorReservation::new_free(executor.id.to_owned()))
+            .collect();
+        Ok((events, reservation))
     }
 
     /// Take a list of executor reservations and fill them with tasks that are 
ready
-    /// to be scheduled. When the reservation is filled, the underlying stage 
task in the
-    /// `ExecutionGraph` will be set to a status of Running, so if the task is 
not subsequently launched
-    /// we must ensure that the task status is reset.
+    /// to be scheduled.
     ///
     /// Here we use the following  algorithm:
     ///
-    /// 1. For each reservation with a `job_id` assigned try and assign 
another task from the same job.
-    /// 2. If a reservation either does not have a `job_id` or there are no 
available tasks for its `job_id`,
-    ///    add it to a list of "free" reservations.
-    /// 3. For each free reservation, try to assign a task from one of the 
jobs we have already considered.
-    /// 4. If we cannot find a task, then looks for a task among all active 
jobs
-    /// 5. If we cannot find a task in all active jobs, then add the 
reservation to the list of unassigned reservations
+    /// 1. For each free reservation, try to assign a task from one of the 
active jobs
+    /// 2. If we cannot find a task in all active jobs, then add the 
reservation to the list of unassigned reservations
     ///
     /// Finally, we return:
     /// 1. A list of assignments which is a (Executor ID, Task) tuple
@@ -241,96 +200,55 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             })
             .collect();
 
-        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
-        with_lock(lock, async {
-            let mut jobs: Vec<String> =
-                self.get_active_jobs().await?.into_iter().collect();
-
-            let mut assignments: Vec<(String, Task)> = vec![];
-            let mut unassigned: Vec<ExecutorReservation> = vec![];
-            // Need to collect graphs we update so we can update them in 
storage when we are done
-            let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
-            // Now try and find tasks for free reservations from current set 
of graphs
-            for reservation in free_reservations {
-                debug!(
-                    "Filling free reservation for executor {}",
-                    reservation.executor_id
-                );
-                let mut assigned = false;
-                let executor_id = reservation.executor_id.clone();
-
-                // Try and find a task in the graphs we already have locks on
-                if let Ok(Some(assignment)) = find_next_task(&executor_id, 
&mut graphs) {
-                    debug!(
-                        "Filled free reservation for executor {} with task 
{:?}",
-                        reservation.executor_id, assignment.1
-                    );
-                    // First check if we can find another task
-                    assignments.push(assignment);
-                    assigned = true;
+        let mut assignments: Vec<(String, Task)> = vec![];
+        let mut pending_tasks = 0usize;
+        let mut assign_tasks = 0usize;
+        let job_cache = self.active_job_cache.read().await;
+        for (_job_id, graph) in job_cache.iter() {
+            let mut graph = graph.write().await;
+            for reservation in free_reservations.iter().skip(assign_tasks) {
+                if let Some(task) = 
graph.pop_next_task(&reservation.executor_id)? {
+                    assignments.push((reservation.executor_id.clone(), task));
+                    assign_tasks += 1;
                 } else {
-                    // Otherwise start searching through other active jobs.
-                    debug!(
-                        "Filling free reservation for executor {} from active 
jobs {:?}",
-                        reservation.executor_id, jobs
-                    );
-                    while let Some(job_id) = jobs.pop() {
-                        if graphs.get(&job_id).is_none() {
-                            let mut graph = 
self.get_execution_graph(&job_id).await?;
-
-                            if let Ok(Some(task)) = 
graph.pop_next_task(&executor_id) {
-                                debug!(
-                                    "Filled free reservation for executor {} 
with task {:?}",
-                                    reservation.executor_id, task
-                                );
-                                assignments.push((executor_id.clone(), task));
-                                graphs.insert(job_id, graph);
-                                assigned = true;
-                                break;
-                            } else {
-                                debug!("No available tasks for job {}", 
job_id);
-                            }
-                        }
-                    }
-                }
-
-                if !assigned {
-                    debug!(
-                        "Unable to fill reservation for executor {}, no tasks 
available",
-                        executor_id
-                    );
-                    unassigned.push(reservation);
+                    break;
                 }
             }
+            if assign_tasks >= free_reservations.len() {
+                pending_tasks = graph.available_tasks();
+                break;
+            }
+        }
 
-            let mut pending_tasks = 0;
-
-            // Transactional update graphs now that we have assigned tasks
-            let txn_ops: Vec<(Keyspace, String, Vec<u8>)> = graphs
-                .into_iter()
-                .map(|(job_id, graph)| {
-                    pending_tasks += graph.available_tasks();
-                    let value = self.encode_execution_graph(graph)?;
-                    Ok((Keyspace::ActiveJobs, job_id, value))
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            self.state.put_txn(txn_ops).await?;
-
-            Ok((assignments, unassigned, pending_tasks))
-        }).await
+        let mut unassigned = vec![];
+        for reservation in free_reservations.iter().skip(assign_tasks) {
+            unassigned.push(reservation.clone());
+        }
+        Ok((assignments, unassigned, pending_tasks))
     }
 
-    /// Move the given job to the CompletedJobs keyspace in persistent storage.
+    /// Mark a job as completed. This will create a key under the 
CompletedJobs keyspace
+    /// and remove the job from ActiveJobs
     pub async fn complete_job(&self, job_id: &str) -> Result<()> {
         debug!("Moving job {} from Active to Completed", job_id);
         let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
-        with_lock(
-            lock,
-            self.state
-                .mv(Keyspace::ActiveJobs, Keyspace::CompletedJobs, job_id),
-        )
-        .await
+        with_lock(lock, self.state.delete(Keyspace::ActiveJobs, 
job_id)).await?;
+
+        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+            let graph = graph.read().await.clone();
+            if graph.complete() {
+                let value = self.encode_execution_graph(graph)?;
+                self.state
+                    .put(Keyspace::CompletedJobs, job_id.to_owned(), value)
+                    .await?;
+            } else {
+                error!("Job {} has not finished and cannot be completed", 
job_id);
+            }
+        } else {
+            warn!("Fail to find job {} in the cache", job_id);
+        }
+
+        Ok(())
     }
 
     pub(crate) async fn cancel_job(
@@ -395,6 +313,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     /// and remove the job from ActiveJobs or QueuedJobs
     /// TODO this should be atomic
     pub async fn fail_job(&self, job_id: &str, error_message: String) -> 
Result<()> {
+        debug!("Moving job {} from Active or Queue to Failed", job_id);
         let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
         self.fail_job_inner(lock, job_id, error_message).await
     }
@@ -407,16 +326,66 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     ) -> Result<()> {
         with_lock(lock, self.state.delete(Keyspace::ActiveJobs, 
job_id)).await?;
 
-        let status = JobStatus {
-            status: Some(job_status::Status::Failed(FailedJob {
-                error: error_message,
-            })),
+        let value = if let Some(graph) = 
self.get_active_execution_graph(job_id).await {
+            let mut graph = graph.write().await;
+            graph.fail_job(error_message);
+            let graph = graph.clone();
+
+            self.encode_execution_graph(graph)?
+        } else {
+            warn!("Fail to find job {} in the cache", job_id);
+
+            let status = JobStatus {
+                status: Some(job_status::Status::Failed(FailedJob {
+                    error: error_message.clone(),
+                })),
+            };
+            encode_protobuf(&status)?
         };
-        let value = encode_protobuf(&status)?;
 
         self.state
             .put(Keyspace::FailedJobs, job_id.to_owned(), value)
-            .await
+            .await?;
+
+        Ok(())
+    }
+
+    /// Mark a job as failed. This will create a key under the FailedJobs 
keyspace
+    /// and remove the job from ActiveJobs or QueuedJobs
+    /// TODO this should be atomic
+    pub async fn fail_running_job(&self, job_id: &str) -> Result<()> {
+        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+            let graph = graph.read().await.clone();
+            let value = self.encode_execution_graph(graph)?;
+
+            debug!("Moving job {} from Active to Failed", job_id);
+            let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
+            with_lock(lock, self.state.delete(Keyspace::ActiveJobs, 
job_id)).await?;
+            self.state
+                .put(Keyspace::FailedJobs, job_id.to_owned(), value)
+                .await?;
+        } else {
+            warn!("Fail to find job {} in the cache", job_id);
+        }
+
+        Ok(())
+    }
+
+    pub async fn update_job(&self, job_id: &str) -> Result<()> {
+        debug!("Update job {} in Active", job_id);
+        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+            let mut graph = graph.write().await;
+            graph.revive();
+            let graph = graph.clone();
+            let value = self.encode_execution_graph(graph)?;
+            self.state
+                .put(Keyspace::ActiveJobs, job_id.to_owned(), value)
+                .await?;
+        } else {
+            warn!("Fail to find job {} in the cache", job_id);
+        }
+
+        Ok(())
     }
 
     #[cfg(not(test))]
@@ -476,9 +445,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     /// Retrieve the number of available tasks for the given job. The value 
returned
     /// is strictly a point-in-time snapshot
     pub async fn get_available_task_count(&self, job_id: &str) -> 
Result<usize> {
-        let graph = self.get_execution_graph(job_id).await?;
-
-        Ok(graph.available_tasks())
+        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+            let available_tasks = graph.read().await.available_tasks();
+            Ok(available_tasks)
+        } else {
+            warn!("Fail to find job {} in the cache", job_id);
+            Ok(0)
+        }
     }
 
     #[allow(dead_code)]
@@ -506,12 +479,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         Ok(task_definition)
     }
 
-    ///  Return a set of active job IDs. This will return all keys
-    /// in the `ActiveJobs` keyspace stripped of any prefixes used for
-    /// the storage layer (i.e. just the Job IDs).
-    async fn get_active_jobs(&self) -> Result<HashSet<String>> {
-        debug!("Scanning for active job IDs");
-        self.state.scan_keys(Keyspace::ActiveJobs).await
+    /// Get the `ExecutionGraph` for the given job ID from cache
+    pub(crate) async fn get_active_execution_graph(
+        &self,
+        job_id: &str,
+    ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+        let active_graph_cache = self.active_job_cache.read().await;
+        active_graph_cache.get(job_id).cloned()
     }
 
     /// Get the `ExecutionGraph` for the given job ID. This will search fist 
in the `ActiveJobs`
@@ -559,17 +533,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
 
         encode_protobuf(&proto)
     }
-}
 
-/// Find the next available task in a set of `ExecutionGraph`s
-fn find_next_task(
-    executor_id: &str,
-    graphs: &mut HashMap<String, ExecutionGraph>,
-) -> Result<Option<(String, Task)>> {
-    for graph in graphs.values_mut() {
-        if let Ok(Some(task)) = graph.pop_next_task(executor_id) {
-            return Ok(Some((executor_id.to_owned(), task)));
-        }
+    /// Generate a new random Job ID
+    pub fn generate_job_id(&self) -> String {
+        let mut rng = thread_rng();
+        std::iter::repeat(())
+            .map(|()| rng.sample(Alphanumeric))
+            .map(char::from)
+            .take(7)
+            .collect()
     }
-    Ok(None)
 }

Reply via email to