This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 914dba6e Move ExecutionGraph encoding and decoding logic into 
execution_graph for better encapsulation (#150)
914dba6e is described below

commit 914dba6e8125b4ed6810b4560758981764fcdfb6
Author: yahoNanJing <[email protected]>
AuthorDate: Mon Aug 22 22:02:34 2022 +0800

    Move ExecutionGraph encoding and decoding logic into execution_graph for 
better encapsulation (#150)
    
    Co-authored-by: yangzhong <[email protected]>
---
 .../rust/scheduler/src/state/execution_graph.rs    | 1448 +++++++++++---------
 ballista/rust/scheduler/src/state/task_manager.rs  |  213 +--
 2 files changed, 843 insertions(+), 818 deletions(-)

diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs 
b/ballista/rust/scheduler/src/state/execution_graph.rs
index d928f969..e3599092 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -36,740 +36,958 @@ use std::collections::HashMap;
 use std::convert::TryInto;
 use std::fmt::{Debug, Formatter};
 
+use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use ballista_core::utils::collect_plan_metrics;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use std::sync::Arc;
 
-/// This data structure collects the partition locations for an 
`ExecutionStage`.
-/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child 
stages.
-/// When all tasks for the child stage are complete, it will mark the 
`StageOutput`
-#[derive(Clone, Debug, Default)]
-pub struct StageOutput {
-    /// Map from partition -> partition locations
-    pub(crate) partition_locations: HashMap<usize, Vec<PartitionLocation>>,
-    /// Flag indicating whether all tasks are complete
-    pub(crate) complete: bool,
-}
-
-impl StageOutput {
-    pub fn new() -> Self {
-        Self {
-            partition_locations: HashMap::new(),
-            complete: false,
-        }
-    }
-
-    /// Add a `PartitionLocation` to the `StageOutput`
-    pub fn add_partition(&mut self, partition_location: PartitionLocation) {
-        if let Some(parts) = self
-            .partition_locations
-            .get_mut(&partition_location.partition_id.partition_id)
-        {
-            parts.push(partition_location)
-        } else {
-            self.partition_locations.insert(
-                partition_location.partition_id.partition_id,
-                vec![partition_location],
-            );
-        }
-    }
-
-    pub fn is_complete(&self) -> bool {
-        self.complete
-    }
-}
-
-/// A stage in the ExecutionGraph.
-///
-/// This represents a set of tasks (one per each `partition`) which can
-/// be executed concurrently.
+/// Represents the basic unit of work for the Ballista executor. Will execute
+/// one partition of one stage on one task slot.
 #[derive(Clone)]
-pub struct ExecutionStage {
-    /// Stage ID
-    pub(crate) stage_id: usize,
-    /// Total number of output partitions for this stage.
-    /// This stage will produce on task for partition.
-    pub(crate) partitions: usize,
-    /// Output partitioning for this stage.
-    pub(crate) output_partitioning: Option<Partitioning>,
-    /// Represents the outputs from this stage's child stages.
-    /// This stage can only be resolved an executed once all child stages are 
completed.
-    pub(crate) inputs: HashMap<usize, StageOutput>,
-    // `ExecutionPlan` for this stage
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
-    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
-    pub(crate) task_statuses: Vec<Option<task_status::Status>>,
-    /// Stage ID of the stage that will take this stages outputs as inputs.
-    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
-    pub(crate) output_links: Vec<usize>,
-    /// Flag indicating whether all input partitions have been resolved and 
the plan
-    /// has UnresovledShuffleExec operators resolved to ShuffleReadExec 
operators.
-    pub(crate) resolved: bool,
-    /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
-    pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
+pub struct Task {
+    pub session_id: String,
+    pub partition: PartitionId,
+    pub plan: Arc<dyn ExecutionPlan>,
+    pub output_partitioning: Option<Partitioning>,
 }
 
-impl Debug for ExecutionStage {
+impl Debug for Task {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
-        let scheduled_tasks = self.task_statuses.iter().filter(|t| 
t.is_some()).count();
-
         write!(
             f,
-            "Stage[id={}, partitions={:?}, children={}, completed_tasks={}, 
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
-            self.stage_id,
-            self.partitions,
-            self.inputs.len(),
-            self.completed_tasks(),
-            self.resolved,
-            scheduled_tasks,
-            self.available_tasks(),
-            self.inputs,
+            "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
+            self.session_id,
+            self.partition.job_id,
+            self.partition.stage_id,
+            self.partition.partition_id,
             plan
         )
     }
 }
 
-impl ExecutionStage {
+/// Represents the DAG for a distributed query plan.
+///
+/// A distributed query plan consists of a set of stages which must be 
executed sequentially.
+///
+/// Each stage consists of a set of partitions which can be executed in 
parallel, where each partition
+/// represents a `Task`, which is the basic unit of scheduling in Ballista.
+///
+/// As an example, consider a SQL query which performs a simple aggregation:
+///
+/// `SELECT id, SUM(gmv) FROM some_table GROUP BY id`
+///
+/// This will produce a DataFusion execution plan that looks something like
+///
+///
+///   CoalesceBatchesExec: target_batch_size=4096
+///     RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }], 
4)
+///       AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[SUM(some_table.gmv)]
+///         TableScan: some_table
+///
+/// The Ballista `DistributedPlanner` will turn this into a distributed plan 
by creating a shuffle
+/// boundary (called a "Stage") whenever the underlying plan needs to perform 
a repartition.
+/// In this case we end up with a distributed plan with two stages:
+///
+///
+/// ExecutionGraph[job_id=job, session_id=session, available_tasks=1, 
complete=false]
+/// Stage[id=2, partitions=4, children=1, completed_tasks=0, resolved=false, 
scheduled_tasks=0, available_tasks=0]
+/// Inputs{1: StageOutput { partition_locations: {}, complete: false }}
+///
+/// ShuffleWriterExec: None
+///   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], 
aggr=[SUM(?table?.gmv)]
+///     CoalesceBatchesExec: target_batch_size=4096
+///       UnresolvedShuffleExec
+///
+/// Stage[id=1, partitions=1, children=0, completed_tasks=0, resolved=true, 
scheduled_tasks=0, available_tasks=1]
+/// Inputs{}
+///
+/// ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
+///   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
+///     TableScan: some_table
+///
+///
+/// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each 
stage's `input` field
+/// will indicate which stages it depends on, and each stage's `output_links` 
will indicate which
+/// stage it needs to publish its output to.
+///
+/// If a stage has `output_links` is empty then it is the final stage in this 
query, and it should
+/// publish its outputs to the `ExecutionGraph`s `output_locations` 
representing the final query results.
+#[derive(Clone)]
+pub struct ExecutionGraph {
+    /// ID for this job
+    job_id: String,
+    /// Session ID for this job
+    session_id: String,
+    /// Status of this job
+    status: JobStatus,
+    /// Map from Stage ID -> ExecutionStage
+    stages: HashMap<usize, ExecutionStage>,
+    /// Total number fo output partitions
+    output_partitions: usize,
+    /// Locations of this `ExecutionGraph` final output locations
+    output_locations: Vec<PartitionLocation>,
+}
+
+impl ExecutionGraph {
     pub fn new(
-        stage_id: usize,
+        job_id: &str,
+        session_id: &str,
         plan: Arc<dyn ExecutionPlan>,
-        output_partitioning: Option<Partitioning>,
-        output_links: Vec<usize>,
-        child_stages: Vec<usize>,
-    ) -> Self {
-        let num_tasks = plan.output_partitioning().partition_count();
+    ) -> Result<Self> {
+        let mut planner = DistributedPlanner::new();
 
-        let resolved = child_stages.is_empty();
+        let output_partitions = plan.output_partitioning().partition_count();
 
-        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+        let shuffle_stages = planner.plan_query_stages(job_id, plan)?;
 
-        for input_stage_id in &child_stages {
-            inputs.insert(*input_stage_id, StageOutput::new());
-        }
+        let builder = ExecutionStageBuilder::new();
+        let stages = builder.build(shuffle_stages)?;
 
-        Self {
-            stage_id,
-            partitions: num_tasks,
-            output_partitioning,
-            inputs,
-            plan,
-            task_statuses: vec![None; num_tasks],
-            output_links,
-            resolved,
-            stage_metrics: None,
-        }
+        Ok(Self {
+            job_id: job_id.to_string(),
+            session_id: session_id.to_string(),
+            status: JobStatus {
+                status: Some(job_status::Status::Queued(QueuedJob {})),
+            },
+            stages,
+            output_partitions,
+            output_locations: vec![],
+        })
     }
 
-    /// Returns true if all inputs are complete and we can resolve all
-    /// UnresolvedShuffleExec operators to ShuffleReadExec
-    pub fn resolvable(&self) -> bool {
-        self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
+    pub fn job_id(&self) -> &str {
+        self.job_id.as_str()
     }
 
-    /// Returns `true` if all tasks for this stage are complete
-    pub fn complete(&self) -> bool {
-        self.task_statuses
-            .iter()
-            .all(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+    pub fn session_id(&self) -> &str {
+        self.session_id.as_str()
     }
 
-    /// Returns the number of tasks
-    pub fn completed_tasks(&self) -> usize {
-        self.task_statuses
-            .iter()
-            .filter(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
-            .count()
+    pub fn status(&self) -> JobStatus {
+        self.status.clone()
     }
 
-    /// Marks the input stage ID as complete.
-    pub fn complete_input(&mut self, stage_id: usize) {
-        if let Some(input) = self.inputs.get_mut(&stage_id) {
-            input.complete = true;
-        }
+    /// An ExecutionGraph is complete if all its stages are complete
+    pub fn complete(&self) -> bool {
+        self.stages.values().all(|s| s.complete())
     }
 
-    /// Returns true if the stage plan has all UnresolvedShuffleExec operators 
resolved to
-    /// ShuffleReadExec
-    pub fn resolved(&self) -> bool {
-        self.resolved
-    }
+    /// Update task statuses and task metrics in the graph.
+    /// This will also push shuffle partitions to their respective shuffle 
read stages.
+    pub fn update_task_status(
+        &mut self,
+        executor: &ExecutorMetadata,
+        statuses: Vec<TaskStatus>,
+    ) -> Result<()> {
+        for status in statuses.into_iter() {
+            if let TaskStatus {
+                task_id:
+                    Some(protobuf::PartitionId {
+                        job_id,
+                        stage_id,
+                        partition_id,
+                    }),
+                metrics: operator_metrics,
+                status: Some(task_status),
+            } = status
+            {
+                if job_id != self.job_id() {
+                    return Err(BallistaError::Internal(format!(
+                        "Error updating job {}: Invalid task status job ID {}",
+                        self.job_id(),
+                        job_id
+                    )));
+                }
 
-    /// Returns the number of tasks in this stage which are available for 
scheduling.
-    /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
-    /// return the number of tasks where the task status is not yet set.
-    pub fn available_tasks(&self) -> usize {
-        if self.resolved {
-            self.task_statuses.iter().filter(|s| s.is_none()).count()
-        } else {
-            0
-        }
-    }
+                let stage_id = stage_id as usize;
+                let partition = partition_id as usize;
+                if let Some(stage) = self.stages.get_mut(&stage_id) {
+                    stage.update_task_status(partition, task_status.clone());
+                    let stage_plan = stage.plan.clone();
+                    let stage_complete = stage.complete();
 
-    /// Resolve any UnresolvedShuffleExec operators within this stage's plan
-    pub fn resolve_shuffles(&mut self) -> Result<()> {
-        println!("Resolving shuffles\n{:?}", self);
-        if self.resolved {
-            // If this stage has no input shuffles, then it is already resolved
-            Ok(())
-        } else {
-            let input_locations = self
-                .inputs
-                .iter()
-                .map(|(stage, outputs)| (*stage, 
outputs.partition_locations.clone()))
-                .collect();
-            // Otherwise, rewrite the plan to replace UnresolvedShuffleExec 
with ShuffleReadExec
-            let new_plan = crate::planner::remove_unresolved_shuffles(
-                self.plan.clone(),
-                &input_locations,
-            )?;
-            self.plan = new_plan;
-            self.resolved = true;
-            Ok(())
-        }
-    }
+                    // TODO Should be able to reschedule this task.
+                    if let task_status::Status::Failed(failed_task) = 
task_status {
+                        self.status = JobStatus {
+                            status: Some(job_status::Status::Failed(FailedJob {
+                                error: format!(
+                                    "Task {}/{}/{} failed: {}",
+                                    job_id, stage_id, partition_id, 
failed_task.error
+                                ),
+                            })),
+                        };
+                        return Ok(());
+                    } else if let 
task_status::Status::Completed(completed_task) =
+                        task_status
+                    {
+                        // update task metrics for completed task
+                        stage.update_task_metrics(partition, 
operator_metrics)?;
 
-    /// Update the status for task partition
-    pub fn update_task_status(&mut self, partition: usize, status: 
task_status::Status) {
-        debug!("Updating task status for partition {}", partition);
-        self.task_statuses[partition] = Some(status);
-    }
+                        // if this stage is completed, we want to combine the 
stage metrics to plan's metric set and print out the plan
+                        if stage_complete && 
stage.stage_metrics.as_ref().is_some() {
+                            // The plan_metrics collected here is a snapshot 
clone from the plan metrics.
+                            // They are all empty now and need to combine with 
the stage metrics in the ExecutionStages
+                            let mut plan_metrics =
+                                collect_plan_metrics(stage_plan.as_ref());
+                            let stage_metrics = stage
+                                .stage_metrics
+                                .as_ref()
+                                .expect("stage metrics should not be None.");
+                            if plan_metrics.len() != stage_metrics.len() {
+                                return 
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for 
stage {},  plan metrics array size {} does not equal \
+                to the stage metrics array size {}", stage_id, 
plan_metrics.len(), stage_metrics.len())));
+                            }
+                            
plan_metrics.iter_mut().zip(stage_metrics).for_each(
+                                |(plan_metric, stage_metric)| {
+                                    stage_metric
+                                        .iter()
+                                        .for_each(|s| 
plan_metric.push(s.clone()));
+                                },
+                            );
 
-    /// update and combine the task metrics to the stage metrics
-    pub fn update_task_metrics(
-        &mut self,
-        partition: usize,
-        metrics: Vec<OperatorMetricsSet>,
-    ) -> Result<()> {
-        if let Some(combined_metrics) = &mut self.stage_metrics {
-            if metrics.len() != combined_metrics.len() {
-                return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
-                with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
-            }
-            let metrics_values_array = metrics
-                .into_iter()
-                .map(|ms| {
-                    ms.metrics
-                        .into_iter()
-                        .map(|m| m.try_into())
-                        .collect::<Result<Vec<_>>>()
-                })
-                .collect::<Result<Vec<_>>>()?;
+                            info!(
+                                "=== [{}/{}/{}] Stage finished, physical plan 
with metrics ===\n{}\n",
+                                job_id,
+                                stage_id,
+                                partition,
+                                
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(), 
plan_metrics.as_ref()).indent()
+                            );
+                        }
 
-            let new_metrics_set = combined_metrics
-                .iter_mut()
-                .zip(metrics_values_array)
-                .map(|(first, second)| {
-                    Self::combine_metrics_set(first, second, partition)
-                })
-                .collect();
-            self.stage_metrics = Some(new_metrics_set)
-        } else {
-            let new_metrics_set = metrics
-                .into_iter()
-                .map(|ms| ms.try_into())
-                .collect::<Result<Vec<_>>>()?;
-            if !new_metrics_set.is_empty() {
-                self.stage_metrics = Some(new_metrics_set)
+                        let locations = partition_to_location(
+                            self.job_id.as_str(),
+                            stage_id,
+                            executor,
+                            completed_task.partitions,
+                        );
+
+                        let output_links = stage.output_links.clone();
+                        if output_links.is_empty() {
+                            // If `output_links` is empty, then this is a 
final stage
+                            self.output_locations.extend(locations);
+                        } else {
+                            for link in output_links.into_iter() {
+                                // If this is an intermediate stage, we need 
to push its `PartitionLocation`s to the parent stage
+                                if let Some(linked_stage) = 
self.stages.get_mut(&link) {
+                                    linked_stage.add_input_partitions(
+                                        stage_id,
+                                        partition,
+                                        locations.clone(),
+                                    )?;
+
+                                    // If all tasks for this stage are 
complete, mark the input complete in the parent stage
+                                    if stage_complete {
+                                        linked_stage.complete_input(stage_id);
+                                    }
+
+                                    // If all input partitions are ready, we 
can resolve any UnresolvedShuffleExec in the parent stage plan
+                                    if linked_stage.resolvable() {
+                                        linked_stage.resolve_shuffles()?;
+                                    }
+                                } else {
+                                    return 
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link 
{} for stage {}", job_id, stage_id, link)));
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    return Err(BallistaError::Internal(format!(
+                        "Invalid stage ID {} for job {}",
+                        stage_id,
+                        self.job_id()
+                    )));
+                }
             }
         }
+
         Ok(())
     }
 
-    pub fn combine_metrics_set(
-        first: &mut MetricsSet,
-        second: Vec<MetricValue>,
-        partition: usize,
-    ) -> MetricsSet {
-        for metric_value in second {
-            // TODO recheck the lable logic
-            let new_metric = Arc::new(Metric::new(metric_value, 
Some(partition)));
-            first.push(new_metric);
-        }
-        first.aggregate_by_partition()
+    /// Total number of tasks in this plan that are ready for scheduling
+    pub fn available_tasks(&self) -> usize {
+        self.stages
+            .iter()
+            .map(|(_, stage)| stage.available_tasks())
+            .sum()
     }
 
-    /// Add input partitions published from an input stage.
-    pub fn add_input_partitions(
-        &mut self,
-        stage_id: usize,
-        _partition_id: usize,
-        locations: Vec<PartitionLocation>,
-    ) -> Result<()> {
-        if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
-            for partition in locations {
-                stage_inputs.add_partition(partition);
-            }
-        } else {
-            return Err(BallistaError::Internal(format!("Error adding input 
partitions to stage {}, {} is not a valid child stage ID", self.stage_id, 
stage_id)));
-        }
+    /// Get next task that can be assigned to the given executor.
+    /// This method should only be called when the resulting task is 
immediately
+    /// being launched as the status will be set to Running and it will not be
+    /// available to the scheduler.
+    /// If the task is not launched the status must be reset to allow the task 
to
+    /// be scheduled elsewhere.
+    pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<Task>> 
{
+        let job_id = self.job_id.clone();
+        let session_id = self.session_id.clone();
+        self.stages.iter_mut().find(|(_stage_id, stage)| {
+            stage.resolved() && stage.available_tasks() > 0
+        }).map(|(stage_id, stage)| {
+            let (partition_id,_) = stage
+                .task_statuses
+                .iter()
+                .enumerate()
+                .find(|(_partition,status)| status.is_none())
+                .ok_or_else(|| {
+                    BallistaError::Internal(format!("Error getting next task 
for job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
+                })?;
 
-        Ok(())
-    }
-}
+            let partition = PartitionId {
+                job_id,
+                stage_id: *stage_id,
+                partition_id
+            };
 
-/// Utility for building a set of `ExecutionStage`s from
-/// a list of `ShuffleWriterExec`.
-///
-/// This will infer the dependency structure for the stages
-/// so that we can construct a DAG from the stages.
-struct ExecutionStageBuilder {
-    /// Stage ID which is currently being visited
-    current_stage_id: usize,
-    /// Map from stage ID -> List of child stage IDs
-    stage_dependencies: HashMap<usize, Vec<usize>>,
-    /// Map from Stage ID -> output link
-    output_links: HashMap<usize, Vec<usize>>,
-}
+            // Set the status to Running
+            stage.task_statuses[partition_id] = 
Some(task_status::Status::Running(RunningTask {
+                executor_id: executor_id.to_owned()
+            }));
 
-impl ExecutionStageBuilder {
-    pub fn new() -> Self {
-        Self {
-            current_stage_id: 0,
-            stage_dependencies: HashMap::new(),
-            output_links: HashMap::new(),
-        }
+            Ok(Task {
+                session_id,
+                partition,
+                plan: stage.plan.clone(),
+                output_partitioning: stage.output_partitioning.clone()
+            })
+        }).transpose()
     }
 
-    pub fn build(
-        mut self,
-        stages: Vec<Arc<ShuffleWriterExec>>,
-    ) -> Result<HashMap<usize, ExecutionStage>> {
-        let mut execution_stages: HashMap<usize, ExecutionStage> = 
HashMap::new();
-        // First, build the dependency graph
-        for stage in &stages {
-            accept(stage.as_ref(), &mut self)?;
+    pub fn finalize(&mut self) -> Result<()> {
+        if !self.complete() {
+            return Err(BallistaError::Internal(format!(
+                "Attempt to finalize an incomplete job {}",
+                self.job_id()
+            )));
         }
 
-        // Now, create the execution stages
-        for stage in stages {
-            let partitioning = stage.shuffle_output_partitioning().cloned();
-            let stage_id = stage.stage_id();
-            let output_links = 
self.output_links.remove(&stage_id).unwrap_or_default();
+        let partition_location = self
+            .output_locations()
+            .into_iter()
+            .map(|l| l.try_into())
+            .collect::<Result<Vec<_>>>()?;
 
-            let child_stages = self
-                .stage_dependencies
-                .remove(&stage_id)
-                .unwrap_or_default();
+        self.status = JobStatus {
+            status: Some(job_status::Status::Completed(CompletedJob {
+                partition_location,
+            })),
+        };
 
-            execution_stages.insert(
-                stage_id,
-                ExecutionStage::new(
-                    stage_id,
-                    stage,
-                    partitioning,
-                    output_links,
-                    child_stages,
-                ),
-            );
+        Ok(())
+    }
+
+    pub fn update_status(&mut self, status: JobStatus) {
+        self.status = status;
+    }
+
+    /// Reset the status for the given task. This should be called is a task 
failed to
+    /// launch and it needs to be returned to the set of available tasks and be
+    /// re-scheduled.
+    pub fn reset_task_status(&mut self, task: Task) {
+        let stage_id = task.partition.stage_id;
+        let partition = task.partition.partition_id;
+
+        if let Some(stage) = self.stages.get_mut(&stage_id) {
+            stage.task_statuses[partition] = None;
         }
+    }
 
-        Ok(execution_stages)
+    pub fn output_locations(&self) -> Vec<PartitionLocation> {
+        self.output_locations.clone()
     }
-}
 
-impl ExecutionPlanVisitor for ExecutionStageBuilder {
-    type Error = BallistaError;
+    pub(crate) async fn decode_execution_graph<
+        T: 'static + AsLogicalPlan,
+        U: 'static + AsExecutionPlan,
+    >(
+        proto: protobuf::ExecutionGraph,
+        codec: &BallistaCodec<T, U>,
+        session_ctx: &SessionContext,
+    ) -> Result<ExecutionGraph> {
+        let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
+        for stage in proto.stages {
+            let plan_proto = U::try_decode(stage.plan.as_slice())?;
+            let plan = plan_proto.try_into_physical_plan(
+                session_ctx,
+                session_ctx.runtime_env().as_ref(),
+                codec.physical_extension_codec(),
+            )?;
 
-    fn pre_visit(
-        &mut self,
-        plan: &dyn ExecutionPlan,
-    ) -> std::result::Result<bool, Self::Error> {
-        if let Some(shuffle_write) = 
plan.as_any().downcast_ref::<ShuffleWriterExec>() {
-            self.current_stage_id = shuffle_write.stage_id();
-        } else if let Some(unresolved_shuffle) =
-            plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
-        {
-            if let Some(output_links) =
-                self.output_links.get_mut(&unresolved_shuffle.stage_id)
-            {
-                if !output_links.contains(&self.current_stage_id) {
-                    output_links.push(self.current_stage_id);
+            let stage_id = stage.stage_id as usize;
+            let partitions: usize = stage.partitions as usize;
+
+            let mut task_statuses: Vec<Option<task_status::Status>> =
+                vec![None; partitions];
+
+            for status in stage.task_statuses {
+                if let Some(task_id) = status.task_id.as_ref() {
+                    task_statuses[task_id.partition_id as usize] = 
status.status
                 }
-            } else {
-                self.output_links
-                    .insert(unresolved_shuffle.stage_id, 
vec![self.current_stage_id]);
             }
 
-            if let Some(deps) = 
self.stage_dependencies.get_mut(&self.current_stage_id) {
-                if !deps.contains(&unresolved_shuffle.stage_id) {
-                    deps.push(unresolved_shuffle.stage_id);
-                }
-            } else {
-                self.stage_dependencies
-                    .insert(self.current_stage_id, 
vec![unresolved_shuffle.stage_id]);
+            let output_partitioning: Option<Partitioning> =
+                parse_protobuf_hash_partitioning(
+                    stage.output_partitioning.as_ref(),
+                    session_ctx,
+                    plan.schema().as_ref(),
+                )?;
+
+            let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+
+            for input in stage.inputs {
+                let stage_id = input.stage_id as usize;
+
+                let outputs = input
+                    .partition_locations
+                    .into_iter()
+                    .map(|loc| {
+                        let partition = loc.partition as usize;
+                        let locations = loc
+                            .partition_location
+                            .into_iter()
+                            .map(|l| l.try_into())
+                            .collect::<Result<Vec<_>>>()?;
+                        Ok((partition, locations))
+                    })
+                    .collect::<Result<HashMap<usize, 
Vec<PartitionLocation>>>>()?;
+
+                inputs.insert(
+                    stage_id,
+                    StageOutput {
+                        partition_locations: outputs,
+                        complete: input.complete,
+                    },
+                );
             }
+            let stage_metrics = if stage.stage_metrics.is_empty() {
+                None
+            } else {
+                let ms = stage
+                    .stage_metrics
+                    .into_iter()
+                    .map(|m| m.try_into())
+                    .collect::<Result<Vec<_>>>()?;
+                Some(ms)
+            };
+
+            let execution_stage = ExecutionStage {
+                stage_id: stage.stage_id as usize,
+                partitions,
+                output_partitioning,
+                inputs,
+                plan,
+                task_statuses,
+                output_links: stage
+                    .output_links
+                    .into_iter()
+                    .map(|l| l as usize)
+                    .collect(),
+                resolved: stage.resolved,
+                stage_metrics,
+            };
+            stages.insert(stage_id, execution_stage);
         }
-        Ok(true)
-    }
-}
 
-/// Represents the basic unit of work for the Ballista executor. Will execute
-/// one partition of one stage on one task slot.
-#[derive(Clone)]
-pub struct Task {
-    pub session_id: String,
-    pub partition: PartitionId,
-    pub plan: Arc<dyn ExecutionPlan>,
-    pub output_partitioning: Option<Partitioning>,
-}
+        let output_locations: Vec<PartitionLocation> = proto
+            .output_locations
+            .into_iter()
+            .map(|loc| loc.try_into())
+            .collect::<Result<Vec<_>>>()?;
 
-impl Debug for Task {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
-        write!(
-            f,
-            "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
-            self.session_id,
-            self.partition.job_id,
-            self.partition.stage_id,
-            self.partition.partition_id,
-            plan
-        )
+        Ok(ExecutionGraph {
+            job_id: proto.job_id,
+            session_id: proto.session_id,
+            status: proto.status.ok_or_else(|| {
+                BallistaError::Internal(
+                    "Invalid Execution Graph: missing job status".to_owned(),
+                )
+            })?,
+            stages,
+            output_partitions: proto.output_partitions as usize,
+            output_locations,
+        })
     }
-}
-
-/// Represents the DAG for a distributed query plan.
-///
-/// A distributed query plan consists of a set of stages which must be 
executed sequentially.
-///
-/// Each stage consists of a set of partitions which can be executed in 
parallel, where each partition
-/// represents a `Task`, which is the basic unit of scheduling in Ballista.
-///
-/// As an example, consider a SQL query which performs a simple aggregation:
-///
-/// `SELECT id, SUM(gmv) FROM some_table GROUP BY id`
-///
-/// This will produce a DataFusion execution plan that looks something like
-///
-///
-///   CoalesceBatchesExec: target_batch_size=4096
-///     RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }], 
4)
-///       AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[SUM(some_table.gmv)]
-///         TableScan: some_table
-///
-/// The Ballista `DistributedPlanner` will turn this into a distributed plan 
by creating a shuffle
-/// boundary (called a "Stage") whenever the underlying plan needs to perform 
a repartition.
-/// In this case we end up with a distributed plan with two stages:
-///
-///
-/// ExecutionGraph[job_id=job, session_id=session, available_tasks=1, 
complete=false]
-/// Stage[id=2, partitions=4, children=1, completed_tasks=0, resolved=false, 
scheduled_tasks=0, available_tasks=0]
-/// Inputs{1: StageOutput { partition_locations: {}, complete: false }}
-///
-/// ShuffleWriterExec: None
-///   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], 
aggr=[SUM(?table?.gmv)]
-///     CoalesceBatchesExec: target_batch_size=4096
-///       UnresolvedShuffleExec
-///
-/// Stage[id=1, partitions=1, children=0, completed_tasks=0, resolved=true, 
scheduled_tasks=0, available_tasks=1]
-/// Inputs{}
-///
-/// ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
-///   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
-///     TableScan: some_table
-///
-///
-/// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each 
stage's `input` field
-/// will indicate which stages it depends on, and each stage's `output_links` 
will indicate which
-/// stage it needs to publish its output to.
-///
-/// If a stage has `output_links` is empty then it is the final stage in this 
query, and it should
-/// publish its outputs to the `ExecutionGraph`s `output_locations` 
representing the final query results.
-#[derive(Clone)]
-pub struct ExecutionGraph {
-    /// ID for this job
-    pub(crate) job_id: String,
-    /// Session ID for this job
-    pub(crate) session_id: String,
-    /// Status of this job
-    pub(crate) status: JobStatus,
-    /// Map from Stage ID -> ExecutionStage
-    pub(crate) stages: HashMap<usize, ExecutionStage>,
-    /// Total number fo output partitions
-    pub(crate) output_partitions: usize,
-    /// Locations of this `ExecutionGraph` final output locations
-    pub(crate) output_locations: Vec<PartitionLocation>,
-}
 
-impl ExecutionGraph {
-    pub fn new(
-        job_id: &str,
-        session_id: &str,
-        plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Self> {
-        let mut planner = DistributedPlanner::new();
+    pub(crate) fn encode_execution_graph<
+        T: 'static + AsLogicalPlan,
+        U: 'static + AsExecutionPlan,
+    >(
+        graph: ExecutionGraph,
+        codec: &BallistaCodec<T, U>,
+    ) -> Result<protobuf::ExecutionGraph> {
+        let job_id = graph.job_id().to_owned();
 
-        let output_partitions = plan.output_partitioning().partition_count();
+        let stages = graph
+            .stages
+            .into_iter()
+            .map(|(stage_id, stage)| {
+                let mut plan: Vec<u8> = vec![];
+
+                U::try_from_physical_plan(stage.plan, 
codec.physical_extension_codec())
+                    .and_then(|proto| proto.try_encode(&mut plan))?;
+
+                let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
+
+                for (stage, output) in stage.inputs.into_iter() {
+                    inputs.push(protobuf::GraphStageInput {
+                        stage_id: stage as u32,
+                        partition_locations: output
+                            .partition_locations
+                            .into_iter()
+                            .map(|(partition, locations)| {
+                                Ok(protobuf::TaskInputPartitions {
+                                    partition: partition as u32,
+                                    partition_location: locations
+                                        .into_iter()
+                                        .map(|l| l.try_into())
+                                        .collect::<Result<Vec<_>>>()?,
+                                })
+                            })
+                            .collect::<Result<Vec<_>>>()?,
+                        complete: output.complete,
+                    });
+                }
 
-        let shuffle_stages = planner.plan_query_stages(job_id, plan)?;
+                let task_statuses: Vec<protobuf::TaskStatus> = stage
+                    .task_statuses
+                    .into_iter()
+                    .enumerate()
+                    .filter_map(|(partition, status)| {
+                        status.map(|status| protobuf::TaskStatus {
+                            task_id: Some(protobuf::PartitionId {
+                                job_id: job_id.clone(),
+                                stage_id: stage_id as u32,
+                                partition_id: partition as u32,
+                            }),
+                            // task metrics should not persist.
+                            metrics: vec![],
+                            status: Some(status),
+                        })
+                    })
+                    .collect();
+
+                let output_partitioning =
+                    
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+                let stage_metrics = stage
+                    .stage_metrics
+                    .unwrap_or_default()
+                    .into_iter()
+                    .map(|m| m.try_into())
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(protobuf::ExecutionGraphStage {
+                    stage_id: stage_id as u64,
+                    partitions: stage.partitions as u32,
+                    output_partitioning,
+                    inputs,
+                    plan,
+                    task_statuses,
+                    output_links: stage
+                        .output_links
+                        .into_iter()
+                        .map(|l| l as u32)
+                        .collect(),
+                    resolved: stage.resolved,
+                    stage_metrics,
+                })
+            })
+            .collect::<Result<Vec<_>>>()?;
 
-        let builder = ExecutionStageBuilder::new();
-        let stages = builder.build(shuffle_stages)?;
+        let output_locations: Vec<protobuf::PartitionLocation> = graph
+            .output_locations
+            .into_iter()
+            .map(|loc| loc.try_into())
+            .collect::<Result<Vec<_>>>()?;
 
-        Ok(Self {
-            job_id: job_id.to_string(),
-            session_id: session_id.to_string(),
-            status: JobStatus {
-                status: Some(job_status::Status::Queued(QueuedJob {})),
-            },
+        Ok(protobuf::ExecutionGraph {
+            job_id: graph.job_id,
+            session_id: graph.session_id,
+            status: Some(graph.status),
             stages,
-            output_partitions,
-            output_locations: vec![],
+            output_partitions: graph.output_partitions as u64,
+            output_locations,
         })
     }
+}
+
+impl Debug for ExecutionGraph {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let stages = self
+            .stages
+            .iter()
+            .map(|(_, stage)| format!("{:?}", stage))
+            .collect::<Vec<String>>()
+            .join("\n");
+        write!(f, "ExecutionGraph[job_id={}, session_id={}, 
available_tasks={}, complete={}]\n{}", self.job_id, self.session_id, 
self.available_tasks(), self.complete(), stages)
+    }
+}
 
-    pub fn job_id(&self) -> &str {
-        self.job_id.as_str()
+/// This data structure collects the partition locations for an 
`ExecutionStage`.
+/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child 
stages.
+/// When all tasks for the child stage are complete, it will mark the 
`StageOutput`
+#[derive(Clone, Debug, Default)]
+struct StageOutput {
+    /// Map from partition -> partition locations
+    partition_locations: HashMap<usize, Vec<PartitionLocation>>,
+    /// Flag indicating whether all tasks are complete
+    complete: bool,
+}
+
+impl StageOutput {
+    pub fn new() -> Self {
+        Self {
+            partition_locations: HashMap::new(),
+            complete: false,
+        }
     }
 
-    pub fn session_id(&self) -> &str {
-        self.session_id.as_str()
+    /// Add a `PartitionLocation` to the `StageOutput`
+    pub fn add_partition(&mut self, partition_location: PartitionLocation) {
+        if let Some(parts) = self
+            .partition_locations
+            .get_mut(&partition_location.partition_id.partition_id)
+        {
+            parts.push(partition_location)
+        } else {
+            self.partition_locations.insert(
+                partition_location.partition_id.partition_id,
+                vec![partition_location],
+            );
+        }
     }
 
-    pub fn status(&self) -> JobStatus {
-        self.status.clone()
+    pub fn is_complete(&self) -> bool {
+        self.complete
     }
+}
 
-    /// An ExecutionGraph is complete if all its stages are complete
-    pub fn complete(&self) -> bool {
-        self.stages.values().all(|s| s.complete())
+/// A stage in the ExecutionGraph.
+///
+/// This represents a set of tasks (one per each `partition`) which can
+/// be executed concurrently.
+#[derive(Clone)]
+struct ExecutionStage {
+    /// Stage ID
+    stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce on task for partition.
+    partitions: usize,
+    /// Output partitioning for this stage.
+    output_partitioning: Option<Partitioning>,
+    /// Represents the outputs from this stage's child stages.
+    /// This stage can only be resolved an executed once all child stages are 
completed.
+    inputs: HashMap<usize, StageOutput>,
+    // `ExecutionPlan` for this stage
+    plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the 
partition has not yet been scheduled
+    task_statuses: Vec<Option<task_status::Status>>,
+    /// Stage ID of the stage that will take this stages outputs as inputs.
+    /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
+    output_links: Vec<usize>,
+    /// Flag indicating whether all input partitions have been resolved and 
the plan
+    /// has UnresovledShuffleExec operators resolved to ShuffleReadExec 
operators.
+    resolved: bool,
+    /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
+    stage_metrics: Option<Vec<MetricsSet>>,
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let scheduled_tasks = self.task_statuses.iter().filter(|t| 
t.is_some()).count();
+
+        write!(
+            f,
+            "Stage[id={}, partitions={:?}, children={}, completed_tasks={}, 
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
+            self.stage_id,
+            self.partitions,
+            self.inputs.len(),
+            self.completed_tasks(),
+            self.resolved,
+            scheduled_tasks,
+            self.available_tasks(),
+            self.inputs,
+            plan
+        )
     }
+}
 
-    /// Update task statuses and task metrics in the graph.
-    /// This will also push shuffle partitions to their respective shuffle 
read stages.
-    pub fn update_task_status(
-        &mut self,
-        executor: &ExecutorMetadata,
-        statuses: Vec<TaskStatus>,
-    ) -> Result<()> {
-        for status in statuses.into_iter() {
-            if let TaskStatus {
-                task_id:
-                    Some(protobuf::PartitionId {
-                        job_id,
-                        stage_id,
-                        partition_id,
-                    }),
-                metrics: operator_metrics,
-                status: Some(task_status),
-            } = status
-            {
-                if job_id != self.job_id() {
-                    return Err(BallistaError::Internal(format!(
-                        "Error updating job {}: Invalid task status job ID {}",
-                        self.job_id(),
-                        job_id
-                    )));
-                }
+impl ExecutionStage {
+    pub fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        output_partitioning: Option<Partitioning>,
+        output_links: Vec<usize>,
+        child_stages: Vec<usize>,
+    ) -> Self {
+        let num_tasks = plan.output_partitioning().partition_count();
 
-                let stage_id = stage_id as usize;
-                let partition = partition_id as usize;
-                if let Some(stage) = self.stages.get_mut(&stage_id) {
-                    stage.update_task_status(partition, task_status.clone());
-                    let stage_plan = stage.plan.clone();
-                    let stage_complete = stage.complete();
+        let resolved = child_stages.is_empty();
 
-                    // TODO Should be able to reschedule this task.
-                    if let task_status::Status::Failed(failed_task) = 
task_status {
-                        self.status = JobStatus {
-                            status: Some(job_status::Status::Failed(FailedJob {
-                                error: format!(
-                                    "Task {}/{}/{} failed: {}",
-                                    job_id, stage_id, partition_id, 
failed_task.error
-                                ),
-                            })),
-                        };
-                        return Ok(());
-                    } else if let 
task_status::Status::Completed(completed_task) =
-                        task_status
-                    {
-                        // update task metrics for completed task
-                        stage.update_task_metrics(partition, 
operator_metrics)?;
+        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
 
-                        // if this stage is completed, we want to combine the 
stage metrics to plan's metric set and print out the plan
-                        if stage_complete && 
stage.stage_metrics.as_ref().is_some() {
-                            // The plan_metrics collected here is a snapshot 
clone from the plan metrics.
-                            // They are all empty now and need to combine with 
the stage metrics in the ExecutionStages
-                            let mut plan_metrics =
-                                collect_plan_metrics(stage_plan.as_ref());
-                            let stage_metrics = stage
-                                .stage_metrics
-                                .as_ref()
-                                .expect("stage metrics should not be None.");
-                            if plan_metrics.len() != stage_metrics.len() {
-                                return 
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for 
stage {},  plan metrics array size {} does not equal \
-                to the stage metrics array size {}", stage_id, 
plan_metrics.len(), stage_metrics.len())));
-                            }
-                            
plan_metrics.iter_mut().zip(stage_metrics).for_each(
-                                |(plan_metric, stage_metric)| {
-                                    stage_metric
-                                        .iter()
-                                        .for_each(|s| 
plan_metric.push(s.clone()));
-                                },
-                            );
+        for input_stage_id in &child_stages {
+            inputs.insert(*input_stage_id, StageOutput::new());
+        }
 
-                            info!(
-                                "=== [{}/{}/{}] Stage finished, physical plan 
with metrics ===\n{}\n",
-                                job_id,
-                                stage_id,
-                                partition,
-                                
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(), 
plan_metrics.as_ref()).indent()
-                            );
-                        }
+        Self {
+            stage_id,
+            partitions: num_tasks,
+            output_partitioning,
+            inputs,
+            plan,
+            task_statuses: vec![None; num_tasks],
+            output_links,
+            resolved,
+            stage_metrics: None,
+        }
+    }
 
-                        let locations = partition_to_location(
-                            self.job_id.as_str(),
-                            stage_id,
-                            executor,
-                            completed_task.partitions,
-                        );
+    /// Returns true if all inputs are complete and we can resolve all
+    /// UnresolvedShuffleExec operators to ShuffleReadExec
+    pub fn resolvable(&self) -> bool {
+        self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
+    }
 
-                        let output_links = stage.output_links.clone();
-                        if output_links.is_empty() {
-                            // If `output_links` is empty, then this is a 
final stage
-                            self.output_locations.extend(locations);
-                        } else {
-                            for link in output_links.into_iter() {
-                                // If this is an intermediate stage, we need 
to push its `PartitionLocation`s to the parent stage
-                                if let Some(linked_stage) = 
self.stages.get_mut(&link) {
-                                    linked_stage.add_input_partitions(
-                                        stage_id,
-                                        partition,
-                                        locations.clone(),
-                                    )?;
+    /// Returns `true` if all tasks for this stage are complete
+    pub fn complete(&self) -> bool {
+        self.task_statuses
+            .iter()
+            .all(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+    }
 
-                                    // If all tasks for this stage are 
complete, mark the input complete in the parent stage
-                                    if stage_complete {
-                                        linked_stage.complete_input(stage_id);
-                                    }
+    /// Returns the number of tasks
+    pub fn completed_tasks(&self) -> usize {
+        self.task_statuses
+            .iter()
+            .filter(|status| matches!(status, 
Some(task_status::Status::Completed(_))))
+            .count()
+    }
 
-                                    // If all input partitions are ready, we 
can resolve any UnresolvedShuffleExec in the parent stage plan
-                                    if linked_stage.resolvable() {
-                                        linked_stage.resolve_shuffles()?;
-                                    }
-                                } else {
-                                    return 
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link 
{} for stage {}", job_id, stage_id, link)));
-                                }
-                            }
-                        }
-                    }
-                } else {
-                    return Err(BallistaError::Internal(format!(
-                        "Invalid stage ID {} for job {}",
-                        stage_id,
-                        self.job_id()
-                    )));
-                }
-            }
+    /// Marks the input stage ID as complete.
+    pub fn complete_input(&mut self, stage_id: usize) {
+        if let Some(input) = self.inputs.get_mut(&stage_id) {
+            input.complete = true;
         }
+    }
 
-        Ok(())
+    /// Returns true if the stage plan has all UnresolvedShuffleExec operators 
resolved to
+    /// ShuffleReadExec
+    pub fn resolved(&self) -> bool {
+        self.resolved
     }
 
-    /// Total number of tasks in this plan that are ready for scheduling
+    /// Returns the number of tasks in this stage which are available for 
scheduling.
+    /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
+    /// return the number of tasks where the task status is not yet set.
     pub fn available_tasks(&self) -> usize {
-        self.stages
-            .iter()
-            .map(|(_, stage)| stage.available_tasks())
-            .sum()
+        if self.resolved {
+            self.task_statuses.iter().filter(|s| s.is_none()).count()
+        } else {
+            0
+        }
     }
 
-    /// Get next task that can be assigned to the given executor.
-    /// This method should only be called when the resulting task is 
immediately
-    /// being launched as the status will be set to Running and it will not be
-    /// available to the scheduler.
-    /// If the task is not launched the status must be reset to allow the task 
to
-    /// be scheduled elsewhere.
-    pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<Task>> 
{
-        let job_id = self.job_id.clone();
-        let session_id = self.session_id.clone();
-        self.stages.iter_mut().find(|(_stage_id, stage)| {
-            stage.resolved() && stage.available_tasks() > 0
-        }).map(|(stage_id, stage)| {
-            let (partition_id,_) = stage
-                .task_statuses
+    /// Resolve any UnresolvedShuffleExec operators within this stage's plan
+    pub fn resolve_shuffles(&mut self) -> Result<()> {
+        println!("Resolving shuffles\n{:?}", self);
+        if self.resolved {
+            // If this stage has no input shuffles, then it is already resolved
+            Ok(())
+        } else {
+            let input_locations = self
+                .inputs
                 .iter()
-                .enumerate()
-                .find(|(_partition,status)| status.is_none())
-                .ok_or_else(|| {
-                BallistaError::Internal(format!("Error getting next task for 
job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
-            })?;
+                .map(|(stage, outputs)| (*stage, 
outputs.partition_locations.clone()))
+                .collect();
+            // Otherwise, rewrite the plan to replace UnresolvedShuffleExec 
with ShuffleReadExec
+            let new_plan = crate::planner::remove_unresolved_shuffles(
+                self.plan.clone(),
+                &input_locations,
+            )?;
+            self.plan = new_plan;
+            self.resolved = true;
+            Ok(())
+        }
+    }
 
-             let partition = PartitionId {
-                job_id,
-                stage_id: *stage_id,
-                partition_id
-            };
+    /// Update the status for task partition
+    pub fn update_task_status(&mut self, partition: usize, status: 
task_status::Status) {
+        debug!("Updating task status for partition {}", partition);
+        self.task_statuses[partition] = Some(status);
+    }
 
-            // Set the status to Running
-            stage.task_statuses[partition_id] = 
Some(task_status::Status::Running(RunningTask {
-                executor_id: executor_id.to_owned()
-            }));
+    /// update and combine the task metrics to the stage metrics
+    pub fn update_task_metrics(
+        &mut self,
+        partition: usize,
+        metrics: Vec<OperatorMetricsSet>,
+    ) -> Result<()> {
+        if let Some(combined_metrics) = &mut self.stage_metrics {
+            if metrics.len() != combined_metrics.len() {
+                return Err(BallistaError::Internal(format!("Error updating 
task metrics to stage {}, task metrics array size {} does not equal \
+                with the stage metrics array size {} for task {}", 
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+            }
+            let metrics_values_array = metrics
+                .into_iter()
+                .map(|ms| {
+                    ms.metrics
+                        .into_iter()
+                        .map(|m| m.try_into())
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
 
-            Ok(Task {
-                session_id,
-                partition,
-                plan: stage.plan.clone(),
-                output_partitioning: stage.output_partitioning.clone()
-            })
-        }).transpose()
+            let new_metrics_set = combined_metrics
+                .iter_mut()
+                .zip(metrics_values_array)
+                .map(|(first, second)| {
+                    Self::combine_metrics_set(first, second, partition)
+                })
+                .collect();
+            self.stage_metrics = Some(new_metrics_set)
+        } else {
+            let new_metrics_set = metrics
+                .into_iter()
+                .map(|ms| ms.try_into())
+                .collect::<Result<Vec<_>>>()?;
+            if !new_metrics_set.is_empty() {
+                self.stage_metrics = Some(new_metrics_set)
+            }
+        }
+        Ok(())
     }
 
-    pub fn finalize(&mut self) -> Result<()> {
-        if !self.complete() {
-            return Err(BallistaError::Internal(format!(
-                "Attempt to finalize an incomplete job {}",
-                self.job_id()
-            )));
+    pub fn combine_metrics_set(
+        first: &mut MetricsSet,
+        second: Vec<MetricValue>,
+        partition: usize,
+    ) -> MetricsSet {
+        for metric_value in second {
+            // TODO recheck the lable logic
+            let new_metric = Arc::new(Metric::new(metric_value, 
Some(partition)));
+            first.push(new_metric);
         }
+        first.aggregate_by_partition()
+    }
 
-        let partition_location = self
-            .output_locations()
-            .into_iter()
-            .map(|l| l.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        self.status = JobStatus {
-            status: Some(job_status::Status::Completed(CompletedJob {
-                partition_location,
-            })),
-        };
+    /// Add input partitions published from an input stage.
+    pub fn add_input_partitions(
+        &mut self,
+        stage_id: usize,
+        _partition_id: usize,
+        locations: Vec<PartitionLocation>,
+    ) -> Result<()> {
+        if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
+            for partition in locations {
+                stage_inputs.add_partition(partition);
+            }
+        } else {
+            return Err(BallistaError::Internal(format!("Error adding input 
partitions to stage {}, {} is not a valid child stage ID", self.stage_id, 
stage_id)));
+        }
 
         Ok(())
     }
+}
 
-    pub fn update_status(&mut self, status: JobStatus) {
-        self.status = status;
+/// Utility for building a set of `ExecutionStage`s from
+/// a list of `ShuffleWriterExec`.
+///
+/// This will infer the dependency structure for the stages
+/// so that we can construct a DAG from the stages.
+struct ExecutionStageBuilder {
+    /// Stage ID which is currently being visited
+    current_stage_id: usize,
+    /// Map from stage ID -> List of child stage IDs
+    stage_dependencies: HashMap<usize, Vec<usize>>,
+    /// Map from Stage ID -> output link
+    output_links: HashMap<usize, Vec<usize>>,
+}
+
+impl ExecutionStageBuilder {
+    pub fn new() -> Self {
+        Self {
+            current_stage_id: 0,
+            stage_dependencies: HashMap::new(),
+            output_links: HashMap::new(),
+        }
     }
 
-    /// Reset the status for the given task. This should be called is a task 
failed to
-    /// launch and it needs to be returned to the set of available tasks and be
-    /// re-scheduled.
-    pub fn reset_task_status(&mut self, task: Task) {
-        let stage_id = task.partition.stage_id;
-        let partition = task.partition.partition_id;
+    pub fn build(
+        mut self,
+        stages: Vec<Arc<ShuffleWriterExec>>,
+    ) -> Result<HashMap<usize, ExecutionStage>> {
+        let mut execution_stages: HashMap<usize, ExecutionStage> = 
HashMap::new();
+        // First, build the dependency graph
+        for stage in &stages {
+            accept(stage.as_ref(), &mut self)?;
+        }
 
-        if let Some(stage) = self.stages.get_mut(&stage_id) {
-            stage.task_statuses[partition] = None;
+        // Now, create the execution stages
+        for stage in stages {
+            let partitioning = stage.shuffle_output_partitioning().cloned();
+            let stage_id = stage.stage_id();
+            let output_links = 
self.output_links.remove(&stage_id).unwrap_or_default();
+
+            let child_stages = self
+                .stage_dependencies
+                .remove(&stage_id)
+                .unwrap_or_default();
+
+            execution_stages.insert(
+                stage_id,
+                ExecutionStage::new(
+                    stage_id,
+                    stage,
+                    partitioning,
+                    output_links,
+                    child_stages,
+                ),
+            );
         }
-    }
 
-    pub fn output_locations(&self) -> Vec<PartitionLocation> {
-        self.output_locations.clone()
+        Ok(execution_stages)
     }
 }
 
-impl Debug for ExecutionGraph {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let stages = self
-            .stages
-            .iter()
-            .map(|(_, stage)| format!("{:?}", stage))
-            .collect::<Vec<String>>()
-            .join("\n");
-        write!(f, "ExecutionGraph[job_id={}, session_id={}, 
available_tasks={}, complete={}]\n{}", self.job_id, self.session_id, 
self.available_tasks(), self.complete(), stages)
+impl ExecutionPlanVisitor for ExecutionStageBuilder {
+    type Error = BallistaError;
+
+    fn pre_visit(
+        &mut self,
+        plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        if let Some(shuffle_write) = 
plan.as_any().downcast_ref::<ShuffleWriterExec>() {
+            self.current_stage_id = shuffle_write.stage_id();
+        } else if let Some(unresolved_shuffle) =
+            plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
+        {
+            if let Some(output_links) =
+                self.output_links.get_mut(&unresolved_shuffle.stage_id)
+            {
+                if !output_links.contains(&self.current_stage_id) {
+                    output_links.push(self.current_stage_id);
+                }
+            } else {
+                self.output_links
+                    .insert(unresolved_shuffle.stage_id, 
vec![self.current_stage_id]);
+            }
+
+            if let Some(deps) = 
self.stage_dependencies.get_mut(&self.current_stage_id) {
+                if !deps.contains(&unresolved_shuffle.stage_id) {
+                    deps.push(unresolved_shuffle.stage_id);
+                }
+            } else {
+                self.stage_dependencies
+                    .insert(self.current_stage_id, 
vec![unresolved_shuffle.stage_id]);
+            }
+        }
+        Ok(true)
     }
 }
 
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs 
b/ballista/rust/scheduler/src/state/task_manager.rs
index dcee3722..bb0db021 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -18,30 +18,29 @@
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::SessionBuilder;
 use crate::state::backend::{Keyspace, StateBackendClient};
-use crate::state::execution_graph::{ExecutionGraph, ExecutionStage, 
StageOutput, Task};
+use crate::state::execution_graph::{ExecutionGraph, Task};
 use crate::state::executor_manager::ExecutorReservation;
 use crate::state::{decode_protobuf, encode_protobuf, with_lock};
 use ballista_core::config::BallistaConfig;
-use ballista_core::error::{BallistaError, Result};
-use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+#[cfg(not(test))]
+use ballista_core::error::BallistaError;
+use ballista_core::error::Result;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 
 use crate::state::session_manager::create_datafusion_context;
 use ballista_core::serde::protobuf::{
-    self, job_status, task_status, FailedJob, JobStatus, PartitionId, 
TaskDefinition,
-    TaskStatus,
+    self, job_status, FailedJob, JobStatus, PartitionId, TaskDefinition, 
TaskStatus,
 };
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use ballista_core::serde::scheduler::{ExecutorMetadata, PartitionLocation};
+use ballista_core::serde::scheduler::ExecutorMetadata;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{debug, info, warn};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::collections::{HashMap, HashSet};
-use std::convert::TryInto;
 use std::default::Default;
 use std::sync::Arc;
 use tokio::sync::RwLock;
@@ -484,206 +483,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         let session_id = &proto.session_id;
 
         let session_ctx = self.get_session(session_id).await?;
-        let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
-        for stage in proto.stages {
-            let plan_proto = U::try_decode(stage.plan.as_slice())?;
-            let plan = plan_proto.try_into_physical_plan(
-                session_ctx.as_ref(),
-                session_ctx.runtime_env().as_ref(),
-                self.codec.physical_extension_codec(),
-            )?;
-
-            let stage_id = stage.stage_id as usize;
-            let partitions: usize = stage.partitions as usize;
-
-            let mut task_statuses: Vec<Option<task_status::Status>> =
-                vec![None; partitions];
-
-            for status in stage.task_statuses {
-                if let Some(task_id) = status.task_id.as_ref() {
-                    task_statuses[task_id.partition_id as usize] = 
status.status
-                }
-            }
-
-            let output_partitioning: Option<Partitioning> =
-                parse_protobuf_hash_partitioning(
-                    stage.output_partitioning.as_ref(),
-                    session_ctx.as_ref(),
-                    plan.schema().as_ref(),
-                )?;
-
-            let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-
-            for input in stage.inputs {
-                let stage_id = input.stage_id as usize;
-
-                let outputs = input
-                    .partition_locations
-                    .into_iter()
-                    .map(|loc| {
-                        let partition = loc.partition as usize;
-                        let locations = loc
-                            .partition_location
-                            .into_iter()
-                            .map(|l| l.try_into())
-                            .collect::<Result<Vec<_>>>()?;
-                        Ok((partition, locations))
-                    })
-                    .collect::<Result<HashMap<usize, 
Vec<PartitionLocation>>>>()?;
-
-                inputs.insert(
-                    stage_id,
-                    StageOutput {
-                        partition_locations: outputs,
-                        complete: input.complete,
-                    },
-                );
-            }
-            let stage_metrics = if stage.stage_metrics.is_empty() {
-                None
-            } else {
-                let ms = stage
-                    .stage_metrics
-                    .into_iter()
-                    .map(|m| m.try_into())
-                    .collect::<Result<Vec<_>>>()?;
-                Some(ms)
-            };
-
-            let execution_stage = ExecutionStage {
-                stage_id: stage.stage_id as usize,
-                partitions,
-                output_partitioning,
-                inputs,
-                plan,
-                task_statuses,
-                output_links: stage
-                    .output_links
-                    .into_iter()
-                    .map(|l| l as usize)
-                    .collect(),
-                resolved: stage.resolved,
-                stage_metrics,
-            };
-            stages.insert(stage_id, execution_stage);
-        }
 
-        let output_locations: Vec<PartitionLocation> = proto
-            .output_locations
-            .into_iter()
-            .map(|loc| loc.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(ExecutionGraph {
-            job_id: proto.job_id,
-            session_id: proto.session_id,
-            status: proto.status.ok_or_else(|| {
-                BallistaError::Internal(
-                    "Invalid Execution Graph: missing job status".to_owned(),
-                )
-            })?,
-            stages,
-            output_partitions: proto.output_partitions as usize,
-            output_locations,
-        })
+        ExecutionGraph::decode_execution_graph(proto, &self.codec, 
&session_ctx).await
     }
 
     fn encode_execution_graph(&self, graph: ExecutionGraph) -> Result<Vec<u8>> 
{
-        let job_id = graph.job_id().to_owned();
-
-        let stages = graph
-            .stages
-            .into_iter()
-            .map(|(stage_id, stage)| {
-                let mut plan: Vec<u8> = vec![];
-
-                U::try_from_physical_plan(
-                    stage.plan,
-                    self.codec.physical_extension_codec(),
-                )
-                .and_then(|proto| proto.try_encode(&mut plan))?;
-
-                let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
-
-                for (stage, output) in stage.inputs.into_iter() {
-                    inputs.push(protobuf::GraphStageInput {
-                        stage_id: stage as u32,
-                        partition_locations: output
-                            .partition_locations
-                            .into_iter()
-                            .map(|(partition, locations)| {
-                                Ok(protobuf::TaskInputPartitions {
-                                    partition: partition as u32,
-                                    partition_location: locations
-                                        .into_iter()
-                                        .map(|l| l.try_into())
-                                        .collect::<Result<Vec<_>>>()?,
-                                })
-                            })
-                            .collect::<Result<Vec<_>>>()?,
-                        complete: output.complete,
-                    });
-                }
+        let proto = ExecutionGraph::encode_execution_graph(graph, 
&self.codec)?;
 
-                let task_statuses: Vec<protobuf::TaskStatus> = stage
-                    .task_statuses
-                    .into_iter()
-                    .enumerate()
-                    .filter_map(|(partition, status)| {
-                        status.map(|status| protobuf::TaskStatus {
-                            task_id: Some(protobuf::PartitionId {
-                                job_id: job_id.clone(),
-                                stage_id: stage_id as u32,
-                                partition_id: partition as u32,
-                            }),
-                            // task metrics should not persist.
-                            metrics: vec![],
-                            status: Some(status),
-                        })
-                    })
-                    .collect();
-
-                let output_partitioning =
-                    
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
-
-                let stage_metrics = stage
-                    .stage_metrics
-                    .unwrap_or_default()
-                    .into_iter()
-                    .map(|m| m.try_into())
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(protobuf::ExecutionGraphStage {
-                    stage_id: stage_id as u64,
-                    partitions: stage.partitions as u32,
-                    output_partitioning,
-                    inputs,
-                    plan,
-                    task_statuses,
-                    output_links: stage
-                        .output_links
-                        .into_iter()
-                        .map(|l| l as u32)
-                        .collect(),
-                    resolved: stage.resolved,
-                    stage_metrics,
-                })
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let output_locations: Vec<protobuf::PartitionLocation> = graph
-            .output_locations
-            .into_iter()
-            .map(|loc| loc.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        encode_protobuf(&protobuf::ExecutionGraph {
-            job_id: graph.job_id,
-            session_id: graph.session_id,
-            status: Some(graph.status),
-            stages,
-            output_partitions: graph.output_partitions as u64,
-            output_locations,
-        })
+        encode_protobuf(&proto)
     }
 }
 

Reply via email to