This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 914dba6e Move ExecutionGraph encoding and decoding logic into
execution_graph for better encapsulation (#150)
914dba6e is described below
commit 914dba6e8125b4ed6810b4560758981764fcdfb6
Author: yahoNanJing <[email protected]>
AuthorDate: Mon Aug 22 22:02:34 2022 +0800
Move ExecutionGraph encoding and decoding logic into execution_graph for
better encapsulation (#150)
Co-authored-by: yangzhong <[email protected]>
---
.../rust/scheduler/src/state/execution_graph.rs | 1448 +++++++++++---------
ballista/rust/scheduler/src/state/task_manager.rs | 213 +--
2 files changed, 843 insertions(+), 818 deletions(-)
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs
b/ballista/rust/scheduler/src/state/execution_graph.rs
index d928f969..e3599092 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -36,740 +36,958 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt::{Debug, Formatter};
+use
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use ballista_core::utils::collect_plan_metrics;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use std::sync::Arc;
-/// This data structure collects the partition locations for an
`ExecutionStage`.
-/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child
stages.
-/// When all tasks for the child stage are complete, it will mark the
`StageOutput`
-#[derive(Clone, Debug, Default)]
-pub struct StageOutput {
- /// Map from partition -> partition locations
- pub(crate) partition_locations: HashMap<usize, Vec<PartitionLocation>>,
- /// Flag indicating whether all tasks are complete
- pub(crate) complete: bool,
-}
-
-impl StageOutput {
- pub fn new() -> Self {
- Self {
- partition_locations: HashMap::new(),
- complete: false,
- }
- }
-
- /// Add a `PartitionLocation` to the `StageOutput`
- pub fn add_partition(&mut self, partition_location: PartitionLocation) {
- if let Some(parts) = self
- .partition_locations
- .get_mut(&partition_location.partition_id.partition_id)
- {
- parts.push(partition_location)
- } else {
- self.partition_locations.insert(
- partition_location.partition_id.partition_id,
- vec![partition_location],
- );
- }
- }
-
- pub fn is_complete(&self) -> bool {
- self.complete
- }
-}
-
-/// A stage in the ExecutionGraph.
-///
-/// This represents a set of tasks (one per each `partition`) which can
-/// be executed concurrently.
+/// Represents the basic unit of work for the Ballista executor. Will execute
+/// one partition of one stage on one task slot.
#[derive(Clone)]
-pub struct ExecutionStage {
- /// Stage ID
- pub(crate) stage_id: usize,
- /// Total number of output partitions for this stage.
- /// This stage will produce on task for partition.
- pub(crate) partitions: usize,
- /// Output partitioning for this stage.
- pub(crate) output_partitioning: Option<Partitioning>,
- /// Represents the outputs from this stage's child stages.
- /// This stage can only be resolved an executed once all child stages are
completed.
- pub(crate) inputs: HashMap<usize, StageOutput>,
- // `ExecutionPlan` for this stage
- pub(crate) plan: Arc<dyn ExecutionPlan>,
- /// Status of each already scheduled task. If status is None, the
partition has not yet been scheduled
- pub(crate) task_statuses: Vec<Option<task_status::Status>>,
- /// Stage ID of the stage that will take this stages outputs as inputs.
- /// If `output_links` is empty then this the final stage in the
`ExecutionGraph`
- pub(crate) output_links: Vec<usize>,
- /// Flag indicating whether all input partitions have been resolved and
the plan
- /// has UnresovledShuffleExec operators resolved to ShuffleReadExec
operators.
- pub(crate) resolved: bool,
- /// Combined metrics of the already finished tasks in the stage, If it is
None, no task is finished yet.
- pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
+pub struct Task {
+ pub session_id: String,
+ pub partition: PartitionId,
+ pub plan: Arc<dyn ExecutionPlan>,
+ pub output_partitioning: Option<Partitioning>,
}
-impl Debug for ExecutionStage {
+impl Debug for Task {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
- let scheduled_tasks = self.task_statuses.iter().filter(|t|
t.is_some()).count();
-
write!(
f,
- "Stage[id={}, partitions={:?}, children={}, completed_tasks={},
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
- self.stage_id,
- self.partitions,
- self.inputs.len(),
- self.completed_tasks(),
- self.resolved,
- scheduled_tasks,
- self.available_tasks(),
- self.inputs,
+ "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
+ self.session_id,
+ self.partition.job_id,
+ self.partition.stage_id,
+ self.partition.partition_id,
plan
)
}
}
-impl ExecutionStage {
+/// Represents the DAG for a distributed query plan.
+///
+/// A distributed query plan consists of a set of stages which must be
executed sequentially.
+///
+/// Each stage consists of a set of partitions which can be executed in
parallel, where each partition
+/// represents a `Task`, which is the basic unit of scheduling in Ballista.
+///
+/// As an example, consider a SQL query which performs a simple aggregation:
+///
+/// `SELECT id, SUM(gmv) FROM some_table GROUP BY id`
+///
+/// This will produce a DataFusion execution plan that looks something like
+///
+///
+/// CoalesceBatchesExec: target_batch_size=4096
+/// RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }],
4)
+/// AggregateExec: mode=Partial, gby=[id@0 as id],
aggr=[SUM(some_table.gmv)]
+/// TableScan: some_table
+///
+/// The Ballista `DistributedPlanner` will turn this into a distributed plan
by creating a shuffle
+/// boundary (called a "Stage") whenever the underlying plan needs to perform
a repartition.
+/// In this case we end up with a distributed plan with two stages:
+///
+///
+/// ExecutionGraph[job_id=job, session_id=session, available_tasks=1,
complete=false]
+/// Stage[id=2, partitions=4, children=1, completed_tasks=0, resolved=false,
scheduled_tasks=0, available_tasks=0]
+/// Inputs{1: StageOutput { partition_locations: {}, complete: false }}
+///
+/// ShuffleWriterExec: None
+/// AggregateExec: mode=FinalPartitioned, gby=[id@0 as id],
aggr=[SUM(?table?.gmv)]
+/// CoalesceBatchesExec: target_batch_size=4096
+/// UnresolvedShuffleExec
+///
+/// Stage[id=1, partitions=1, children=0, completed_tasks=0, resolved=true,
scheduled_tasks=0, available_tasks=1]
+/// Inputs{}
+///
+/// ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
+/// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
+/// TableScan: some_table
+///
+///
+/// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each
stage's `input` field
+/// will indicate which stages it depends on, and each stage's `output_links`
will indicate which
+/// stage it needs to publish its output to.
+///
+/// If a stage's `output_links` is empty then it is the final stage in this
query, and it should
+/// publish its outputs to the `ExecutionGraph`s `output_locations`
representing the final query results.
+#[derive(Clone)]
+pub struct ExecutionGraph {
+ /// ID for this job
+ job_id: String,
+ /// Session ID for this job
+ session_id: String,
+ /// Status of this job
+ status: JobStatus,
+ /// Map from Stage ID -> ExecutionStage
+ stages: HashMap<usize, ExecutionStage>,
+ /// Total number of output partitions
+ output_partitions: usize,
+ /// Locations of this `ExecutionGraph` final output locations
+ output_locations: Vec<PartitionLocation>,
+}
+
+impl ExecutionGraph {
pub fn new(
- stage_id: usize,
+ job_id: &str,
+ session_id: &str,
plan: Arc<dyn ExecutionPlan>,
- output_partitioning: Option<Partitioning>,
- output_links: Vec<usize>,
- child_stages: Vec<usize>,
- ) -> Self {
- let num_tasks = plan.output_partitioning().partition_count();
+ ) -> Result<Self> {
+ let mut planner = DistributedPlanner::new();
- let resolved = child_stages.is_empty();
+ let output_partitions = plan.output_partitioning().partition_count();
- let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+ let shuffle_stages = planner.plan_query_stages(job_id, plan)?;
- for input_stage_id in &child_stages {
- inputs.insert(*input_stage_id, StageOutput::new());
- }
+ let builder = ExecutionStageBuilder::new();
+ let stages = builder.build(shuffle_stages)?;
- Self {
- stage_id,
- partitions: num_tasks,
- output_partitioning,
- inputs,
- plan,
- task_statuses: vec![None; num_tasks],
- output_links,
- resolved,
- stage_metrics: None,
- }
+ Ok(Self {
+ job_id: job_id.to_string(),
+ session_id: session_id.to_string(),
+ status: JobStatus {
+ status: Some(job_status::Status::Queued(QueuedJob {})),
+ },
+ stages,
+ output_partitions,
+ output_locations: vec![],
+ })
}
- /// Returns true if all inputs are complete and we can resolve all
- /// UnresolvedShuffleExec operators to ShuffleReadExec
- pub fn resolvable(&self) -> bool {
- self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
+ pub fn job_id(&self) -> &str {
+ self.job_id.as_str()
}
- /// Returns `true` if all tasks for this stage are complete
- pub fn complete(&self) -> bool {
- self.task_statuses
- .iter()
- .all(|status| matches!(status,
Some(task_status::Status::Completed(_))))
+ pub fn session_id(&self) -> &str {
+ self.session_id.as_str()
}
- /// Returns the number of tasks
- pub fn completed_tasks(&self) -> usize {
- self.task_statuses
- .iter()
- .filter(|status| matches!(status,
Some(task_status::Status::Completed(_))))
- .count()
+ pub fn status(&self) -> JobStatus {
+ self.status.clone()
}
- /// Marks the input stage ID as complete.
- pub fn complete_input(&mut self, stage_id: usize) {
- if let Some(input) = self.inputs.get_mut(&stage_id) {
- input.complete = true;
- }
+ /// An ExecutionGraph is complete if all its stages are complete
+ pub fn complete(&self) -> bool {
+ self.stages.values().all(|s| s.complete())
}
- /// Returns true if the stage plan has all UnresolvedShuffleExec operators
resolved to
- /// ShuffleReadExec
- pub fn resolved(&self) -> bool {
- self.resolved
- }
+ /// Update task statuses and task metrics in the graph.
+ /// This will also push shuffle partitions to their respective shuffle
read stages.
+ pub fn update_task_status(
+ &mut self,
+ executor: &ExecutorMetadata,
+ statuses: Vec<TaskStatus>,
+ ) -> Result<()> {
+ for status in statuses.into_iter() {
+ if let TaskStatus {
+ task_id:
+ Some(protobuf::PartitionId {
+ job_id,
+ stage_id,
+ partition_id,
+ }),
+ metrics: operator_metrics,
+ status: Some(task_status),
+ } = status
+ {
+ if job_id != self.job_id() {
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {}: Invalid task status job ID {}",
+ self.job_id(),
+ job_id
+ )));
+ }
- /// Returns the number of tasks in this stage which are available for
scheduling.
- /// If the stage is not yet resolved, then this will return `0`, otherwise
it will
- /// return the number of tasks where the task status is not yet set.
- pub fn available_tasks(&self) -> usize {
- if self.resolved {
- self.task_statuses.iter().filter(|s| s.is_none()).count()
- } else {
- 0
- }
- }
+ let stage_id = stage_id as usize;
+ let partition = partition_id as usize;
+ if let Some(stage) = self.stages.get_mut(&stage_id) {
+ stage.update_task_status(partition, task_status.clone());
+ let stage_plan = stage.plan.clone();
+ let stage_complete = stage.complete();
- /// Resolve any UnresolvedShuffleExec operators within this stage's plan
- pub fn resolve_shuffles(&mut self) -> Result<()> {
- println!("Resolving shuffles\n{:?}", self);
- if self.resolved {
- // If this stage has no input shuffles, then it is already resolved
- Ok(())
- } else {
- let input_locations = self
- .inputs
- .iter()
- .map(|(stage, outputs)| (*stage,
outputs.partition_locations.clone()))
- .collect();
- // Otherwise, rewrite the plan to replace UnresolvedShuffleExec
with ShuffleReadExec
- let new_plan = crate::planner::remove_unresolved_shuffles(
- self.plan.clone(),
- &input_locations,
- )?;
- self.plan = new_plan;
- self.resolved = true;
- Ok(())
- }
- }
+ // TODO Should be able to reschedule this task.
+ if let task_status::Status::Failed(failed_task) =
task_status {
+ self.status = JobStatus {
+ status: Some(job_status::Status::Failed(FailedJob {
+ error: format!(
+ "Task {}/{}/{} failed: {}",
+ job_id, stage_id, partition_id,
failed_task.error
+ ),
+ })),
+ };
+ return Ok(());
+ } else if let
task_status::Status::Completed(completed_task) =
+ task_status
+ {
+ // update task metrics for completed task
+ stage.update_task_metrics(partition,
operator_metrics)?;
- /// Update the status for task partition
- pub fn update_task_status(&mut self, partition: usize, status:
task_status::Status) {
- debug!("Updating task status for partition {}", partition);
- self.task_statuses[partition] = Some(status);
- }
+ // if this stage is completed, we want to combine the
stage metrics to plan's metric set and print out the plan
+ if stage_complete &&
stage.stage_metrics.as_ref().is_some() {
+ // The plan_metrics collected here is a snapshot
clone from the plan metrics.
+ // They are all empty now and need to combine with
the stage metrics in the ExecutionStages
+ let mut plan_metrics =
+ collect_plan_metrics(stage_plan.as_ref());
+ let stage_metrics = stage
+ .stage_metrics
+ .as_ref()
+ .expect("stage metrics should not be None.");
+ if plan_metrics.len() != stage_metrics.len() {
+ return
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for
stage {}, plan metrics array size {} does not equal \
+ to the stage metrics array size {}", stage_id,
plan_metrics.len(), stage_metrics.len())));
+ }
+
plan_metrics.iter_mut().zip(stage_metrics).for_each(
+ |(plan_metric, stage_metric)| {
+ stage_metric
+ .iter()
+ .for_each(|s|
plan_metric.push(s.clone()));
+ },
+ );
- /// update and combine the task metrics to the stage metrics
- pub fn update_task_metrics(
- &mut self,
- partition: usize,
- metrics: Vec<OperatorMetricsSet>,
- ) -> Result<()> {
- if let Some(combined_metrics) = &mut self.stage_metrics {
- if metrics.len() != combined_metrics.len() {
- return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
- with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
- }
- let metrics_values_array = metrics
- .into_iter()
- .map(|ms| {
- ms.metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()
- })
- .collect::<Result<Vec<_>>>()?;
+ info!(
+ "=== [{}/{}/{}] Stage finished, physical plan
with metrics ===\n{}\n",
+ job_id,
+ stage_id,
+ partition,
+
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(),
plan_metrics.as_ref()).indent()
+ );
+ }
- let new_metrics_set = combined_metrics
- .iter_mut()
- .zip(metrics_values_array)
- .map(|(first, second)| {
- Self::combine_metrics_set(first, second, partition)
- })
- .collect();
- self.stage_metrics = Some(new_metrics_set)
- } else {
- let new_metrics_set = metrics
- .into_iter()
- .map(|ms| ms.try_into())
- .collect::<Result<Vec<_>>>()?;
- if !new_metrics_set.is_empty() {
- self.stage_metrics = Some(new_metrics_set)
+ let locations = partition_to_location(
+ self.job_id.as_str(),
+ stage_id,
+ executor,
+ completed_task.partitions,
+ );
+
+ let output_links = stage.output_links.clone();
+ if output_links.is_empty() {
+ // If `output_links` is empty, then this is a
final stage
+ self.output_locations.extend(locations);
+ } else {
+ for link in output_links.into_iter() {
+ // If this is an intermediate stage, we need
to push its `PartitionLocation`s to the parent stage
+ if let Some(linked_stage) =
self.stages.get_mut(&link) {
+ linked_stage.add_input_partitions(
+ stage_id,
+ partition,
+ locations.clone(),
+ )?;
+
+ // If all tasks for this stage are
complete, mark the input complete in the parent stage
+ if stage_complete {
+ linked_stage.complete_input(stage_id);
+ }
+
+ // If all input partitions are ready, we
can resolve any UnresolvedShuffleExec in the parent stage plan
+ if linked_stage.resolvable() {
+ linked_stage.resolve_shuffles()?;
+ }
+ } else {
+ return
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link
{} for stage {}", job_id, stage_id, link)));
+ }
+ }
+ }
+ }
+ } else {
+ return Err(BallistaError::Internal(format!(
+ "Invalid stage ID {} for job {}",
+ stage_id,
+ self.job_id()
+ )));
+ }
}
}
+
Ok(())
}
- pub fn combine_metrics_set(
- first: &mut MetricsSet,
- second: Vec<MetricValue>,
- partition: usize,
- ) -> MetricsSet {
- for metric_value in second {
- // TODO recheck the lable logic
- let new_metric = Arc::new(Metric::new(metric_value,
Some(partition)));
- first.push(new_metric);
- }
- first.aggregate_by_partition()
+ /// Total number of tasks in this plan that are ready for scheduling
+ pub fn available_tasks(&self) -> usize {
+ self.stages
+ .iter()
+ .map(|(_, stage)| stage.available_tasks())
+ .sum()
}
- /// Add input partitions published from an input stage.
- pub fn add_input_partitions(
- &mut self,
- stage_id: usize,
- _partition_id: usize,
- locations: Vec<PartitionLocation>,
- ) -> Result<()> {
- if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
- for partition in locations {
- stage_inputs.add_partition(partition);
- }
- } else {
- return Err(BallistaError::Internal(format!("Error adding input
partitions to stage {}, {} is not a valid child stage ID", self.stage_id,
stage_id)));
- }
+ /// Get next task that can be assigned to the given executor.
+ /// This method should only be called when the resulting task is
immediately
+ /// being launched as the status will be set to Running and it will not be
+ /// available to the scheduler.
+ /// If the task is not launched the status must be reset to allow the task
to
+ /// be scheduled elsewhere.
+ pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<Task>>
{
+ let job_id = self.job_id.clone();
+ let session_id = self.session_id.clone();
+ self.stages.iter_mut().find(|(_stage_id, stage)| {
+ stage.resolved() && stage.available_tasks() > 0
+ }).map(|(stage_id, stage)| {
+ let (partition_id,_) = stage
+ .task_statuses
+ .iter()
+ .enumerate()
+ .find(|(_partition,status)| status.is_none())
+ .ok_or_else(|| {
+ BallistaError::Internal(format!("Error getting next task
for job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
+ })?;
- Ok(())
- }
-}
+ let partition = PartitionId {
+ job_id,
+ stage_id: *stage_id,
+ partition_id
+ };
-/// Utility for building a set of `ExecutionStage`s from
-/// a list of `ShuffleWriterExec`.
-///
-/// This will infer the dependency structure for the stages
-/// so that we can construct a DAG from the stages.
-struct ExecutionStageBuilder {
- /// Stage ID which is currently being visited
- current_stage_id: usize,
- /// Map from stage ID -> List of child stage IDs
- stage_dependencies: HashMap<usize, Vec<usize>>,
- /// Map from Stage ID -> output link
- output_links: HashMap<usize, Vec<usize>>,
-}
+ // Set the status to Running
+ stage.task_statuses[partition_id] =
Some(task_status::Status::Running(RunningTask {
+ executor_id: executor_id.to_owned()
+ }));
-impl ExecutionStageBuilder {
- pub fn new() -> Self {
- Self {
- current_stage_id: 0,
- stage_dependencies: HashMap::new(),
- output_links: HashMap::new(),
- }
+ Ok(Task {
+ session_id,
+ partition,
+ plan: stage.plan.clone(),
+ output_partitioning: stage.output_partitioning.clone()
+ })
+ }).transpose()
}
- pub fn build(
- mut self,
- stages: Vec<Arc<ShuffleWriterExec>>,
- ) -> Result<HashMap<usize, ExecutionStage>> {
- let mut execution_stages: HashMap<usize, ExecutionStage> =
HashMap::new();
- // First, build the dependency graph
- for stage in &stages {
- accept(stage.as_ref(), &mut self)?;
+ pub fn finalize(&mut self) -> Result<()> {
+ if !self.complete() {
+ return Err(BallistaError::Internal(format!(
+ "Attempt to finalize an incomplete job {}",
+ self.job_id()
+ )));
}
- // Now, create the execution stages
- for stage in stages {
- let partitioning = stage.shuffle_output_partitioning().cloned();
- let stage_id = stage.stage_id();
- let output_links =
self.output_links.remove(&stage_id).unwrap_or_default();
+ let partition_location = self
+ .output_locations()
+ .into_iter()
+ .map(|l| l.try_into())
+ .collect::<Result<Vec<_>>>()?;
- let child_stages = self
- .stage_dependencies
- .remove(&stage_id)
- .unwrap_or_default();
+ self.status = JobStatus {
+ status: Some(job_status::Status::Completed(CompletedJob {
+ partition_location,
+ })),
+ };
- execution_stages.insert(
- stage_id,
- ExecutionStage::new(
- stage_id,
- stage,
- partitioning,
- output_links,
- child_stages,
- ),
- );
+ Ok(())
+ }
+
+ pub fn update_status(&mut self, status: JobStatus) {
+ self.status = status;
+ }
+
+ /// Reset the status for the given task. This should be called if a task
failed to
+ /// launch and it needs to be returned to the set of available tasks and be
+ /// re-scheduled.
+ pub fn reset_task_status(&mut self, task: Task) {
+ let stage_id = task.partition.stage_id;
+ let partition = task.partition.partition_id;
+
+ if let Some(stage) = self.stages.get_mut(&stage_id) {
+ stage.task_statuses[partition] = None;
}
+ }
- Ok(execution_stages)
+ pub fn output_locations(&self) -> Vec<PartitionLocation> {
+ self.output_locations.clone()
}
-}
-impl ExecutionPlanVisitor for ExecutionStageBuilder {
- type Error = BallistaError;
+ pub(crate) async fn decode_execution_graph<
+ T: 'static + AsLogicalPlan,
+ U: 'static + AsExecutionPlan,
+ >(
+ proto: protobuf::ExecutionGraph,
+ codec: &BallistaCodec<T, U>,
+ session_ctx: &SessionContext,
+ ) -> Result<ExecutionGraph> {
+ let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
+ for stage in proto.stages {
+ let plan_proto = U::try_decode(stage.plan.as_slice())?;
+ let plan = plan_proto.try_into_physical_plan(
+ session_ctx,
+ session_ctx.runtime_env().as_ref(),
+ codec.physical_extension_codec(),
+ )?;
- fn pre_visit(
- &mut self,
- plan: &dyn ExecutionPlan,
- ) -> std::result::Result<bool, Self::Error> {
- if let Some(shuffle_write) =
plan.as_any().downcast_ref::<ShuffleWriterExec>() {
- self.current_stage_id = shuffle_write.stage_id();
- } else if let Some(unresolved_shuffle) =
- plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
- {
- if let Some(output_links) =
- self.output_links.get_mut(&unresolved_shuffle.stage_id)
- {
- if !output_links.contains(&self.current_stage_id) {
- output_links.push(self.current_stage_id);
+ let stage_id = stage.stage_id as usize;
+ let partitions: usize = stage.partitions as usize;
+
+ let mut task_statuses: Vec<Option<task_status::Status>> =
+ vec![None; partitions];
+
+ for status in stage.task_statuses {
+ if let Some(task_id) = status.task_id.as_ref() {
+ task_statuses[task_id.partition_id as usize] =
status.status
}
- } else {
- self.output_links
- .insert(unresolved_shuffle.stage_id,
vec![self.current_stage_id]);
}
- if let Some(deps) =
self.stage_dependencies.get_mut(&self.current_stage_id) {
- if !deps.contains(&unresolved_shuffle.stage_id) {
- deps.push(unresolved_shuffle.stage_id);
- }
- } else {
- self.stage_dependencies
- .insert(self.current_stage_id,
vec![unresolved_shuffle.stage_id]);
+ let output_partitioning: Option<Partitioning> =
+ parse_protobuf_hash_partitioning(
+ stage.output_partitioning.as_ref(),
+ session_ctx,
+ plan.schema().as_ref(),
+ )?;
+
+ let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+
+ for input in stage.inputs {
+ let stage_id = input.stage_id as usize;
+
+ let outputs = input
+ .partition_locations
+ .into_iter()
+ .map(|loc| {
+ let partition = loc.partition as usize;
+ let locations = loc
+ .partition_location
+ .into_iter()
+ .map(|l| l.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ Ok((partition, locations))
+ })
+ .collect::<Result<HashMap<usize,
Vec<PartitionLocation>>>>()?;
+
+ inputs.insert(
+ stage_id,
+ StageOutput {
+ partition_locations: outputs,
+ complete: input.complete,
+ },
+ );
}
+ let stage_metrics = if stage.stage_metrics.is_empty() {
+ None
+ } else {
+ let ms = stage
+ .stage_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ Some(ms)
+ };
+
+ let execution_stage = ExecutionStage {
+ stage_id: stage.stage_id as usize,
+ partitions,
+ output_partitioning,
+ inputs,
+ plan,
+ task_statuses,
+ output_links: stage
+ .output_links
+ .into_iter()
+ .map(|l| l as usize)
+ .collect(),
+ resolved: stage.resolved,
+ stage_metrics,
+ };
+ stages.insert(stage_id, execution_stage);
}
- Ok(true)
- }
-}
-/// Represents the basic unit of work for the Ballista executor. Will execute
-/// one partition of one stage on one task slot.
-#[derive(Clone)]
-pub struct Task {
- pub session_id: String,
- pub partition: PartitionId,
- pub plan: Arc<dyn ExecutionPlan>,
- pub output_partitioning: Option<Partitioning>,
-}
+ let output_locations: Vec<PartitionLocation> = proto
+ .output_locations
+ .into_iter()
+ .map(|loc| loc.try_into())
+ .collect::<Result<Vec<_>>>()?;
-impl Debug for Task {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
- write!(
- f,
- "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
- self.session_id,
- self.partition.job_id,
- self.partition.stage_id,
- self.partition.partition_id,
- plan
- )
+ Ok(ExecutionGraph {
+ job_id: proto.job_id,
+ session_id: proto.session_id,
+ status: proto.status.ok_or_else(|| {
+ BallistaError::Internal(
+ "Invalid Execution Graph: missing job status".to_owned(),
+ )
+ })?,
+ stages,
+ output_partitions: proto.output_partitions as usize,
+ output_locations,
+ })
}
-}
-
-/// Represents the DAG for a distributed query plan.
-///
-/// A distributed query plan consists of a set of stages which must be
executed sequentially.
-///
-/// Each stage consists of a set of partitions which can be executed in
parallel, where each partition
-/// represents a `Task`, which is the basic unit of scheduling in Ballista.
-///
-/// As an example, consider a SQL query which performs a simple aggregation:
-///
-/// `SELECT id, SUM(gmv) FROM some_table GROUP BY id`
-///
-/// This will produce a DataFusion execution plan that looks something like
-///
-///
-/// CoalesceBatchesExec: target_batch_size=4096
-/// RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }],
4)
-/// AggregateExec: mode=Partial, gby=[id@0 as id],
aggr=[SUM(some_table.gmv)]
-/// TableScan: some_table
-///
-/// The Ballista `DistributedPlanner` will turn this into a distributed plan
by creating a shuffle
-/// boundary (called a "Stage") whenever the underlying plan needs to perform
a repartition.
-/// In this case we end up with a distributed plan with two stages:
-///
-///
-/// ExecutionGraph[job_id=job, session_id=session, available_tasks=1,
complete=false]
-/// Stage[id=2, partitions=4, children=1, completed_tasks=0, resolved=false,
scheduled_tasks=0, available_tasks=0]
-/// Inputs{1: StageOutput { partition_locations: {}, complete: false }}
-///
-/// ShuffleWriterExec: None
-/// AggregateExec: mode=FinalPartitioned, gby=[id@0 as id],
aggr=[SUM(?table?.gmv)]
-/// CoalesceBatchesExec: target_batch_size=4096
-/// UnresolvedShuffleExec
-///
-/// Stage[id=1, partitions=1, children=0, completed_tasks=0, resolved=true,
scheduled_tasks=0, available_tasks=1]
-/// Inputs{}
-///
-/// ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
-/// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
-/// TableScan: some_table
-///
-///
-/// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each
stage's `input` field
-/// will indicate which stages it depends on, and each stage's `output_links`
will indicate which
-/// stage it needs to publish its output to.
-///
-/// If a stage has `output_links` is empty then it is the final stage in this
query, and it should
-/// publish its outputs to the `ExecutionGraph`s `output_locations`
representing the final query results.
-#[derive(Clone)]
-pub struct ExecutionGraph {
- /// ID for this job
- pub(crate) job_id: String,
- /// Session ID for this job
- pub(crate) session_id: String,
- /// Status of this job
- pub(crate) status: JobStatus,
- /// Map from Stage ID -> ExecutionStage
- pub(crate) stages: HashMap<usize, ExecutionStage>,
- /// Total number fo output partitions
- pub(crate) output_partitions: usize,
- /// Locations of this `ExecutionGraph` final output locations
- pub(crate) output_locations: Vec<PartitionLocation>,
-}
-impl ExecutionGraph {
- pub fn new(
- job_id: &str,
- session_id: &str,
- plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Self> {
- let mut planner = DistributedPlanner::new();
+ pub(crate) fn encode_execution_graph<
+ T: 'static + AsLogicalPlan,
+ U: 'static + AsExecutionPlan,
+ >(
+ graph: ExecutionGraph,
+ codec: &BallistaCodec<T, U>,
+ ) -> Result<protobuf::ExecutionGraph> {
+ let job_id = graph.job_id().to_owned();
- let output_partitions = plan.output_partitioning().partition_count();
+ let stages = graph
+ .stages
+ .into_iter()
+ .map(|(stage_id, stage)| {
+ let mut plan: Vec<u8> = vec![];
+
+ U::try_from_physical_plan(stage.plan,
codec.physical_extension_codec())
+ .and_then(|proto| proto.try_encode(&mut plan))?;
+
+ let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
+
+ for (stage, output) in stage.inputs.into_iter() {
+ inputs.push(protobuf::GraphStageInput {
+ stage_id: stage as u32,
+ partition_locations: output
+ .partition_locations
+ .into_iter()
+ .map(|(partition, locations)| {
+ Ok(protobuf::TaskInputPartitions {
+ partition: partition as u32,
+ partition_location: locations
+ .into_iter()
+ .map(|l| l.try_into())
+ .collect::<Result<Vec<_>>>()?,
+ })
+ })
+ .collect::<Result<Vec<_>>>()?,
+ complete: output.complete,
+ });
+ }
- let shuffle_stages = planner.plan_query_stages(job_id, plan)?;
+ let task_statuses: Vec<protobuf::TaskStatus> = stage
+ .task_statuses
+ .into_iter()
+ .enumerate()
+ .filter_map(|(partition, status)| {
+ status.map(|status| protobuf::TaskStatus {
+ task_id: Some(protobuf::PartitionId {
+ job_id: job_id.clone(),
+ stage_id: stage_id as u32,
+ partition_id: partition as u32,
+ }),
+ // task metrics should not persist.
+ metrics: vec![],
+ status: Some(status),
+ })
+ })
+ .collect();
+
+ let output_partitioning =
+
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+ let stage_metrics = stage
+ .stage_metrics
+ .unwrap_or_default()
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ Ok(protobuf::ExecutionGraphStage {
+ stage_id: stage_id as u64,
+ partitions: stage.partitions as u32,
+ output_partitioning,
+ inputs,
+ plan,
+ task_statuses,
+ output_links: stage
+ .output_links
+ .into_iter()
+ .map(|l| l as u32)
+ .collect(),
+ resolved: stage.resolved,
+ stage_metrics,
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
- let builder = ExecutionStageBuilder::new();
- let stages = builder.build(shuffle_stages)?;
+ let output_locations: Vec<protobuf::PartitionLocation> = graph
+ .output_locations
+ .into_iter()
+ .map(|loc| loc.try_into())
+ .collect::<Result<Vec<_>>>()?;
- Ok(Self {
- job_id: job_id.to_string(),
- session_id: session_id.to_string(),
- status: JobStatus {
- status: Some(job_status::Status::Queued(QueuedJob {})),
- },
+ Ok(protobuf::ExecutionGraph {
+ job_id: graph.job_id,
+ session_id: graph.session_id,
+ status: Some(graph.status),
stages,
- output_partitions,
- output_locations: vec![],
+ output_partitions: graph.output_partitions as u64,
+ output_locations,
})
}
+}
+
+impl Debug for ExecutionGraph {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let stages = self
+ .stages
+ .iter()
+ .map(|(_, stage)| format!("{:?}", stage))
+ .collect::<Vec<String>>()
+ .join("\n");
+ write!(f, "ExecutionGraph[job_id={}, session_id={},
available_tasks={}, complete={}]\n{}", self.job_id, self.session_id,
self.available_tasks(), self.complete(), stages)
+ }
+}
- pub fn job_id(&self) -> &str {
- self.job_id.as_str()
+/// This data structure collects the partition locations for an
`ExecutionStage`.
+/// Each `ExecutionStage` will hold a `StageOutput` for each of its child
stages.
+/// When all tasks for the child stage are complete, it will mark the
`StageOutput`
+#[derive(Clone, Debug, Default)]
+struct StageOutput {
+ /// Map from partition -> partition locations
+ partition_locations: HashMap<usize, Vec<PartitionLocation>>,
+ /// Flag indicating whether all tasks are complete
+ complete: bool,
+}
+
+impl StageOutput {
+ pub fn new() -> Self {
+ Self {
+ partition_locations: HashMap::new(),
+ complete: false,
+ }
}
- pub fn session_id(&self) -> &str {
- self.session_id.as_str()
+ /// Add a `PartitionLocation` to the `StageOutput`
+ pub fn add_partition(&mut self, partition_location: PartitionLocation) {
+ if let Some(parts) = self
+ .partition_locations
+ .get_mut(&partition_location.partition_id.partition_id)
+ {
+ parts.push(partition_location)
+ } else {
+ self.partition_locations.insert(
+ partition_location.partition_id.partition_id,
+ vec![partition_location],
+ );
+ }
}
- pub fn status(&self) -> JobStatus {
- self.status.clone()
+ pub fn is_complete(&self) -> bool {
+ self.complete
}
+}
- /// An ExecutionGraph is complete if all its stages are complete
- pub fn complete(&self) -> bool {
- self.stages.values().all(|s| s.complete())
+/// A stage in the ExecutionGraph.
+///
+/// This represents a set of tasks (one per each `partition`) which can
+/// be executed concurrently.
+#[derive(Clone)]
+struct ExecutionStage {
+ /// Stage ID
+ stage_id: usize,
+ /// Total number of output partitions for this stage.
+ /// This stage will produce one task per partition.
+ partitions: usize,
+ /// Output partitioning for this stage.
+ output_partitioning: Option<Partitioning>,
+ /// Represents the outputs from this stage's child stages.
+ /// This stage can only be resolved and executed once all child stages are
completed.
+ inputs: HashMap<usize, StageOutput>,
+ // `ExecutionPlan` for this stage
+ plan: Arc<dyn ExecutionPlan>,
+ /// Status of each already scheduled task. If status is None, the
partition has not yet been scheduled
+ task_statuses: Vec<Option<task_status::Status>>,
+ /// Stage ID of the stage that will take this stages outputs as inputs.
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ output_links: Vec<usize>,
+ /// Flag indicating whether all input partitions have been resolved and
the plan
+ /// has UnresolvedShuffleExec operators resolved to ShuffleReadExec
operators.
+ resolved: bool,
+ /// Combined metrics of the already finished tasks in the stage. If it is
None, no task is finished yet.
+ stage_metrics: Option<Vec<MetricsSet>>,
+}
+
+impl Debug for ExecutionStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ let scheduled_tasks = self.task_statuses.iter().filter(|t|
t.is_some()).count();
+
+ write!(
+ f,
+ "Stage[id={}, partitions={:?}, children={}, completed_tasks={},
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
+ self.stage_id,
+ self.partitions,
+ self.inputs.len(),
+ self.completed_tasks(),
+ self.resolved,
+ scheduled_tasks,
+ self.available_tasks(),
+ self.inputs,
+ plan
+ )
}
+}
- /// Update task statuses and task metrics in the graph.
- /// This will also push shuffle partitions to their respective shuffle
read stages.
- pub fn update_task_status(
- &mut self,
- executor: &ExecutorMetadata,
- statuses: Vec<TaskStatus>,
- ) -> Result<()> {
- for status in statuses.into_iter() {
- if let TaskStatus {
- task_id:
- Some(protobuf::PartitionId {
- job_id,
- stage_id,
- partition_id,
- }),
- metrics: operator_metrics,
- status: Some(task_status),
- } = status
- {
- if job_id != self.job_id() {
- return Err(BallistaError::Internal(format!(
- "Error updating job {}: Invalid task status job ID {}",
- self.job_id(),
- job_id
- )));
- }
+impl ExecutionStage {
+ pub fn new(
+ stage_id: usize,
+ plan: Arc<dyn ExecutionPlan>,
+ output_partitioning: Option<Partitioning>,
+ output_links: Vec<usize>,
+ child_stages: Vec<usize>,
+ ) -> Self {
+ let num_tasks = plan.output_partitioning().partition_count();
- let stage_id = stage_id as usize;
- let partition = partition_id as usize;
- if let Some(stage) = self.stages.get_mut(&stage_id) {
- stage.update_task_status(partition, task_status.clone());
- let stage_plan = stage.plan.clone();
- let stage_complete = stage.complete();
+ let resolved = child_stages.is_empty();
- // TODO Should be able to reschedule this task.
- if let task_status::Status::Failed(failed_task) =
task_status {
- self.status = JobStatus {
- status: Some(job_status::Status::Failed(FailedJob {
- error: format!(
- "Task {}/{}/{} failed: {}",
- job_id, stage_id, partition_id,
failed_task.error
- ),
- })),
- };
- return Ok(());
- } else if let
task_status::Status::Completed(completed_task) =
- task_status
- {
- // update task metrics for completed task
- stage.update_task_metrics(partition,
operator_metrics)?;
+ let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
- // if this stage is completed, we want to combine the
stage metrics to plan's metric set and print out the plan
- if stage_complete &&
stage.stage_metrics.as_ref().is_some() {
- // The plan_metrics collected here is a snapshot
clone from the plan metrics.
- // They are all empty now and need to combine with
the stage metrics in the ExecutionStages
- let mut plan_metrics =
- collect_plan_metrics(stage_plan.as_ref());
- let stage_metrics = stage
- .stage_metrics
- .as_ref()
- .expect("stage metrics should not be None.");
- if plan_metrics.len() != stage_metrics.len() {
- return
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for
stage {}, plan metrics array size {} does not equal \
- to the stage metrics array size {}", stage_id,
plan_metrics.len(), stage_metrics.len())));
- }
-
plan_metrics.iter_mut().zip(stage_metrics).for_each(
- |(plan_metric, stage_metric)| {
- stage_metric
- .iter()
- .for_each(|s|
plan_metric.push(s.clone()));
- },
- );
+ for input_stage_id in &child_stages {
+ inputs.insert(*input_stage_id, StageOutput::new());
+ }
- info!(
- "=== [{}/{}/{}] Stage finished, physical plan
with metrics ===\n{}\n",
- job_id,
- stage_id,
- partition,
-
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(),
plan_metrics.as_ref()).indent()
- );
- }
+ Self {
+ stage_id,
+ partitions: num_tasks,
+ output_partitioning,
+ inputs,
+ plan,
+ task_statuses: vec![None; num_tasks],
+ output_links,
+ resolved,
+ stage_metrics: None,
+ }
+ }
- let locations = partition_to_location(
- self.job_id.as_str(),
- stage_id,
- executor,
- completed_task.partitions,
- );
+ /// Returns true if all inputs are complete and we can resolve all
+ /// UnresolvedShuffleExec operators to ShuffleReadExec
+ pub fn resolvable(&self) -> bool {
+ self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
+ }
- let output_links = stage.output_links.clone();
- if output_links.is_empty() {
- // If `output_links` is empty, then this is a
final stage
- self.output_locations.extend(locations);
- } else {
- for link in output_links.into_iter() {
- // If this is an intermediate stage, we need
to push its `PartitionLocation`s to the parent stage
- if let Some(linked_stage) =
self.stages.get_mut(&link) {
- linked_stage.add_input_partitions(
- stage_id,
- partition,
- locations.clone(),
- )?;
+ /// Returns `true` if all tasks for this stage are complete
+ pub fn complete(&self) -> bool {
+ self.task_statuses
+ .iter()
+ .all(|status| matches!(status,
Some(task_status::Status::Completed(_))))
+ }
- // If all tasks for this stage are
complete, mark the input complete in the parent stage
- if stage_complete {
- linked_stage.complete_input(stage_id);
- }
+ /// Returns the number of completed tasks in this stage
+ pub fn completed_tasks(&self) -> usize {
+ self.task_statuses
+ .iter()
+ .filter(|status| matches!(status,
Some(task_status::Status::Completed(_))))
+ .count()
+ }
- // If all input partitions are ready, we
can resolve any UnresolvedShuffleExec in the parent stage plan
- if linked_stage.resolvable() {
- linked_stage.resolve_shuffles()?;
- }
- } else {
- return
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link
{} for stage {}", job_id, stage_id, link)));
- }
- }
- }
- }
- } else {
- return Err(BallistaError::Internal(format!(
- "Invalid stage ID {} for job {}",
- stage_id,
- self.job_id()
- )));
- }
- }
+ /// Marks the input stage ID as complete.
+ pub fn complete_input(&mut self, stage_id: usize) {
+ if let Some(input) = self.inputs.get_mut(&stage_id) {
+ input.complete = true;
}
+ }
- Ok(())
+ /// Returns true if the stage plan has all UnresolvedShuffleExec operators
resolved to
+ /// ShuffleReadExec
+ pub fn resolved(&self) -> bool {
+ self.resolved
}
- /// Total number of tasks in this plan that are ready for scheduling
+ /// Returns the number of tasks in this stage which are available for
scheduling.
+ /// If the stage is not yet resolved, then this will return `0`, otherwise
it will
+ /// return the number of tasks where the task status is not yet set.
pub fn available_tasks(&self) -> usize {
- self.stages
- .iter()
- .map(|(_, stage)| stage.available_tasks())
- .sum()
+ if self.resolved {
+ self.task_statuses.iter().filter(|s| s.is_none()).count()
+ } else {
+ 0
+ }
}
- /// Get next task that can be assigned to the given executor.
- /// This method should only be called when the resulting task is
immediately
- /// being launched as the status will be set to Running and it will not be
- /// available to the scheduler.
- /// If the task is not launched the status must be reset to allow the task
to
- /// be scheduled elsewhere.
- pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<Task>>
{
- let job_id = self.job_id.clone();
- let session_id = self.session_id.clone();
- self.stages.iter_mut().find(|(_stage_id, stage)| {
- stage.resolved() && stage.available_tasks() > 0
- }).map(|(stage_id, stage)| {
- let (partition_id,_) = stage
- .task_statuses
+ /// Resolve any UnresolvedShuffleExec operators within this stage's plan
+ pub fn resolve_shuffles(&mut self) -> Result<()> {
+ println!("Resolving shuffles\n{:?}", self);
+ if self.resolved {
+ // If this stage has no input shuffles, then it is already resolved
+ Ok(())
+ } else {
+ let input_locations = self
+ .inputs
.iter()
- .enumerate()
- .find(|(_partition,status)| status.is_none())
- .ok_or_else(|| {
- BallistaError::Internal(format!("Error getting next task for
job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
- })?;
+ .map(|(stage, outputs)| (*stage,
outputs.partition_locations.clone()))
+ .collect();
+ // Otherwise, rewrite the plan to replace UnresolvedShuffleExec
with ShuffleReadExec
+ let new_plan = crate::planner::remove_unresolved_shuffles(
+ self.plan.clone(),
+ &input_locations,
+ )?;
+ self.plan = new_plan;
+ self.resolved = true;
+ Ok(())
+ }
+ }
- let partition = PartitionId {
- job_id,
- stage_id: *stage_id,
- partition_id
- };
+ /// Update the status for task partition
+ pub fn update_task_status(&mut self, partition: usize, status:
task_status::Status) {
+ debug!("Updating task status for partition {}", partition);
+ self.task_statuses[partition] = Some(status);
+ }
- // Set the status to Running
- stage.task_statuses[partition_id] =
Some(task_status::Status::Running(RunningTask {
- executor_id: executor_id.to_owned()
- }));
+ /// update and combine the task metrics to the stage metrics
+ pub fn update_task_metrics(
+ &mut self,
+ partition: usize,
+ metrics: Vec<OperatorMetricsSet>,
+ ) -> Result<()> {
+ if let Some(combined_metrics) = &mut self.stage_metrics {
+ if metrics.len() != combined_metrics.len() {
+ return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
+ with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+ }
+ let metrics_values_array = metrics
+ .into_iter()
+ .map(|ms| {
+ ms.metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
- Ok(Task {
- session_id,
- partition,
- plan: stage.plan.clone(),
- output_partitioning: stage.output_partitioning.clone()
- })
- }).transpose()
+ let new_metrics_set = combined_metrics
+ .iter_mut()
+ .zip(metrics_values_array)
+ .map(|(first, second)| {
+ Self::combine_metrics_set(first, second, partition)
+ })
+ .collect();
+ self.stage_metrics = Some(new_metrics_set)
+ } else {
+ let new_metrics_set = metrics
+ .into_iter()
+ .map(|ms| ms.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ if !new_metrics_set.is_empty() {
+ self.stage_metrics = Some(new_metrics_set)
+ }
+ }
+ Ok(())
}
- pub fn finalize(&mut self) -> Result<()> {
- if !self.complete() {
- return Err(BallistaError::Internal(format!(
- "Attempt to finalize an incomplete job {}",
- self.job_id()
- )));
+ pub fn combine_metrics_set(
+ first: &mut MetricsSet,
+ second: Vec<MetricValue>,
+ partition: usize,
+ ) -> MetricsSet {
+ for metric_value in second {
+ // TODO recheck the label logic
+ let new_metric = Arc::new(Metric::new(metric_value,
Some(partition)));
+ first.push(new_metric);
}
+ first.aggregate_by_partition()
+ }
- let partition_location = self
- .output_locations()
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- self.status = JobStatus {
- status: Some(job_status::Status::Completed(CompletedJob {
- partition_location,
- })),
- };
+ /// Add input partitions published from an input stage.
+ pub fn add_input_partitions(
+ &mut self,
+ stage_id: usize,
+ _partition_id: usize,
+ locations: Vec<PartitionLocation>,
+ ) -> Result<()> {
+ if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
+ for partition in locations {
+ stage_inputs.add_partition(partition);
+ }
+ } else {
+ return Err(BallistaError::Internal(format!("Error adding input
partitions to stage {}, {} is not a valid child stage ID", self.stage_id,
stage_id)));
+ }
Ok(())
}
+}
- pub fn update_status(&mut self, status: JobStatus) {
- self.status = status;
+/// Utility for building a set of `ExecutionStage`s from
+/// a list of `ShuffleWriterExec`.
+///
+/// This will infer the dependency structure for the stages
+/// so that we can construct a DAG from the stages.
+struct ExecutionStageBuilder {
+ /// Stage ID which is currently being visited
+ current_stage_id: usize,
+ /// Map from stage ID -> List of child stage IDs
+ stage_dependencies: HashMap<usize, Vec<usize>>,
+ /// Map from Stage ID -> output link
+ output_links: HashMap<usize, Vec<usize>>,
+}
+
+impl ExecutionStageBuilder {
+ pub fn new() -> Self {
+ Self {
+ current_stage_id: 0,
+ stage_dependencies: HashMap::new(),
+ output_links: HashMap::new(),
+ }
}
- /// Reset the status for the given task. This should be called is a task
failed to
- /// launch and it needs to be returned to the set of available tasks and be
- /// re-scheduled.
- pub fn reset_task_status(&mut self, task: Task) {
- let stage_id = task.partition.stage_id;
- let partition = task.partition.partition_id;
+ pub fn build(
+ mut self,
+ stages: Vec<Arc<ShuffleWriterExec>>,
+ ) -> Result<HashMap<usize, ExecutionStage>> {
+ let mut execution_stages: HashMap<usize, ExecutionStage> =
HashMap::new();
+ // First, build the dependency graph
+ for stage in &stages {
+ accept(stage.as_ref(), &mut self)?;
+ }
- if let Some(stage) = self.stages.get_mut(&stage_id) {
- stage.task_statuses[partition] = None;
+ // Now, create the execution stages
+ for stage in stages {
+ let partitioning = stage.shuffle_output_partitioning().cloned();
+ let stage_id = stage.stage_id();
+ let output_links =
self.output_links.remove(&stage_id).unwrap_or_default();
+
+ let child_stages = self
+ .stage_dependencies
+ .remove(&stage_id)
+ .unwrap_or_default();
+
+ execution_stages.insert(
+ stage_id,
+ ExecutionStage::new(
+ stage_id,
+ stage,
+ partitioning,
+ output_links,
+ child_stages,
+ ),
+ );
}
- }
- pub fn output_locations(&self) -> Vec<PartitionLocation> {
- self.output_locations.clone()
+ Ok(execution_stages)
}
}
-impl Debug for ExecutionGraph {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let stages = self
- .stages
- .iter()
- .map(|(_, stage)| format!("{:?}", stage))
- .collect::<Vec<String>>()
- .join("\n");
- write!(f, "ExecutionGraph[job_id={}, session_id={},
available_tasks={}, complete={}]\n{}", self.job_id, self.session_id,
self.available_tasks(), self.complete(), stages)
+impl ExecutionPlanVisitor for ExecutionStageBuilder {
+ type Error = BallistaError;
+
+ fn pre_visit(
+ &mut self,
+ plan: &dyn ExecutionPlan,
+ ) -> std::result::Result<bool, Self::Error> {
+ if let Some(shuffle_write) =
plan.as_any().downcast_ref::<ShuffleWriterExec>() {
+ self.current_stage_id = shuffle_write.stage_id();
+ } else if let Some(unresolved_shuffle) =
+ plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
+ {
+ if let Some(output_links) =
+ self.output_links.get_mut(&unresolved_shuffle.stage_id)
+ {
+ if !output_links.contains(&self.current_stage_id) {
+ output_links.push(self.current_stage_id);
+ }
+ } else {
+ self.output_links
+ .insert(unresolved_shuffle.stage_id,
vec![self.current_stage_id]);
+ }
+
+ if let Some(deps) =
self.stage_dependencies.get_mut(&self.current_stage_id) {
+ if !deps.contains(&unresolved_shuffle.stage_id) {
+ deps.push(unresolved_shuffle.stage_id);
+ }
+ } else {
+ self.stage_dependencies
+ .insert(self.current_stage_id,
vec![unresolved_shuffle.stage_id]);
+ }
+ }
+ Ok(true)
}
}
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs
b/ballista/rust/scheduler/src/state/task_manager.rs
index dcee3722..bb0db021 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -18,30 +18,29 @@
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SessionBuilder;
use crate::state::backend::{Keyspace, StateBackendClient};
-use crate::state::execution_graph::{ExecutionGraph, ExecutionStage,
StageOutput, Task};
+use crate::state::execution_graph::{ExecutionGraph, Task};
use crate::state::executor_manager::ExecutorReservation;
use crate::state::{decode_protobuf, encode_protobuf, with_lock};
use ballista_core::config::BallistaConfig;
-use ballista_core::error::{BallistaError, Result};
-use
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+#[cfg(not(test))]
+use ballista_core::error::BallistaError;
+use ballista_core::error::Result;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use crate::state::session_manager::create_datafusion_context;
use ballista_core::serde::protobuf::{
- self, job_status, task_status, FailedJob, JobStatus, PartitionId,
TaskDefinition,
- TaskStatus,
+ self, job_status, FailedJob, JobStatus, PartitionId, TaskDefinition,
TaskStatus,
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use ballista_core::serde::scheduler::{ExecutorMetadata, PartitionLocation};
+use ballista_core::serde::scheduler::ExecutorMetadata;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, info, warn};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::{HashMap, HashSet};
-use std::convert::TryInto;
use std::default::Default;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -484,206 +483,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
let session_id = &proto.session_id;
let session_ctx = self.get_session(session_id).await?;
- let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
- for stage in proto.stages {
- let plan_proto = U::try_decode(stage.plan.as_slice())?;
- let plan = plan_proto.try_into_physical_plan(
- session_ctx.as_ref(),
- session_ctx.runtime_env().as_ref(),
- self.codec.physical_extension_codec(),
- )?;
-
- let stage_id = stage.stage_id as usize;
- let partitions: usize = stage.partitions as usize;
-
- let mut task_statuses: Vec<Option<task_status::Status>> =
- vec![None; partitions];
-
- for status in stage.task_statuses {
- if let Some(task_id) = status.task_id.as_ref() {
- task_statuses[task_id.partition_id as usize] =
status.status
- }
- }
-
- let output_partitioning: Option<Partitioning> =
- parse_protobuf_hash_partitioning(
- stage.output_partitioning.as_ref(),
- session_ctx.as_ref(),
- plan.schema().as_ref(),
- )?;
-
- let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-
- for input in stage.inputs {
- let stage_id = input.stage_id as usize;
-
- let outputs = input
- .partition_locations
- .into_iter()
- .map(|loc| {
- let partition = loc.partition as usize;
- let locations = loc
- .partition_location
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?;
- Ok((partition, locations))
- })
- .collect::<Result<HashMap<usize,
Vec<PartitionLocation>>>>()?;
-
- inputs.insert(
- stage_id,
- StageOutput {
- partition_locations: outputs,
- complete: input.complete,
- },
- );
- }
- let stage_metrics = if stage.stage_metrics.is_empty() {
- None
- } else {
- let ms = stage
- .stage_metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
- Some(ms)
- };
-
- let execution_stage = ExecutionStage {
- stage_id: stage.stage_id as usize,
- partitions,
- output_partitioning,
- inputs,
- plan,
- task_statuses,
- output_links: stage
- .output_links
- .into_iter()
- .map(|l| l as usize)
- .collect(),
- resolved: stage.resolved,
- stage_metrics,
- };
- stages.insert(stage_id, execution_stage);
- }
- let output_locations: Vec<PartitionLocation> = proto
- .output_locations
- .into_iter()
- .map(|loc| loc.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- Ok(ExecutionGraph {
- job_id: proto.job_id,
- session_id: proto.session_id,
- status: proto.status.ok_or_else(|| {
- BallistaError::Internal(
- "Invalid Execution Graph: missing job status".to_owned(),
- )
- })?,
- stages,
- output_partitions: proto.output_partitions as usize,
- output_locations,
- })
+ ExecutionGraph::decode_execution_graph(proto, &self.codec,
&session_ctx).await
}
fn encode_execution_graph(&self, graph: ExecutionGraph) -> Result<Vec<u8>>
{
- let job_id = graph.job_id().to_owned();
-
- let stages = graph
- .stages
- .into_iter()
- .map(|(stage_id, stage)| {
- let mut plan: Vec<u8> = vec![];
-
- U::try_from_physical_plan(
- stage.plan,
- self.codec.physical_extension_codec(),
- )
- .and_then(|proto| proto.try_encode(&mut plan))?;
-
- let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
-
- for (stage, output) in stage.inputs.into_iter() {
- inputs.push(protobuf::GraphStageInput {
- stage_id: stage as u32,
- partition_locations: output
- .partition_locations
- .into_iter()
- .map(|(partition, locations)| {
- Ok(protobuf::TaskInputPartitions {
- partition: partition as u32,
- partition_location: locations
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?,
- })
- })
- .collect::<Result<Vec<_>>>()?,
- complete: output.complete,
- });
- }
+ let proto = ExecutionGraph::encode_execution_graph(graph,
&self.codec)?;
- let task_statuses: Vec<protobuf::TaskStatus> = stage
- .task_statuses
- .into_iter()
- .enumerate()
- .filter_map(|(partition, status)| {
- status.map(|status| protobuf::TaskStatus {
- task_id: Some(protobuf::PartitionId {
- job_id: job_id.clone(),
- stage_id: stage_id as u32,
- partition_id: partition as u32,
- }),
- // task metrics should not persist.
- metrics: vec![],
- status: Some(status),
- })
- })
- .collect();
-
- let output_partitioning =
-
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
-
- let stage_metrics = stage
- .stage_metrics
- .unwrap_or_default()
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
- Ok(protobuf::ExecutionGraphStage {
- stage_id: stage_id as u64,
- partitions: stage.partitions as u32,
- output_partitioning,
- inputs,
- plan,
- task_statuses,
- output_links: stage
- .output_links
- .into_iter()
- .map(|l| l as u32)
- .collect(),
- resolved: stage.resolved,
- stage_metrics,
- })
- })
- .collect::<Result<Vec<_>>>()?;
-
- let output_locations: Vec<protobuf::PartitionLocation> = graph
- .output_locations
- .into_iter()
- .map(|loc| loc.try_into())
- .collect::<Result<Vec<_>>>()?;
-
- encode_protobuf(&protobuf::ExecutionGraph {
- job_id: graph.job_id,
- session_id: graph.session_id,
- status: Some(graph.status),
- stages,
- output_partitions: graph.output_partitions as u64,
- output_locations,
- })
+ encode_protobuf(&proto)
}
}