This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new ec3f1be84 execution_graph extract to a trait (#1361)
ec3f1be84 is described below
commit ec3f1be840ac72600e4a111cc65970b0e31d6e1d
Author: Marko Milenković <[email protected]>
AuthorDate: Mon Feb 2 10:06:41 2026 +0000
execution_graph extract to a trait (#1361)
---
ballista/scheduler/src/cluster/memory.rs | 33 +-
ballista/scheduler/src/cluster/mod.rs | 29 +-
ballista/scheduler/src/cluster/test_util/mod.rs | 12 +-
ballista/scheduler/src/state/execution_graph.rs | 1135 +++++++++++---------
.../scheduler/src/state/execution_graph_dot.rs | 16 +-
ballista/scheduler/src/state/task_manager.rs | 39 +-
ballista/scheduler/src/test_utils.rs | 39 +-
7 files changed, 716 insertions(+), 587 deletions(-)
diff --git a/ballista/scheduler/src/cluster/memory.rs
b/ballista/scheduler/src/cluster/memory.rs
index a1db74ecb..87ef6709f 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -21,7 +21,7 @@ use crate::cluster::{
bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
is_skip_consistent_hash,
};
-use crate::state::execution_graph::ExecutionGraph;
+use crate::state::execution_graph::ExecutionGraphBox;
use async_trait::async_trait;
use ballista_core::ConfigProducer;
use ballista_core::error::{BallistaError, Result};
@@ -347,7 +347,7 @@ impl ClusterState for InMemoryClusterState {
pub struct InMemoryJobState {
scheduler: String,
/// Jobs which have either completed successfully or failed
- completed_jobs: DashMap<String, (JobStatus, Option<ExecutionGraph>)>,
+ completed_jobs: DashMap<String, (JobStatus, Option<ExecutionGraphBox>)>,
/// In-memory store of queued jobs. Map from Job ID -> (Job Name,
queued_at timestamp)
queued_jobs: DashMap<String, (String, u64)>,
/// In-memory store of running job statuses. Map from Job ID -> JobStatus
@@ -382,7 +382,7 @@ impl InMemoryJobState {
#[async_trait]
impl JobState for InMemoryJobState {
- async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) ->
Result<()> {
+ async fn submit_job(&self, job_id: String, graph: &ExecutionGraphBox) ->
Result<()> {
if self.queued_jobs.get(&job_id).is_some() {
self.running_jobs
.insert(job_id.clone(), graph.status().clone());
@@ -423,21 +423,24 @@ impl JobState for InMemoryJobState {
Ok(None)
}
- async fn get_execution_graph(&self, job_id: &str) ->
Result<Option<ExecutionGraph>> {
+ async fn get_execution_graph(
+ &self,
+ job_id: &str,
+ ) -> Result<Option<ExecutionGraphBox>> {
Ok(self
.completed_jobs
.get(job_id)
.as_deref()
- .and_then(|(_, graph)| graph.clone()))
+ .and_then(|(_, graph)| graph.as_ref().map(|e| e.cloned())))
}
- async fn try_acquire_job(&self, _job_id: &str) ->
Result<Option<ExecutionGraph>> {
+ async fn try_acquire_job(&self, _job_id: &str) ->
Result<Option<ExecutionGraphBox>> {
// Always return None. The only state stored here are for completed
jobs
// which cannot be acquired
Ok(None)
}
- async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) ->
Result<()> {
+ async fn save_job(&self, job_id: &str, graph: &ExecutionGraphBox) ->
Result<()> {
let status = graph.status().clone();
debug!("saving state for job {job_id} with status {:?}", status);
@@ -448,7 +451,7 @@ impl JobState for InMemoryJobState {
Some(Status::Successful(_)) | Some(Status::Failed(_))
) {
self.completed_jobs
- .insert(job_id.to_string(), (status.clone(),
Some(graph.clone())));
+ .insert(job_id.to_string(), (status.clone(),
Some(graph.cloned())));
self.running_jobs.remove(job_id);
} else {
// otherwise update running job
@@ -576,7 +579,7 @@ mod test {
Arc::new(default_session_builder),
Arc::new(default_config_producer),
),
- test_aggregation_plan(4).await,
+ Box::new(test_aggregation_plan(4).await),
)
.await?;
test_job_lifecycle(
@@ -585,7 +588,7 @@ mod test {
Arc::new(default_session_builder),
Arc::new(default_config_producer),
),
- test_two_aggregations_plan(4).await,
+ Box::new(test_two_aggregations_plan(4).await),
)
.await?;
test_job_lifecycle(
@@ -594,7 +597,7 @@ mod test {
Arc::new(default_session_builder),
Arc::new(default_config_producer),
),
- test_join_plan(4).await,
+ Box::new(test_join_plan(4).await),
)
.await?;
@@ -609,7 +612,7 @@ mod test {
Arc::new(default_session_builder),
Arc::new(default_config_producer),
),
- test_aggregation_plan(4).await,
+ Box::new(test_aggregation_plan(4).await),
)
.await?;
test_job_planning_failure(
@@ -618,7 +621,7 @@ mod test {
Arc::new(default_session_builder),
Arc::new(default_config_producer),
),
- test_two_aggregations_plan(4).await,
+ Box::new(test_two_aggregations_plan(4).await),
)
.await?;
test_job_planning_failure(
@@ -627,7 +630,7 @@ mod test {
Arc::new(default_session_builder),
Arc::new(default_config_producer),
),
- test_join_plan(4).await,
+ Box::new(test_join_plan(4).await),
)
.await?;
@@ -654,7 +657,7 @@ mod test {
});
barrier.wait().await;
- test_job_lifecycle(state, test_aggregation_plan(4).await).await?;
+ test_job_lifecycle(state,
Box::new(test_aggregation_plan(4).await)).await?;
let result = events.await?;
assert_eq!(2, result.len());
match result.last().unwrap() {
diff --git a/ballista/scheduler/src/cluster/mod.rs
b/ballista/scheduler/src/cluster/mod.rs
index 7b1f4d27a..5d01091cc 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -43,7 +43,9 @@ use crate::cluster::memory::{InMemoryClusterState,
InMemoryJobState};
use crate::config::{SchedulerConfig, TaskDistributionPolicy};
use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::{ExecutionGraph, TaskDescription,
create_task_info};
+use crate::state::execution_graph::{
+ ExecutionGraphBox, TaskDescription, create_task_info,
+};
use crate::state::task_manager::JobInfoCache;
/// Event broadcasting and subscription for cluster state changes.
@@ -299,7 +301,7 @@ pub trait JobState: Send + Sync {
/// Submits a new job to the job state.
///
/// The submitter is assumed to own the job.
- async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) ->
Result<()>;
+ async fn submit_job(&self, job_id: String, graph: &ExecutionGraphBox) ->
Result<()>;
/// Returns the set of all active job IDs.
async fn get_jobs(&self) -> Result<HashSet<String>>;
@@ -311,12 +313,15 @@ pub trait JobState: Send + Sync {
///
/// The job may not belong to the caller, and the graph may be updated
/// by another scheduler after this call returns.
- async fn get_execution_graph(&self, job_id: &str) ->
Result<Option<ExecutionGraph>>;
+ async fn get_execution_graph(
+ &self,
+ job_id: &str,
+ ) -> Result<Option<ExecutionGraphBox>>;
/// Persists the current state of an owned job.
///
/// Returns an error if the job is not owned by the caller.
- async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) ->
Result<()>;
+ async fn save_job(&self, job_id: &str, graph: &ExecutionGraphBox) ->
Result<()>;
/// Marks an unscheduled job as failed.
///
@@ -330,7 +335,7 @@ pub trait JobState: Send + Sync {
///
/// Returns the execution graph if the job is still running and
successfully acquired,
/// otherwise returns None.
- async fn try_acquire_job(&self, job_id: &str) ->
Result<Option<ExecutionGraph>>;
+ async fn try_acquire_job(&self, job_id: &str) ->
Result<Option<ExecutionGraphBox>>;
/// Returns a stream of job state events.
async fn job_state_events(&self) -> Result<JobStateEventStream>;
@@ -764,7 +769,7 @@ mod test {
BoundTask, TopologyNode, bind_task_bias, bind_task_consistent_hash,
bind_task_round_robin,
};
- use crate::state::execution_graph::ExecutionGraph;
+ use crate::state::execution_graph::{ExecutionGraph, StaticExecutionGraph};
use crate::state::task_manager::JobInfoCache;
use crate::test_utils::{
mock_completed_task, revive_graph_and_complete_next_stage,
@@ -995,8 +1000,14 @@ mod test {
let graph_b = mock_graph("job_b", num_partition, 7).await?;
let mut active_jobs = HashMap::new();
- active_jobs.insert(graph_a.job_id().to_string(),
JobInfoCache::new(graph_a));
- active_jobs.insert(graph_b.job_id().to_string(),
JobInfoCache::new(graph_b));
+ active_jobs.insert(
+ graph_a.job_id().to_string(),
+ JobInfoCache::new(Box::new(graph_a)),
+ );
+ active_jobs.insert(
+ graph_b.job_id().to_string(),
+ JobInfoCache::new(Box::new(graph_b)),
+ );
Ok(active_jobs)
}
@@ -1005,7 +1016,7 @@ mod test {
job_id: &str,
num_target_partitions: usize,
num_pending_task: usize,
- ) -> Result<ExecutionGraph> {
+ ) -> Result<StaticExecutionGraph> {
let mut graph =
test_aggregation_plan_with_job_id(num_target_partitions,
job_id).await;
let executor = ExecutorMetadata {
diff --git a/ballista/scheduler/src/cluster/test_util/mod.rs
b/ballista/scheduler/src/cluster/test_util/mod.rs
index 625f4afad..d9e960004 100644
--- a/ballista/scheduler/src/cluster/test_util/mod.rs
+++ b/ballista/scheduler/src/cluster/test_util/mod.rs
@@ -17,7 +17,7 @@
use crate::cluster::{JobState, JobStateEvent};
use crate::scheduler_server::timestamp_millis;
-use crate::state::execution_graph::ExecutionGraph;
+use crate::state::execution_graph::ExecutionGraphBox;
use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
use ballista_core::error::Result;
use ballista_core::serde::protobuf::JobStatus;
@@ -87,7 +87,7 @@ impl<S: JobState> JobStateTest<S> {
}
/// Submits a job with the given execution graph.
- pub async fn submit_job(self, graph: &ExecutionGraph) -> Result<Self> {
+ pub async fn submit_job(self, graph: &ExecutionGraphBox) -> Result<Self> {
self.state
.submit_job(graph.job_id().to_string(), graph)
.await?;
@@ -113,7 +113,7 @@ impl<S: JobState> JobStateTest<S> {
}
/// Updates the job with the given execution graph.
- pub async fn update_job(self, graph: &ExecutionGraph) -> Result<Self> {
+ pub async fn update_job(self, graph: &ExecutionGraphBox) -> Result<Self> {
self.state.save_job(graph.job_id(), graph).await?;
Ok(self)
}
@@ -172,7 +172,7 @@ impl<S: JobState> JobStateTest<S> {
/// Tests the complete job lifecycle from queued to successful.
pub async fn test_job_lifecycle<S: JobState>(
state: S,
- mut graph: ExecutionGraph,
+ mut graph: ExecutionGraphBox,
) -> Result<()> {
let test = JobStateTest::new(state).await?;
@@ -201,7 +201,7 @@ pub async fn test_job_lifecycle<S: JobState>(
/// Tests job failure during the planning phase.
pub async fn test_job_planning_failure<S: JobState>(
state: S,
- graph: ExecutionGraph,
+ graph: ExecutionGraphBox,
) -> Result<()> {
let test = JobStateTest::new(state).await?;
@@ -216,7 +216,7 @@ pub async fn test_job_planning_failure<S: JobState>(
Ok(())
}
-fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
+fn drain_tasks(graph: &mut ExecutionGraphBox) -> Result<()> {
let executor = mock_executor("executor-id1".to_string());
while let Some(task) = graph.pop_next_task(&executor.id)? {
let task_status = mock_completed_task(task, &executor.id);
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 8d07ae022..f37b92944 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -52,6 +52,9 @@ pub(crate) use crate::state::execution_stage::{
};
use crate::state::task_manager::UpdatedStages;
+/// Boxed [ExecutionGraph]
+pub type ExecutionGraphBox = Box<dyn ExecutionGraph + Send + Sync>;
+
/// Represents the DAG for a distributed query plan.
///
/// A distributed query plan consists of a set of stages which must be
executed sequentially.
@@ -96,8 +99,141 @@ use crate::state::task_manager::UpdatedStages;
///
/// If a stage has `output_links` is empty then it is the final stage in this
query, and it should
/// publish its outputs to the `ExecutionGraph`s `output_locations`
representing the final query results.
+pub trait ExecutionGraph: Debug {
+ /// Returns the job ID for this execution graph.
+ fn job_id(&self) -> &str;
+
+ /// Returns the job name for this execution graph.
+ fn job_name(&self) -> &str;
+
+ /// Returns the session ID associated with this job.
+ fn session_id(&self) -> &str;
+
+ /// Returns the current job status.
+ fn status(&self) -> &JobStatus;
+
+ /// Returns the timestamp when this job started execution.
+ fn start_time(&self) -> u64;
+
+    /// Returns the timestamp when this job completed (0 if still running).
+ fn end_time(&self) -> u64;
+
+ /// Number of completed stages
+ fn completed_stages(&self) -> usize;
+
+ /// An ExecutionGraph is successful if all its stages are successful
+ fn is_successful(&self) -> bool;
+
+ /// Revive the execution graph by converting the resolved stages to
running stages
+ /// If any stages are converted, return true; else false.
+ fn revive(&mut self) -> bool;
+
+ /// Update task statuses and task metrics in the graph.
+ /// This will also push shuffle partitions to their respective shuffle
read stages.
+ fn update_task_status(
+ &mut self,
+ executor: &ExecutorMetadata,
+ task_statuses: Vec<TaskStatus>,
+ max_task_failures: usize,
+ max_stage_failures: usize,
+ ) -> Result<Vec<QueryStageSchedulerEvent>>;
+
+ /// Returns all the currently running stage IDs.
+ fn running_stages(&self) -> Vec<usize>;
+
+ /// Returns all currently running tasks along with the executor ID on
which they are assigned.
+ fn running_tasks(&self) -> Vec<RunningTaskInfo>;
+
+ /// Returns the total number of tasks in this plan that are ready for
scheduling.
+ fn available_tasks(&self) -> usize;
+
+ /// Fetches a running stage that has available tasks, excluding stages in
the blacklist.
+ ///
+ /// Returns a mutable reference to the running stage and the task ID
generator
+ /// if a suitable stage is found.
+ fn fetch_running_stage(
+ &mut self,
+ black_list: &[usize],
+ ) -> Option<(&mut RunningStage, &mut usize)>;
+
+ /// Updates the job status.
+ fn update_status(&mut self, status: JobStatus);
+
+ /// Returns the output partition locations for the final stage results.
+ fn output_locations(&self) -> Vec<PartitionLocation>;
+
+ /// Reset running and successful stages on a given executor
+ /// This will first check the unresolved/resolved/running stages and reset
the running tasks and successful tasks.
+ /// Then it will check the successful stage and whether there are running
parent stages need to read shuffle from it.
+ /// If yes, reset the successful tasks and roll back the resolved shuffle
recursively.
+ ///
+    /// Returns the reset stage ids and the running tasks that should be killed
+ fn reset_stages_on_lost_executor(
+ &mut self,
+ executor_id: &str,
+ ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)>;
+
+ /// Converts an unresolved stage to resolved state.
+ ///
+ /// Returns true if the stage was successfully resolved, false if the stage
+ /// was not found or not in unresolved state.
+ fn resolve_stage(&mut self, stage_id: usize) -> Result<bool>;
+
+ /// Converts a running stage to successful state.
+ ///
+ /// Returns true if the stage was successfully marked as complete.
+ fn succeed_stage(&mut self, stage_id: usize) -> bool;
+
+ /// Converts a running stage to failed state with the given error message.
+ ///
+ /// Returns true if the stage was found and marked as failed.
+ fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool;
+
+ /// Convert running stage to be unresolved,
+ /// Returns a Vec of RunningTaskInfo for running tasks in this stage.
+ fn rollback_running_stage(
+ &mut self,
+ stage_id: usize,
+ failure_reasons: HashSet<String>,
+ ) -> Result<Vec<RunningTaskInfo>>;
+
+ /// Convert resolved stage to be unresolved
+ fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool>;
+
+ /// Converts a successful stage back to running state for re-execution.
+ ///
+ /// This is used when some outputs from the stage have been lost and tasks
+ /// need to be re-run.
+ fn rerun_successful_stage(&mut self, stage_id: usize) -> bool;
+
+ /// fail job with error message
+ fn fail_job(&mut self, error: String);
+
+ /// Marks the job as successfully completed.
+ ///
+ /// This should only be called after all stages have completed
successfully.
+ /// Returns an error if the job is not in a successful state.
+ fn succeed_job(&mut self) -> Result<()>;
+
+    /// Exposes execution stages and stage IDs
+ fn stages(&self) -> &HashMap<usize, ExecutionStage>;
+
+ /// returns next task to run
+ /// (used for testing only)
+ #[cfg(test)]
+ fn pop_next_task(&mut self, executor_id: &str) ->
Result<Option<TaskDescription>>;
+
+ /// Returns the total number of stages in this execution graph.
+ fn stage_count(&self) -> usize;
+
+ /// Clones execution graph
+ fn cloned(&self) -> ExecutionGraphBox;
+}
+
+/// [ExecutionGraph] implementation which generates
+/// all stages on job submission time
#[derive(Clone)]
-pub struct ExecutionGraph {
+pub struct StaticExecutionGraph {
/// Curator scheduler name. Can be `None` if `ExecutionGraph` is not
currently curated by any scheduler
#[allow(dead_code)] // not used at the moment, will be used later
scheduler_id: Option<String>,
@@ -117,9 +253,7 @@ pub struct ExecutionGraph {
end_time: u64,
/// Map from Stage ID -> ExecutionStage
stages: HashMap<usize, ExecutionStage>,
- /// Total number fo output partitions
- #[allow(dead_code)] // not used at the moment, will be used later
- output_partitions: usize,
+
/// Locations of this `ExecutionGraph` final output locations
output_locations: Vec<PartitionLocation>,
/// Task ID generator, generate unique TID in the execution graph
@@ -149,7 +283,7 @@ pub struct RunningTaskInfo {
pub executor_id: String,
}
-impl ExecutionGraph {
+impl StaticExecutionGraph {
/// Creates a new `ExecutionGraph` from a physical execution plan.
///
/// This will use the `DistributedPlanner` to break the plan into stages
@@ -165,7 +299,6 @@ impl ExecutionGraph {
session_config: Arc<SessionConfig>,
planner: &mut dyn DistributedPlanner,
) -> Result<Self> {
- let output_partitions =
plan.properties().output_partitioning().partition_count();
let shuffle_stages =
planner.plan_query_stages(job_id, plan, session_config.options())?;
@@ -193,7 +326,6 @@ impl ExecutionGraph {
start_time: started_at,
end_time: 0,
stages,
- output_partitions,
output_locations: vec![],
task_id_gen: 0,
failed_stage_attempts: HashMap::new(),
@@ -201,101 +333,372 @@ impl ExecutionGraph {
})
}
- /// Returns the job ID for this execution graph.
- pub fn job_id(&self) -> &str {
- self.job_id.as_str()
+ #[cfg(test)]
+ fn next_task_id(&mut self) -> usize {
+ let new_tid = self.task_id_gen;
+ self.task_id_gen += 1;
+ new_tid
}
- /// Returns the job name for this execution graph.
- pub fn job_name(&self) -> &str {
- self.job_name.as_str()
- }
+ /// Processing stage status update after task status changing
+ fn processing_stages_update(
+ &mut self,
+ updated_stages: UpdatedStages,
+ ) -> Result<Vec<QueryStageSchedulerEvent>> {
+ let job_id = self.job_id().to_owned();
+ let mut has_resolved = false;
+ let mut job_err_msg = "".to_owned();
- /// Returns the session ID associated with this job.
- pub fn session_id(&self) -> &str {
- self.session_id.as_str()
- }
+ for stage_id in updated_stages.resolved_stages {
+ self.resolve_stage(stage_id)?;
+ has_resolved = true;
+ }
- /// Returns the current job status.
- pub fn status(&self) -> &JobStatus {
- &self.status
- }
+ for stage_id in updated_stages.successful_stages {
+ self.succeed_stage(stage_id);
+ }
- /// Returns the timestamp when this job started execution.
- pub fn start_time(&self) -> u64 {
- self.start_time
- }
+ // Fail the stage and also abort the job
+ for (stage_id, err_msg) in &updated_stages.failed_stages {
+ job_err_msg =
+ format!("Job failed due to stage {stage_id} failed:
{err_msg}\n");
+ }
- /// Returns the timestamp when this job completed (0 if still running).
- pub fn end_time(&self) -> u64 {
- self.end_time
- }
+ let mut events = vec![];
+ // Only handle the rollback logic when there are no failed stages
+ if updated_stages.failed_stages.is_empty() {
+ let mut running_tasks_to_cancel = vec![];
+ for (stage_id, failure_reasons) in
updated_stages.rollback_running_stages {
+ let tasks = self.rollback_running_stage(stage_id,
failure_reasons)?;
+ running_tasks_to_cancel.extend(tasks);
+ }
- /// Returns the total number of stages in this execution graph.
- pub fn stage_count(&self) -> usize {
- self.stages.len()
- }
+ for stage_id in updated_stages.resubmit_successful_stages {
+ self.rerun_successful_stage(stage_id);
+ }
- /// Generates and returns the next unique task ID for this execution graph.
- pub fn next_task_id(&mut self) -> usize {
- let new_tid = self.task_id_gen;
- self.task_id_gen += 1;
- new_tid
- }
+ if !running_tasks_to_cancel.is_empty() {
+ events.push(QueryStageSchedulerEvent::CancelTasks(
+ running_tasks_to_cancel,
+ ));
+ }
+ }
- /// Exposes executions stages and stage id's
- pub fn stages(&self) -> &HashMap<usize, ExecutionStage> {
- &self.stages
+ if !updated_stages.failed_stages.is_empty() {
+ info!("Job {job_id} is failed");
+ self.fail_job(job_err_msg.clone());
+ events.push(QueryStageSchedulerEvent::JobRunningFailed {
+ job_id,
+ fail_message: job_err_msg,
+ queued_at: self.queued_at,
+ failed_at: timestamp_millis(),
+ });
+ } else if self.is_successful() {
+ // If this ExecutionGraph is successful, finish it
+ info!("Job {job_id} is success, finalizing output partitions");
+ self.succeed_job()?;
+ events.push(QueryStageSchedulerEvent::JobFinished {
+ job_id,
+ queued_at: self.queued_at,
+ completed_at: timestamp_millis(),
+ });
+ } else if has_resolved {
+ events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
+ }
+ Ok(events)
}
- /// An ExecutionGraph is successful if all its stages are successful
- pub fn is_successful(&self) -> bool {
- self.stages
- .values()
- .all(|s| matches!(s, ExecutionStage::Successful(_)))
- }
+ /// Return a Vec of resolvable stage ids
+ fn update_stage_output_links(
+ &mut self,
+ stage_id: usize,
+ is_completed: bool,
+ locations: Vec<PartitionLocation>,
+ output_links: Vec<usize>,
+ ) -> Result<Vec<usize>> {
+ let mut resolved_stages = vec![];
+ let job_id = &self.job_id;
+ if output_links.is_empty() {
+ // If `output_links` is empty, then this is a final stage
+ self.output_locations.extend(locations);
+ } else {
+ for link in output_links.iter() {
+ // If this is an intermediate stage, we need to push its
`PartitionLocation`s to the parent stage
+ if let Some(linked_stage) = self.stages.get_mut(link) {
+ if let ExecutionStage::UnResolved(linked_unresolved_stage)
=
+ linked_stage
+ {
+ linked_unresolved_stage
+ .add_input_partitions(stage_id,
locations.clone())?;
- /// Returns true if all stages in this graph have completed successfully.
- pub fn is_complete(&self) -> bool {
- self.stages
- .values()
- .all(|s| matches!(s, ExecutionStage::Successful(_)))
- }
+ // If all tasks for this stage are complete, mark the
input complete in the parent stage
+ if is_completed {
+ linked_unresolved_stage.complete_input(stage_id);
+ }
- /// Revive the execution graph by converting the resolved stages to
running stages
- /// If any stages are converted, return true; else false.
- pub fn revive(&mut self) -> bool {
- let running_stages = self
- .stages
- .values()
- .filter_map(|stage| {
- if let ExecutionStage::Resolved(resolved_stage) = stage {
- Some(resolved_stage.to_running())
+ // If all input partitions are ready, we can resolve
any UnresolvedShuffleExec in the parent stage plan
+ if linked_unresolved_stage.resolvable() {
+
resolved_stages.push(linked_unresolved_stage.stage_id);
+ }
+ } else {
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {job_id}: The stage {link} as
the output link of stage {stage_id} should be unresolved"
+ )));
+ }
} else {
- None
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {job_id}: Invalid output link
{stage_id} for stage {link}"
+ )));
}
- })
- .collect::<Vec<_>>();
-
- if running_stages.is_empty() {
- false
- } else {
- for running_stage in running_stages {
- self.stages.insert(
- running_stage.stage_id,
- ExecutionStage::Running(running_stage),
- );
}
- true
}
+ Ok(resolved_stages)
}
- /// Update task statuses and task metrics in the graph.
- /// This will also push shuffle partitions to their respective shuffle
read stages.
- pub fn update_task_status(
- &mut self,
- executor: &ExecutorMetadata,
- task_statuses: Vec<TaskStatus>,
+ fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
+ let mut running_stage_id = self.stages.iter().find_map(|(stage_id,
stage)| {
+ if black_list.contains(stage_id) {
+ None
+ } else if let ExecutionStage::Running(stage) = stage {
+ if stage.available_tasks() > 0 {
+ Some(*stage_id)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ });
+
+ // If no available tasks found in the running stage,
+ // try to find a resolved stage and convert it to the running stage
+ if running_stage_id.is_none() {
+ if self.revive() {
+ running_stage_id = self.get_running_stage_id(black_list);
+ } else {
+ running_stage_id = None;
+ }
+ }
+
+ running_stage_id
+ }
+
+ fn reset_stages_internal(
+ &mut self,
+ executor_id: &str,
+ ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
+ let job_id = self.job_id.clone();
+ // collect the input stages that need to resubmit
+ let mut resubmit_inputs: HashSet<usize> = HashSet::new();
+
+ let mut reset_running_stage = HashSet::new();
+ let mut rollback_resolved_stages = HashSet::new();
+ let mut rollback_running_stages = HashSet::new();
+ let mut resubmit_successful_stages = HashSet::new();
+
+ let mut empty_inputs: HashMap<usize, StageOutput> = HashMap::new();
+ // check the unresolved, resolved and running stages
+ self.stages
+ .iter_mut()
+ .for_each(|(stage_id, stage)| {
+ let stage_inputs = match stage {
+ ExecutionStage::UnResolved(stage) => {
+ &mut stage.inputs
+ }
+ ExecutionStage::Resolved(stage) => {
+ &mut stage.inputs
+ }
+ ExecutionStage::Running(stage) => {
+ let reset = stage.reset_tasks(executor_id);
+ if reset > 0 {
+ warn!(
+ "Reset {reset} tasks for running job/stage
{job_id}/{stage_id} on lost Executor {executor_id}"
+ );
+ reset_running_stage.insert(*stage_id);
+ }
+ &mut stage.inputs
+ }
+ _ => &mut empty_inputs
+ };
+
+ // For each stage input, check whether there are input
locations match that executor
+ // and calculate the resubmit input stages if the input stages
are successful.
+ let mut rollback_stage = false;
+ stage_inputs.iter_mut().for_each(|(input_stage_id,
stage_output)| {
+ let mut match_found = false;
+ stage_output.partition_locations.iter_mut().for_each(
+ |(_partition, locs)| {
+ let before_len = locs.len();
+ locs.retain(|loc| loc.executor_meta.id !=
executor_id);
+ if locs.len() < before_len {
+ match_found = true;
+ }
+ },
+ );
+ if match_found {
+ stage_output.complete = false;
+ rollback_stage = true;
+ resubmit_inputs.insert(*input_stage_id);
+ }
+ });
+
+ if rollback_stage {
+ match stage {
+ ExecutionStage::Resolved(_) => {
+ rollback_resolved_stages.insert(*stage_id);
+ warn!(
+ "Roll back resolved job/stage {job_id}/{stage_id}
and change ShuffleReaderExec back to UnresolvedShuffleExec");
+ }
+ ExecutionStage::Running(_) => {
+ rollback_running_stages.insert(*stage_id);
+ warn!(
+ "Roll back running job/stage {job_id}/{stage_id}
and change ShuffleReaderExec back to UnresolvedShuffleExec");
+ }
+ _ => {}
+ }
+ }
+ });
+
+ // check and reset the successful stages
+ if !resubmit_inputs.is_empty() {
+ self.stages
+ .iter_mut()
+ .filter(|(stage_id, _stage)|
resubmit_inputs.contains(stage_id))
+ .filter_map(|(_stage_id, stage)| {
+ if let ExecutionStage::Successful(success) = stage {
+ Some(success)
+ } else {
+ None
+ }
+ })
+ .for_each(|stage| {
+ let reset = stage.reset_tasks(executor_id);
+ if reset > 0 {
+ resubmit_successful_stages.insert(stage.stage_id);
+ warn!(
+ "Reset {} tasks for successful job/stage {}/{} on
lost Executor {}",
+ reset, job_id, stage.stage_id, executor_id
+ )
+ }
+ });
+ }
+
+ for stage_id in rollback_resolved_stages.iter() {
+ self.rollback_resolved_stage(*stage_id)?;
+ }
+
+ let mut all_running_tasks = vec![];
+ for stage_id in rollback_running_stages.iter() {
+ let tasks = self.rollback_running_stage(
+ *stage_id,
+ HashSet::from([executor_id.to_owned()]),
+ )?;
+ all_running_tasks.extend(tasks);
+ }
+
+ for stage_id in resubmit_successful_stages.iter() {
+ self.rerun_successful_stage(*stage_id);
+ }
+
+ let mut reset_stage = HashSet::new();
+ reset_stage.extend(reset_running_stage);
+ reset_stage.extend(rollback_resolved_stages);
+ reset_stage.extend(rollback_running_stages);
+ reset_stage.extend(resubmit_successful_stages);
+ Ok((reset_stage, all_running_tasks))
+ }
+
+ /// Clear the stage failure count for this stage if the stage is finally
success
+ fn clear_stage_failure(&mut self, stage_id: usize) {
+ self.failed_stage_attempts.remove(&stage_id);
+ }
+}
+
+impl ExecutionGraph for StaticExecutionGraph {
+ fn cloned(&self) -> ExecutionGraphBox {
+ Box::new(self.clone())
+ }
+
+ fn job_id(&self) -> &str {
+ self.job_id.as_str()
+ }
+
+ fn job_name(&self) -> &str {
+ self.job_name.as_str()
+ }
+
+ fn session_id(&self) -> &str {
+ self.session_id.as_str()
+ }
+
+ fn status(&self) -> &JobStatus {
+ &self.status
+ }
+
+ fn start_time(&self) -> u64 {
+ self.start_time
+ }
+
+ fn end_time(&self) -> u64 {
+ self.end_time
+ }
+
+ fn completed_stages(&self) -> usize {
+ let mut completed_stages = 0;
+ for stage in self.stages.values() {
+ if let ExecutionStage::Successful(_) = stage {
+ completed_stages += 1;
+ }
+ }
+ completed_stages
+ }
+ /// An ExecutionGraph is successful if all its stages are successful
+ fn is_successful(&self) -> bool {
+ self.stages
+ .values()
+ .all(|s| matches!(s, ExecutionStage::Successful(_)))
+ }
+
+ // pub fn is_complete(&self) -> bool {
+ // self.stages
+ // .values()
+ // .all(|s| matches!(s, ExecutionStage::Successful(_)))
+ // }
+
+ /// Revive the execution graph by converting the resolved stages to
running stages
+ /// If any stages are converted, return true; else false.
+ fn revive(&mut self) -> bool {
+ let running_stages = self
+ .stages
+ .values()
+ .filter_map(|stage| {
+ if let ExecutionStage::Resolved(resolved_stage) = stage {
+ Some(resolved_stage.to_running())
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ if running_stages.is_empty() {
+ false
+ } else {
+ for running_stage in running_stages {
+ self.stages.insert(
+ running_stage.stage_id,
+ ExecutionStage::Running(running_stage),
+ );
+ }
+ true
+ }
+ }
+
+ /// Update task statuses and task metrics in the graph.
+ /// This will also push shuffle partitions to their respective shuffle
read stages.
+ fn update_task_status(
+ &mut self,
+ executor: &ExecutorMetadata,
+ task_statuses: Vec<TaskStatus>,
max_task_failures: usize,
max_stage_failures: usize,
) -> Result<Vec<QueryStageSchedulerEvent>> {
@@ -687,123 +1090,8 @@ impl ExecutionGraph {
})
}
- /// Processing stage status update after task status changing
- fn processing_stages_update(
- &mut self,
- updated_stages: UpdatedStages,
- ) -> Result<Vec<QueryStageSchedulerEvent>> {
- let job_id = self.job_id().to_owned();
- let mut has_resolved = false;
- let mut job_err_msg = "".to_owned();
-
- for stage_id in updated_stages.resolved_stages {
- self.resolve_stage(stage_id)?;
- has_resolved = true;
- }
-
- for stage_id in updated_stages.successful_stages {
- self.succeed_stage(stage_id);
- }
-
- // Fail the stage and also abort the job
- for (stage_id, err_msg) in &updated_stages.failed_stages {
- job_err_msg =
- format!("Job failed due to stage {stage_id} failed:
{err_msg}\n");
- }
-
- let mut events = vec![];
- // Only handle the rollback logic when there are no failed stages
- if updated_stages.failed_stages.is_empty() {
- let mut running_tasks_to_cancel = vec![];
- for (stage_id, failure_reasons) in
updated_stages.rollback_running_stages {
- let tasks = self.rollback_running_stage(stage_id,
failure_reasons)?;
- running_tasks_to_cancel.extend(tasks);
- }
-
- for stage_id in updated_stages.resubmit_successful_stages {
- self.rerun_successful_stage(stage_id);
- }
-
- if !running_tasks_to_cancel.is_empty() {
- events.push(QueryStageSchedulerEvent::CancelTasks(
- running_tasks_to_cancel,
- ));
- }
- }
-
- if !updated_stages.failed_stages.is_empty() {
- info!("Job {job_id} is failed");
- self.fail_job(job_err_msg.clone());
- events.push(QueryStageSchedulerEvent::JobRunningFailed {
- job_id,
- fail_message: job_err_msg,
- queued_at: self.queued_at,
- failed_at: timestamp_millis(),
- });
- } else if self.is_successful() {
- // If this ExecutionGraph is successful, finish it
- info!("Job {job_id} is success, finalizing output partitions");
- self.succeed_job()?;
- events.push(QueryStageSchedulerEvent::JobFinished {
- job_id,
- queued_at: self.queued_at,
- completed_at: timestamp_millis(),
- });
- } else if has_resolved {
- events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
- }
- Ok(events)
- }
-
- /// Return a Vec of resolvable stage ids
- fn update_stage_output_links(
- &mut self,
- stage_id: usize,
- is_completed: bool,
- locations: Vec<PartitionLocation>,
- output_links: Vec<usize>,
- ) -> Result<Vec<usize>> {
- let mut resolved_stages = vec![];
- let job_id = &self.job_id;
- if output_links.is_empty() {
- // If `output_links` is empty, then this is a final stage
- self.output_locations.extend(locations);
- } else {
- for link in output_links.iter() {
- // If this is an intermediate stage, we need to push its
`PartitionLocation`s to the parent stage
- if let Some(linked_stage) = self.stages.get_mut(link) {
- if let ExecutionStage::UnResolved(linked_unresolved_stage)
=
- linked_stage
- {
- linked_unresolved_stage
- .add_input_partitions(stage_id,
locations.clone())?;
-
- // If all tasks for this stage are complete, mark the
input complete in the parent stage
- if is_completed {
- linked_unresolved_stage.complete_input(stage_id);
- }
-
- // If all input partitions are ready, we can resolve
any UnresolvedShuffleExec in the parent stage plan
- if linked_unresolved_stage.resolvable() {
-
resolved_stages.push(linked_unresolved_stage.stage_id);
- }
- } else {
- return Err(BallistaError::Internal(format!(
- "Error updating job {job_id}: The stage {link} as
the output link of stage {stage_id} should be unresolved"
- )));
- }
- } else {
- return Err(BallistaError::Internal(format!(
- "Error updating job {job_id}: Invalid output link
{stage_id} for stage {link}"
- )));
- }
- }
- }
- Ok(resolved_stages)
- }
-
- /// Returns all the currently running stage IDs.
- pub fn running_stages(&self) -> Vec<usize> {
+ /// Return all the currently running stage ids
+ fn running_stages(&self) -> Vec<usize> {
self.stages
.iter()
.filter_map(|(stage_id, stage)| {
@@ -816,8 +1104,8 @@ impl ExecutionGraph {
.collect::<Vec<_>>()
}
- /// Returns all currently running tasks along with the executor ID on
which they are assigned.
- pub fn running_tasks(&self) -> Vec<RunningTaskInfo> {
+ /// Return all currently running tasks along with the executor ID on which
they are assigned
+ fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
@@ -842,8 +1130,8 @@ impl ExecutionGraph {
.collect::<Vec<RunningTaskInfo>>()
}
- /// Returns the total number of tasks in this plan that are ready for
scheduling.
- pub fn available_tasks(&self) -> usize {
+ /// Total number of tasks in this plan that are ready for scheduling
+ fn available_tasks(&self) -> usize {
self.stages
.values()
.map(|stage| {
@@ -856,119 +1144,7 @@ impl ExecutionGraph {
.sum()
}
- /// Get next task that can be assigned to the given executor.
- /// This method should only be called when the resulting task is
immediately
- /// being launched as the status will be set to Running and it will not be
- /// available to the scheduler.
- /// If the task is not launched the status must be reset to allow the task
to
- /// be scheduled elsewhere.
- pub fn pop_next_task(
- &mut self,
- executor_id: &str,
- ) -> Result<Option<TaskDescription>> {
- if matches!(
- self.status,
- JobStatus {
- status: Some(job_status::Status::Failed(_)),
- ..
- }
- ) {
- warn!("Call pop_next_task on failed Job");
- return Ok(None);
- }
-
- let job_id = self.job_id.clone();
- let session_id = self.session_id.clone();
-
- let find_candidate = self.stages.iter().any(|(_stage_id, stage)| {
- if let ExecutionStage::Running(stage) = stage {
- stage.available_tasks() > 0
- } else {
- false
- }
- });
- let next_task_id = if find_candidate {
- Some(self.next_task_id())
- } else {
- None
- };
-
- let mut next_task = self.stages.iter_mut().find(|(_stage_id, stage)| {
- if let ExecutionStage::Running(stage) = stage {
- stage.available_tasks() > 0
- } else {
- false
- }
- }).map(|(stage_id, stage)| {
- if let ExecutionStage::Running(stage) = stage {
- let (partition_id, _) = stage
- .task_infos
- .iter()
- .enumerate()
- .find(|(_partition, info)| info.is_none())
- .ok_or_else(|| {
- BallistaError::Internal(format!("Error getting next
task for job {job_id}: Stage {stage_id} is ready but has no pending tasks"))
- })?;
-
- let partition = PartitionId {
- job_id,
- stage_id: *stage_id,
- partition_id,
- };
-
- let task_id = next_task_id.unwrap();
- let task_attempt = stage.task_failure_numbers[partition_id];
- let task_info = TaskInfo {
- task_id,
- scheduled_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
- // Those times will be updated when the task finish
- launch_time: 0,
- start_exec_time: 0,
- end_exec_time: 0,
- finish_time: 0,
- task_status: task_status::Status::Running(RunningTask {
- executor_id: executor_id.to_owned()
- }),
- };
-
- // Set the task info to Running for new task
- stage.task_infos[partition_id] = Some(task_info);
-
- Ok(TaskDescription {
- session_id,
- partition,
- stage_attempt_num: stage.stage_attempt_num,
- task_id,
- task_attempt,
- plan: stage.plan.clone(),
- session_config: self.session_config.clone()
- })
- } else {
- Err(BallistaError::General(format!("Stage {stage_id} is not a
running stage")))
- }
- }).transpose()?;
-
- // If no available tasks found in the running stage,
- // try to find a resolved stage and convert it to the running stage
- if next_task.is_none() {
- if self.revive() {
- next_task = self.pop_next_task(executor_id)?;
- } else {
- next_task = None;
- }
- }
-
- Ok(next_task)
- }
-
- /// Fetches a running stage that has available tasks, excluding stages in
the blacklist.
- ///
- /// Returns a mutable reference to the running stage and the task ID
generator
- /// if a suitable stage is found.
- pub fn fetch_running_stage(
+ fn fetch_running_stage(
&mut self,
black_list: &[usize],
) -> Option<(&mut RunningStage, &mut usize)> {
@@ -998,41 +1174,11 @@ impl ExecutionGraph {
}
}
- fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
- let mut running_stage_id = self.stages.iter().find_map(|(stage_id,
stage)| {
- if black_list.contains(stage_id) {
- None
- } else if let ExecutionStage::Running(stage) = stage {
- if stage.available_tasks() > 0 {
- Some(*stage_id)
- } else {
- None
- }
- } else {
- None
- }
- });
-
- // If no available tasks found in the running stage,
- // try to find a resolved stage and convert it to the running stage
- if running_stage_id.is_none() {
- if self.revive() {
- running_stage_id = self.get_running_stage_id(black_list);
- } else {
- running_stage_id = None;
- }
- }
-
- running_stage_id
- }
-
- /// Updates the job status.
- pub fn update_status(&mut self, status: JobStatus) {
+ fn update_status(&mut self, status: JobStatus) {
self.status = status;
}
- /// Returns the output partition locations for the final stage results.
- pub fn output_locations(&self) -> Vec<PartitionLocation> {
+ fn output_locations(&self) -> Vec<PartitionLocation> {
self.output_locations.clone()
}
@@ -1042,7 +1188,7 @@ impl ExecutionGraph {
/// If yes, reset the successful tasks and roll back the resolved shuffle
recursively.
///
/// Returns the reset stage ids and running tasks should be killed
- pub fn reset_stages_on_lost_executor(
+ fn reset_stages_on_lost_executor(
&mut self,
executor_id: &str,
) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
@@ -1059,136 +1205,8 @@ impl ExecutionGraph {
}
}
- fn reset_stages_internal(
- &mut self,
- executor_id: &str,
- ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
- let job_id = self.job_id.clone();
- // collect the input stages that need to resubmit
- let mut resubmit_inputs: HashSet<usize> = HashSet::new();
-
- let mut reset_running_stage = HashSet::new();
- let mut rollback_resolved_stages = HashSet::new();
- let mut rollback_running_stages = HashSet::new();
- let mut resubmit_successful_stages = HashSet::new();
-
- let mut empty_inputs: HashMap<usize, StageOutput> = HashMap::new();
- // check the unresolved, resolved and running stages
- self.stages
- .iter_mut()
- .for_each(|(stage_id, stage)| {
- let stage_inputs = match stage {
- ExecutionStage::UnResolved(stage) => {
- &mut stage.inputs
- }
- ExecutionStage::Resolved(stage) => {
- &mut stage.inputs
- }
- ExecutionStage::Running(stage) => {
- let reset = stage.reset_tasks(executor_id);
- if reset > 0 {
- warn!(
- "Reset {reset} tasks for running job/stage
{job_id}/{stage_id} on lost Executor {executor_id}"
- );
- reset_running_stage.insert(*stage_id);
- }
- &mut stage.inputs
- }
- _ => &mut empty_inputs
- };
-
- // For each stage input, check whether there are input
locations match that executor
- // and calculate the resubmit input stages if the input stages
are successful.
- let mut rollback_stage = false;
- stage_inputs.iter_mut().for_each(|(input_stage_id,
stage_output)| {
- let mut match_found = false;
- stage_output.partition_locations.iter_mut().for_each(
- |(_partition, locs)| {
- let before_len = locs.len();
- locs.retain(|loc| loc.executor_meta.id !=
executor_id);
- if locs.len() < before_len {
- match_found = true;
- }
- },
- );
- if match_found {
- stage_output.complete = false;
- rollback_stage = true;
- resubmit_inputs.insert(*input_stage_id);
- }
- });
-
- if rollback_stage {
- match stage {
- ExecutionStage::Resolved(_) => {
- rollback_resolved_stages.insert(*stage_id);
- warn!(
- "Roll back resolved job/stage {job_id}/{stage_id}
and change ShuffleReaderExec back to UnresolvedShuffleExec");
- }
- ExecutionStage::Running(_) => {
- rollback_running_stages.insert(*stage_id);
- warn!(
- "Roll back running job/stage {job_id}/{stage_id}
and change ShuffleReaderExec back to UnresolvedShuffleExec");
- }
- _ => {}
- }
- }
- });
-
- // check and reset the successful stages
- if !resubmit_inputs.is_empty() {
- self.stages
- .iter_mut()
- .filter(|(stage_id, _stage)|
resubmit_inputs.contains(stage_id))
- .filter_map(|(_stage_id, stage)| {
- if let ExecutionStage::Successful(success) = stage {
- Some(success)
- } else {
- None
- }
- })
- .for_each(|stage| {
- let reset = stage.reset_tasks(executor_id);
- if reset > 0 {
- resubmit_successful_stages.insert(stage.stage_id);
- warn!(
- "Reset {} tasks for successful job/stage {}/{} on
lost Executor {}",
- reset, job_id, stage.stage_id, executor_id
- )
- }
- });
- }
-
- for stage_id in rollback_resolved_stages.iter() {
- self.rollback_resolved_stage(*stage_id)?;
- }
-
- let mut all_running_tasks = vec![];
- for stage_id in rollback_running_stages.iter() {
- let tasks = self.rollback_running_stage(
- *stage_id,
- HashSet::from([executor_id.to_owned()]),
- )?;
- all_running_tasks.extend(tasks);
- }
-
- for stage_id in resubmit_successful_stages.iter() {
- self.rerun_successful_stage(*stage_id);
- }
-
- let mut reset_stage = HashSet::new();
- reset_stage.extend(reset_running_stage);
- reset_stage.extend(rollback_resolved_stages);
- reset_stage.extend(rollback_running_stages);
- reset_stage.extend(resubmit_successful_stages);
- Ok((reset_stage, all_running_tasks))
- }
-
- /// Converts an unresolved stage to resolved state.
- ///
- /// Returns true if the stage was successfully resolved, false if the stage
- /// was not found or not in unresolved state.
- pub fn resolve_stage(&mut self, stage_id: usize) -> Result<bool> {
+ /// Convert unresolved stage to be resolved
+ fn resolve_stage(&mut self, stage_id: usize) -> Result<bool> {
if let Some(ExecutionStage::UnResolved(stage)) =
self.stages.remove(&stage_id) {
self.stages.insert(
stage_id,
@@ -1207,10 +1225,8 @@ impl ExecutionGraph {
}
}
- /// Converts a running stage to successful state.
- ///
- /// Returns true if the stage was successfully marked as complete.
- pub fn succeed_stage(&mut self, stage_id: usize) -> bool {
+ /// Convert running stage to be successful
+ fn succeed_stage(&mut self, stage_id: usize) -> bool {
if let Some(ExecutionStage::Running(stage)) =
self.stages.remove(&stage_id) {
self.stages
.insert(stage_id,
ExecutionStage::Successful(stage.to_successful()));
@@ -1226,10 +1242,8 @@ impl ExecutionGraph {
}
}
- /// Converts a running stage to failed state with the given error message.
- ///
- /// Returns true if the stage was found and marked as failed.
- pub fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool {
+ /// Convert running stage to be failed
+ fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool {
if let Some(ExecutionStage::Running(stage)) =
self.stages.remove(&stage_id) {
self.stages
.insert(stage_id,
ExecutionStage::Failed(stage.to_failed(err_msg)));
@@ -1246,7 +1260,7 @@ impl ExecutionGraph {
/// Convert running stage to be unresolved,
/// Returns a Vec of RunningTaskInfo for running tasks in this stage.
- pub fn rollback_running_stage(
+ fn rollback_running_stage(
&mut self,
stage_id: usize,
failure_reasons: HashSet<String>,
@@ -1281,7 +1295,7 @@ impl ExecutionGraph {
}
/// Convert resolved stage to be unresolved
- pub fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool>
{
+ fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool> {
if let Some(ExecutionStage::Resolved(stage)) =
self.stages.remove(&stage_id) {
self.stages
.insert(stage_id,
ExecutionStage::UnResolved(stage.to_unresolved()?));
@@ -1296,11 +1310,8 @@ impl ExecutionGraph {
}
}
- /// Converts a successful stage back to running state for re-execution.
- ///
- /// This is used when some outputs from the stage have been lost and tasks
- /// need to be re-run.
- pub fn rerun_successful_stage(&mut self, stage_id: usize) -> bool {
+ /// Convert successful stage to be running
+ fn rerun_successful_stage(&mut self, stage_id: usize) -> bool {
if let Some(ExecutionStage::Successful(stage)) =
self.stages.remove(&stage_id) {
self.stages
.insert(stage_id, ExecutionStage::Running(stage.to_running()));
@@ -1316,7 +1327,7 @@ impl ExecutionGraph {
}
/// fail job with error message
- pub fn fail_job(&mut self, error: String) {
+ fn fail_job(&mut self, error: String) {
self.status = JobStatus {
job_id: self.job_id.clone(),
job_name: self.job_name.clone(),
@@ -1329,11 +1340,8 @@ impl ExecutionGraph {
};
}
- /// Marks the job as successfully completed.
- ///
- /// This should only be called after all stages have completed
successfully.
- /// Returns an error if the job is not in a successful state.
- pub fn succeed_job(&mut self) -> Result<()> {
+ /// Mark the job success
+ fn succeed_job(&mut self) -> Result<()> {
if !self.is_successful() {
return Err(BallistaError::Internal(format!(
"Attempt to finalize an incomplete job {}",
@@ -1367,13 +1375,122 @@ impl ExecutionGraph {
Ok(())
}
- /// Clear the stage failure count for this stage if the stage is finally
success
- fn clear_stage_failure(&mut self, stage_id: usize) {
- self.failed_stage_attempts.remove(&stage_id);
+ fn stages(&self) -> &HashMap<usize, ExecutionStage> {
+ &self.stages
+ }
+
+ fn stage_count(&self) -> usize {
+ self.stages.len()
+ }
+
+ /// Get next task that can be assigned to the given executor.
+ /// This method should only be called when the resulting task is
immediately
+ /// being launched as the status will be set to Running and it will not be
+ /// available to the scheduler.
+ /// If the task is not launched the status must be reset to allow the task
to
+ /// be scheduled elsewhere.
+ #[cfg(test)]
+ fn pop_next_task(&mut self, executor_id: &str) ->
Result<Option<TaskDescription>> {
+ if matches!(
+ self.status,
+ JobStatus {
+ status: Some(job_status::Status::Failed(_)),
+ ..
+ }
+ ) {
+ warn!("Call pop_next_task on failed Job");
+ return Ok(None);
+ }
+
+ let job_id = self.job_id.clone();
+ let session_id = self.session_id.clone();
+
+ let find_candidate = self.stages.iter().any(|(_stage_id, stage)| {
+ if let ExecutionStage::Running(stage) = stage {
+ stage.available_tasks() > 0
+ } else {
+ false
+ }
+ });
+ let next_task_id = if find_candidate {
+ Some(self.next_task_id())
+ } else {
+ None
+ };
+
+ let mut next_task = self.stages.iter_mut().find(|(_stage_id, stage)| {
+ if let ExecutionStage::Running(stage) = stage {
+ stage.available_tasks() > 0
+ } else {
+ false
+ }
+ }).map(|(stage_id, stage)| {
+ if let ExecutionStage::Running(stage) = stage {
+ let (partition_id, _) = stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .find(|(_partition, info)| info.is_none())
+ .ok_or_else(|| {
+ BallistaError::Internal(format!("Error getting next
task for job {job_id}: Stage {stage_id} is ready but has no pending tasks"))
+ })?;
+
+ let partition = PartitionId {
+ job_id,
+ stage_id: *stage_id,
+ partition_id,
+ };
+
+ let task_id = next_task_id.unwrap();
+ let task_attempt = stage.task_failure_numbers[partition_id];
+ let task_info = TaskInfo {
+ task_id,
+ scheduled_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
+ // Those times will be updated when the task finish
+ launch_time: 0,
+ start_exec_time: 0,
+ end_exec_time: 0,
+ finish_time: 0,
+ task_status: task_status::Status::Running(RunningTask {
+ executor_id: executor_id.to_owned()
+ }),
+ };
+
+ // Set the task info to Running for new task
+ stage.task_infos[partition_id] = Some(task_info);
+
+ Ok(TaskDescription {
+ session_id,
+ partition,
+ stage_attempt_num: stage.stage_attempt_num,
+ task_id,
+ task_attempt,
+ plan: stage.plan.clone(),
+ session_config: self.session_config.clone()
+ })
+ } else {
+ Err(BallistaError::General(format!("Stage {stage_id} is not a
running stage")))
+ }
+ }).transpose()?;
+
+ // If no available tasks found in the running stage,
+ // try to find a resolved stage and convert it to the running stage
+ if next_task.is_none() {
+ if self.revive() {
+ next_task = self.pop_next_task(executor_id)?;
+ } else {
+ next_task = None;
+ }
+ }
+
+ Ok(next_task)
}
}
-impl Debug for ExecutionGraph {
+impl Debug for StaticExecutionGraph {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let stages = self
.stages
@@ -1704,8 +1821,6 @@ mod test {
let outputs = agg_graph.output_locations();
- assert_eq!(outputs.len(), agg_graph.output_partitions);
-
for location in outputs {
assert_eq!(location.executor_meta.host, "localhost2".to_owned());
}
@@ -1965,7 +2080,7 @@ mod test {
assert!(
matches!(
- agg_graph.status,
+ agg_graph.status(),
JobStatus {
status: Some(job_status::Status::Failed(_)),
..
@@ -2242,7 +2357,7 @@ mod test {
drain_tasks(&mut agg_graph)?;
assert!(!agg_graph.is_successful(), "Expect to fail the agg plan");
- let failure_reason = format!("{:?}", agg_graph.status);
+ let failure_reason = format!("{:?}", agg_graph.status());
assert!(failure_reason.contains("Job failed due to stage 2 failed:
Stage 2 has failed 4 times, most recent failure reason"));
assert!(failure_reason.contains("FetchPartitionError"));
@@ -2723,7 +2838,7 @@ mod test {
// todo!()
// }
- fn drain_tasks(graph: &mut ExecutionGraph) -> Result<()> {
+ fn drain_tasks(graph: &mut dyn ExecutionGraph) -> Result<()> {
let executor = mock_executor("executor-id1".to_string());
while let Some(task) = graph.pop_next_task(&executor.id)? {
let task_status = mock_completed_task(task, &executor.id);
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 26261107b..e08fd663f 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -45,19 +45,19 @@ use std::sync::Arc;
/// Utility for producing dot diagrams from execution graphs
pub struct ExecutionGraphDot<'a> {
- graph: &'a ExecutionGraph,
+ graph: &'a dyn ExecutionGraph,
}
impl<'a> ExecutionGraphDot<'a> {
/// Create a DOT graph from the provided ExecutionGraph
- pub fn generate(graph: &'a ExecutionGraph) -> Result<String, fmt::Error> {
+ pub fn generate(graph: &'a dyn ExecutionGraph) -> Result<String,
fmt::Error> {
let mut dot = Self { graph };
dot._generate()
}
/// Create a DOT graph for one query stage from the provided ExecutionGraph
pub fn generate_for_query_stage(
- graph: &ExecutionGraph,
+ graph: &dyn ExecutionGraph,
stage_id: usize,
) -> Result<String, fmt::Error> {
if let Some(stage) = graph.stages().get(&stage_id) {
@@ -410,7 +410,7 @@ fn get_file_scan(scan: &FileScanConfig) -> String {
#[cfg(test)]
mod tests {
use crate::planner::DefaultDistributedPlanner;
- use crate::state::execution_graph::ExecutionGraph;
+ use crate::state::execution_graph::StaticExecutionGraph;
use crate::state::execution_graph_dot::ExecutionGraphDot;
use ballista_core::error::{BallistaError, Result};
use ballista_core::extension::SessionConfigExt;
@@ -580,7 +580,7 @@ filter_expr="]
Ok(())
}
- async fn test_graph() -> Result<ExecutionGraph> {
+ async fn test_graph() -> Result<StaticExecutionGraph> {
let mut config = SessionConfig::new()
.with_target_partitions(48)
.with_batch_size(4096);
@@ -603,7 +603,7 @@ filter_expr="]
let plan = df.into_optimized_plan()?;
let plan = ctx.state().create_physical_plan(&plan).await?;
let mut planner = DefaultDistributedPlanner::new();
- ExecutionGraph::new(
+ StaticExecutionGraph::new(
"scheduler_id",
"job_id",
"job_name",
@@ -617,7 +617,7 @@ filter_expr="]
// With the improvement of
https://github.com/apache/arrow-datafusion/pull/4122,
// Redundant RepartitionExec can be removed so that the stage number will
be reduced
- async fn test_graph_optimized() -> Result<ExecutionGraph> {
+ async fn test_graph_optimized() -> Result<StaticExecutionGraph> {
let mut config = SessionConfig::new()
.with_target_partitions(48)
.with_batch_size(4096);
@@ -639,7 +639,7 @@ filter_expr="]
let plan = df.into_optimized_plan()?;
let plan = ctx.state().create_physical_plan(&plan).await?;
let mut planner = DefaultDistributedPlanner::new();
- ExecutionGraph::new(
+ StaticExecutionGraph::new(
"scheduler_id",
"job_id",
"job_name",
diff --git a/ballista/scheduler/src/state/task_manager.rs
b/ballista/scheduler/src/state/task_manager.rs
index b0281dc36..60e6e3dd4 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -19,7 +19,7 @@ use crate::planner::DefaultDistributedPlanner;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::execution_graph::{
- ExecutionGraph, ExecutionStage, RunningTaskInfo, TaskDescription,
+ ExecutionGraphBox, RunningTaskInfo, StaticExecutionGraph, TaskDescription,
};
use crate::state::executor_manager::ExecutorManager;
@@ -143,7 +143,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U:
'static + AsExecutionPlan>
#[derive(Clone)]
pub struct JobInfoCache {
/// The execution graph for this job, protected by a read-write lock.
- pub execution_graph: Arc<RwLock<ExecutionGraph>>,
+ pub execution_graph: Arc<RwLock<ExecutionGraphBox>>,
/// Cached job status for quick access.
pub status: Option<job_status::Status>,
#[cfg(not(feature = "disable-stage-plan-cache"))]
@@ -153,7 +153,7 @@ pub struct JobInfoCache {
impl JobInfoCache {
/// Creates a new `JobInfoCache` from an execution graph.
- pub fn new(graph: ExecutionGraph) -> Self {
+ pub fn new(graph: ExecutionGraphBox) -> Self {
let status = graph.status().status.clone();
Self {
execution_graph: Arc::new(RwLock::new(graph)),
@@ -275,7 +275,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
session_config: Arc<SessionConfig>,
) -> Result<()> {
let mut planner = DefaultDistributedPlanner::new();
- let mut graph = ExecutionGraph::new(
+ let mut graph = Box::new(StaticExecutionGraph::new(
&self.scheduler_id,
job_id,
job_name,
@@ -284,7 +284,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
queued_at,
session_config,
&mut planner,
- )?;
+ )?) as ExecutionGraphBox;
info!("Submitting execution graph: {graph:?}");
self.state.submit_job(job_id.to_string(), &graph).await?;
@@ -350,15 +350,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
pub(crate) async fn get_job_execution_graph(
&self,
job_id: &str,
- ) -> Result<Option<Arc<ExecutionGraph>>> {
+ ) -> Result<Option<ExecutionGraphBox>> {
if let Some(cached) = self.get_active_execution_graph(job_id) {
let guard = cached.read().await;
- Ok(Some(Arc::new(guard.deref().clone())))
+ Ok(Some(guard.deref().cloned()))
} else {
let graph = self.state.get_execution_graph(job_id).await?;
- Ok(graph.map(Arc::new))
+ Ok(graph)
}
}
@@ -416,7 +416,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
debug!("Moving job {job_id} from Active to Success");
if let Some(graph) = self.remove_active_execution_graph(job_id) {
- let graph = graph.read().await.clone();
+ let graph = graph.read().await;
if graph.is_successful() {
self.state.save_job(job_id, &graph).await?;
} else {
@@ -516,15 +516,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
pub async fn executor_lost(&self, executor_id: &str) ->
Result<Vec<RunningTaskInfo>> {
// Collect all the running task need to cancel when there are running
stages rolled back.
let mut running_tasks_to_cancel: Vec<RunningTaskInfo> = vec![];
- // Collect graphs we update so we can update them in storage
- let updated_graphs: DashMap<String, ExecutionGraph> = DashMap::new();
+
{
for pairs in self.active_job_cache.iter() {
- let (job_id, job_info) = pairs.pair();
+ let (_job_id, job_info) = pairs.pair();
let mut graph = job_info.execution_graph.write().await;
let reset = graph.reset_stages_on_lost_executor(executor_id)?;
if !reset.0.is_empty() {
- updated_graphs.insert(job_id.to_owned(), graph.clone());
running_tasks_to_cancel.extend(reset.1);
}
}
@@ -684,7 +682,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
pub(crate) fn get_active_execution_graph(
&self,
job_id: &str,
- ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+ ) -> Option<Arc<RwLock<ExecutionGraphBox>>> {
self.active_job_cache
.get(job_id)
.as_deref()
@@ -695,7 +693,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
pub(crate) fn remove_active_execution_graph(
&self,
job_id: &str,
- ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+ ) -> Option<Arc<RwLock<ExecutionGraphBox>>> {
self.active_job_cache
.remove(job_id)
.map(|value| value.1.execution_graph)
@@ -748,14 +746,9 @@ pub struct JobOverview {
pub completed_stages: usize,
}
-impl From<&ExecutionGraph> for JobOverview {
- fn from(value: &ExecutionGraph) -> Self {
- let mut completed_stages = 0;
- for stage in value.stages().values() {
- if let ExecutionStage::Successful(_) = stage {
- completed_stages += 1;
- }
- }
+impl From<&ExecutionGraphBox> for JobOverview {
+ fn from(value: &ExecutionGraphBox) -> Self {
+ let completed_stages = value.completed_stages();
Self {
job_id: value.job_id().to_string(),
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index b4a940020..7ac6f83a9 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -57,7 +57,9 @@ use datafusion::test_util::scan_empty_with_partitions;
use crate::cluster::BallistaCluster;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::state::execution_graph::{ExecutionGraph, ExecutionStage,
TaskDescription};
+use crate::state::execution_graph::{
+ ExecutionGraph, ExecutionStage, StaticExecutionGraph, TaskDescription,
+};
use ballista_core::utils::{default_config_producer, default_session_builder};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use parking_lot::Mutex;
@@ -812,14 +814,16 @@ pub fn assert_failed_event(job_id: &str, collector:
&TestMetricsCollector) {
}
/// Revives the execution graph and completes all tasks in the next stage.
-pub fn revive_graph_and_complete_next_stage(graph: &mut ExecutionGraph) ->
Result<usize> {
+pub fn revive_graph_and_complete_next_stage(
+ graph: &mut dyn ExecutionGraph,
+) -> Result<usize> {
let executor = mock_executor("executor-id1".to_string());
revive_graph_and_complete_next_stage_with_executor(graph, &executor)
}
/// Revives the execution graph and completes all tasks in the next stage
using the given executor.
pub fn revive_graph_and_complete_next_stage_with_executor(
- graph: &mut ExecutionGraph,
+ graph: &mut dyn ExecutionGraph,
executor: &ExecutorMetadata,
) -> Result<usize> {
graph.revive();
@@ -855,7 +859,7 @@ pub fn revive_graph_and_complete_next_stage_with_executor(
}
/// Creates a test execution graph with a simple aggregation plan.
-pub async fn test_aggregation_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_aggregation_plan(partition: usize) -> StaticExecutionGraph {
test_aggregation_plan_with_job_id(partition, "job").await
}
@@ -863,7 +867,7 @@ pub async fn test_aggregation_plan(partition: usize) ->
ExecutionGraph {
pub async fn test_aggregation_plan_with_job_id(
partition: usize,
job_id: &str,
-) -> ExecutionGraph {
+) -> StaticExecutionGraph {
let config = SessionConfig::new().with_target_partitions(partition);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();
@@ -893,7 +897,8 @@ pub async fn test_aggregation_plan_with_job_id(
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let mut planner = DefaultDistributedPlanner::new();
- ExecutionGraph::new(
+
+ StaticExecutionGraph::new(
"localhost:50050",
job_id,
"",
@@ -907,7 +912,7 @@ pub async fn test_aggregation_plan_with_job_id(
}
/// Creates a test execution graph with two nested aggregations.
-pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_two_aggregations_plan(partition: usize) ->
StaticExecutionGraph {
let config = SessionConfig::new().with_target_partitions(partition);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();
@@ -940,7 +945,8 @@ pub async fn test_two_aggregations_plan(partition: usize)
-> ExecutionGraph {
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let mut planner = DefaultDistributedPlanner::new();
- ExecutionGraph::new(
+
+ StaticExecutionGraph::new(
"localhost:50050",
"job",
"",
@@ -954,7 +960,7 @@ pub async fn test_two_aggregations_plan(partition: usize)
-> ExecutionGraph {
}
/// Creates a test execution graph with a coalesce (limit) operation.
-pub async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_coalesce_plan(partition: usize) -> StaticExecutionGraph {
let config = SessionConfig::new().with_target_partitions(partition);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();
@@ -979,7 +985,8 @@ pub async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
.await
.unwrap();
let mut planner = DefaultDistributedPlanner::new();
- ExecutionGraph::new(
+
+ StaticExecutionGraph::new(
"localhost:50050",
"job",
"",
@@ -993,7 +1000,7 @@ pub async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
}
/// Creates a test execution graph with a join operation.
-pub async fn test_join_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_join_plan(partition: usize) -> StaticExecutionGraph {
let mut config = SessionConfig::new().with_target_partitions(partition);
config
.options_mut()
@@ -1039,7 +1046,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph {
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let mut planner = DefaultDistributedPlanner::new();
- let graph = ExecutionGraph::new(
+ let graph = StaticExecutionGraph::new(
"localhost:50050",
"job",
"",
@@ -1057,7 +1064,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph {
}
/// Creates a test execution graph with a UNION ALL operation.
-pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_union_all_plan(partition: usize) -> StaticExecutionGraph {
let config = SessionConfig::new().with_target_partitions(partition);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();
@@ -1081,7 +1088,7 @@ pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph {
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let mut planner = DefaultDistributedPlanner::new();
- let graph = ExecutionGraph::new(
+ let graph = StaticExecutionGraph::new(
"localhost:50050",
"job",
"",
@@ -1099,7 +1106,7 @@ pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph {
}
/// Creates a test execution graph with a UNION (distinct) operation.
-pub async fn test_union_plan(partition: usize) -> ExecutionGraph {
+pub async fn test_union_plan(partition: usize) -> StaticExecutionGraph {
let config = SessionConfig::new().with_target_partitions(partition);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();
@@ -1123,7 +1130,7 @@ pub async fn test_union_plan(partition: usize) -> ExecutionGraph {
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let mut planner = DefaultDistributedPlanner::new();
- let graph = ExecutionGraph::new(
+ let graph = StaticExecutionGraph::new(
"localhost:50050",
"job",
"",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]