This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new b0c792d2 Introduce CuratorTaskManager to make an active job be
curated by only one scheduler (#153)
b0c792d2 is described below
commit b0c792d2ba8a1f76db21c974e02ffd08784829a1
Author: yahoNanJing <[email protected]>
AuthorDate: Sun Aug 28 01:00:01 2022 +0800
Introduce CuratorTaskManager to make an active job be curated by only one
scheduler (#153)
* Add scheduler name for scheduler identification
* Introduce scheduler id for execution graph as its curator
* Introduce state machine for the execution stage
* Introduce stage change event to execution graph
* Introduce cache for active execution graphs in the curator scheduler
* Fix PR review
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/proto/ballista.proto | 72 +-
ballista/rust/scheduler/scheduler_config_spec.toml | 6 +
ballista/rust/scheduler/src/display.rs | 32 +
ballista/rust/scheduler/src/main.rs | 11 +-
.../rust/scheduler/src/scheduler_server/event.rs | 6 +-
.../scheduler/src/scheduler_server/event_loop.rs | 9 +-
.../rust/scheduler/src/scheduler_server/grpc.rs | 17 +-
.../rust/scheduler/src/scheduler_server/mod.rs | 121 +-
.../src/scheduler_server/query_stage_scheduler.rs | 22 +-
ballista/rust/scheduler/src/standalone.rs | 2 +-
.../rust/scheduler/src/state/execution_graph.rs | 1198 +++++++++-----------
.../src/state/execution_graph/execution_stage.rs | 928 +++++++++++++++
ballista/rust/scheduler/src/state/mod.rs | 21 +-
ballista/rust/scheduler/src/state/task_manager.rs | 391 +++----
14 files changed, 1832 insertions(+), 1004 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index f0c0a3aa..f3b703c6 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -408,37 +408,71 @@ enum JoinSide{
///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Scheduling
///////////////////////////////////////////////////////////////////////////////////////////////////
-message TaskInputPartitions {
- uint32 partition = 1;
- repeated PartitionLocation partition_location = 2;
+message ExecutionGraph {
+ string job_id = 1;
+ string session_id = 2;
+ JobStatus status = 3;
+ repeated ExecutionGraphStage stages = 4;
+ uint64 output_partitions = 5;
+ repeated PartitionLocation output_locations = 6;
+ string scheduler_id = 7;
}
-message GraphStageInput {
- uint32 stage_id = 1;
- repeated TaskInputPartitions partition_locations = 2;
- bool complete = 3;
+message ExecutionGraphStage {
+ oneof StageType {
+ UnResolvedStage unresolved_stage = 1;
+ ResolvedStage resolved_stage = 2;
+ CompletedStage completed_stage = 3;
+ FailedStage failed_stage = 4;
+ }
}
+message UnResolvedStage {
+ uint64 stage_id = 1;
+ PhysicalHashRepartition output_partitioning = 2;
+ repeated uint32 output_links = 3;
+ repeated GraphStageInput inputs = 4;
+ bytes plan = 5;
+}
-message ExecutionGraphStage {
+message ResolvedStage {
uint64 stage_id = 1;
uint32 partitions = 2;
PhysicalHashRepartition output_partitioning = 3;
- repeated GraphStageInput inputs = 4;
+ repeated uint32 output_links = 4;
+ bytes plan = 5;
+}
+
+message CompletedStage {
+ uint64 stage_id = 1;
+ uint32 partitions = 2;
+ PhysicalHashRepartition output_partitioning = 3;
+ repeated uint32 output_links = 4;
bytes plan = 5;
repeated TaskStatus task_statuses = 6;
- repeated uint32 output_links = 7;
- bool resolved = 8;
- repeated OperatorMetricsSet stage_metrics = 9;
+ repeated OperatorMetricsSet stage_metrics = 7;
}
-message ExecutionGraph {
- string job_id = 1;
- string session_id = 2;
- JobStatus status = 3;
- repeated ExecutionGraphStage stages = 4;
- uint64 output_partitions = 5;
- repeated PartitionLocation output_locations = 6;
+message FailedStage {
+ uint64 stage_id = 1;
+ uint32 partitions = 2;
+ PhysicalHashRepartition output_partitioning = 3;
+ repeated uint32 output_links = 4;
+ bytes plan = 5;
+ repeated TaskStatus task_statuses = 6;
+ repeated OperatorMetricsSet stage_metrics = 7;
+ string error_message = 8;
+}
+
+message GraphStageInput {
+ uint32 stage_id = 1;
+ repeated TaskInputPartitions partition_locations = 2;
+ bool complete = 3;
+}
+
+message TaskInputPartitions {
+ uint32 partition = 1;
+ repeated PartitionLocation partition_location = 2;
}
message KeyValuePair {
diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml
b/ballista/rust/scheduler/scheduler_config_spec.toml
index c3be0f9d..1f89562d 100644
--- a/ballista/rust/scheduler/scheduler_config_spec.toml
+++ b/ballista/rust/scheduler/scheduler_config_spec.toml
@@ -52,6 +52,12 @@ type = "String"
default = "std::string::String::from(\"0.0.0.0\")"
doc = "Local host name or IP address to bind to. Default: 0.0.0.0"
+[[param]]
+name = "external_host"
+type = "String"
+doc = "Host name or IP address so that executors can connect to this
scheduler. Default: localhost"
+default = "std::string::String::from(\"localhost\")"
+
[[param]]
abbr = "p"
name = "bind_port"
diff --git a/ballista/rust/scheduler/src/display.rs
b/ballista/rust/scheduler/src/display.rs
index e4557a6b..23753889 100644
--- a/ballista/rust/scheduler/src/display.rs
+++ b/ballista/rust/scheduler/src/display.rs
@@ -19,13 +19,45 @@
//! [`crate::physical_plan::displayable`] for examples of how to
//! format
+use ballista_core::utils::collect_plan_metrics;
use datafusion::logical_plan::{StringifiedPlan, ToStringifiedPlan};
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{
accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
};
+use log::{error, info};
use std::fmt;
+pub fn print_stage_metrics(
+ job_id: &str,
+ stage_id: usize,
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+) {
+ // The plan_metrics collected here is a snapshot clone from the plan
metrics.
+ // They are all empty now and need to combine with the stage metrics in
the ExecutionStages
+ let mut plan_metrics = collect_plan_metrics(plan);
+ if plan_metrics.len() == stage_metrics.len() {
+ plan_metrics.iter_mut().zip(stage_metrics).for_each(
+ |(plan_metric, stage_metric)| {
+ stage_metric
+ .iter()
+ .for_each(|s| plan_metric.push(s.clone()));
+ },
+ );
+
+ info!(
+ "=== [{}/{}] Stage finished, physical plan with metrics ===\n{}\n",
+ job_id,
+ stage_id,
+ DisplayableBallistaExecutionPlan::new(plan, &plan_metrics).indent()
+ );
+ } else {
+ error!("Fail to combine stage metrics to plan for stage [{}/{}], plan
metrics array size {} does not equal
+ to the stage metrics array size {}", job_id, stage_id,
plan_metrics.len(), stage_metrics.len());
+ }
+}
+
/// Wraps an `ExecutionPlan` to display this plan with metrics
collected/aggregated.
/// The metrics must be collected in the same order as how we visit and
display the plan.
pub struct DisplayableBallistaExecutionPlan<'a> {
diff --git a/ballista/rust/scheduler/src/main.rs
b/ballista/rust/scheduler/src/main.rs
index 5a7a9db1..d393e4eb 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -65,8 +65,8 @@ use config::prelude::*;
use datafusion::execution::context::default_session_builder;
async fn start_server(
+ scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
- namespace: String,
addr: SocketAddr,
policy: TaskSchedulingPolicy,
) -> Result<()> {
@@ -82,15 +82,15 @@ async fn start_server(
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
match policy {
TaskSchedulingPolicy::PushStaged =>
SchedulerServer::new_with_policy(
+ scheduler_name,
config_backend.clone(),
- namespace.clone(),
policy,
BallistaCodec::default(),
default_session_builder,
),
_ => SchedulerServer::new(
+ scheduler_name,
config_backend.clone(),
- namespace.clone(),
BallistaCodec::default(),
),
};
@@ -158,12 +158,13 @@ async fn main() -> Result<()> {
let special_mod_log_level = opt.log_level_setting;
let namespace = opt.namespace;
+ let external_host = opt.external_host;
let bind_host = opt.bind_host;
let port = opt.bind_port;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;
- let scheduler_name = format!("scheduler_{}_{}_{}", namespace, bind_host,
port);
+ let scheduler_name = format!("scheduler_{}_{}_{}", namespace,
external_host, port);
let log_file = tracing_appender::rolling::daily(log_dir, &scheduler_name);
tracing_subscriber::fmt()
@@ -219,6 +220,6 @@ async fn main() -> Result<()> {
};
let policy: TaskSchedulingPolicy = opt.scheduler_policy;
- start_server(client, namespace, addr, policy).await?;
+ start_server(scheduler_name, client, addr, policy).await?;
Ok(())
}
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs
b/ballista/rust/scheduler/src/scheduler_server/event.rs
index bcfa3e3a..0e43bfe2 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -36,6 +36,10 @@ pub enum QueryStageSchedulerEvent {
plan: Box<LogicalPlan>,
},
JobSubmitted(String),
+ // For a job which failed during planning
+ JobPlanningFailed(String, String),
JobFinished(String),
- JobFailed(String, String),
+ // For a job fails with its execution graph setting failed
+ JobRunningFailed(String),
+ JobUpdated(String),
}
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index b8e8d9fc..d60d145e 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -189,9 +189,8 @@ mod test {
async fn test_offer_free_reservations() -> Result<()> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new(
+ Arc::new(SchedulerState::new_with_default_scheduler_name(
state_storage,
- "default".to_string(),
default_session_builder,
BallistaCodec::default(),
));
@@ -227,9 +226,8 @@ mod test {
.build()?;
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new(
+ Arc::new(SchedulerState::new_with_default_scheduler_name(
state_storage,
- "default".to_string(),
default_session_builder,
BallistaCodec::default(),
));
@@ -287,9 +285,8 @@ mod test {
.build()?;
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new(
+ Arc::new(SchedulerState::new_with_default_scheduler_name(
state_storage,
- "default".to_string(),
default_session_builder,
BallistaCodec::default(),
));
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index af788dc4..80a9defc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -391,7 +391,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
.and_then(|m| {
m.try_into_logical_plan(
session_ctx.deref(),
- self.codec.logical_extension_codec(),
+ self.state.codec.logical_extension_codec(),
)
})
.map_err(|e| {
@@ -535,16 +535,15 @@ mod test {
#[tokio::test]
async fn test_poll_work() -> Result<(), BallistaError> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let namespace = "default";
let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
+ "localhost:50050".to_owned(),
state_storage.clone(),
- namespace.to_owned(),
BallistaCodec::default(),
);
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
- optional_host:
Some(OptionalHost::Host("http://host:8080".to_owned())),
+ optional_host:
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
port: 0,
grpc_port: 0,
specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
@@ -562,9 +561,8 @@ mod test {
// no response task since we told the scheduler we didn't want to
accept one
assert!(response.task.is_none());
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
- SchedulerState::new(
+ SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
- namespace.to_string(),
default_session_builder,
BallistaCodec::default(),
);
@@ -580,7 +578,7 @@ mod test {
assert_eq!(stored_executor.grpc_port, 0);
assert_eq!(stored_executor.port, 0);
assert_eq!(stored_executor.specification.task_slots, 2);
- assert_eq!(stored_executor.host, "http://host:8080".to_owned());
+ assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
let request: Request<PollWorkParams> = Request::new(PollWorkParams {
metadata: Some(exec_meta.clone()),
@@ -596,9 +594,8 @@ mod test {
// still no response task since there are no tasks in the scheduler
assert!(response.task.is_none());
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
- SchedulerState::new(
+ SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
- namespace.to_string(),
default_session_builder,
BallistaCodec::default(),
);
@@ -614,7 +611,7 @@ mod test {
assert_eq!(stored_executor.grpc_port, 0);
assert_eq!(stored_executor.port, 0);
assert_eq!(stored_executor.specification.task_slots, 2);
- assert_eq!(stored_executor.host, "http://host:8080".to_owned());
+ assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
Ok(())
}
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs
b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index a672c597..df376c4c 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -53,23 +53,23 @@ pub(crate) type SessionBuilder = fn(SessionConfig) ->
SessionState;
#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> {
+ pub scheduler_name: String,
pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
policy: TaskSchedulingPolicy,
event_loop: Option<EventLoop<SchedulerServerEvent>>,
pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
- codec: BallistaCodec<T, U>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerServer<T, U> {
pub fn new(
+ scheduler_name: String,
config: Arc<dyn StateBackendClient>,
- namespace: String,
codec: BallistaCodec<T, U>,
) -> Self {
SchedulerServer::new_with_policy(
+ scheduler_name,
config,
- namespace,
TaskSchedulingPolicy::PullStaged,
codec,
default_session_builder,
@@ -77,14 +77,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
}
pub fn new_with_builder(
+ scheduler_name: String,
config: Arc<dyn StateBackendClient>,
- namespace: String,
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
) -> Self {
SchedulerServer::new_with_policy(
+ scheduler_name,
config,
- namespace,
TaskSchedulingPolicy::PullStaged,
codec,
session_builder,
@@ -92,32 +92,47 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
}
pub fn new_with_policy(
+ scheduler_name: String,
config: Arc<dyn StateBackendClient>,
- namespace: String,
policy: TaskSchedulingPolicy,
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
) -> Self {
let state = Arc::new(SchedulerState::new(
config,
- namespace,
session_builder,
- codec.clone(),
+ codec,
+ scheduler_name.clone(),
));
- let event_loop = if matches!(policy, TaskSchedulingPolicy::PushStaged)
{
- let event_action: Arc<SchedulerServerEventAction<T, U>> =
- Arc::new(SchedulerServerEventAction::new(state.clone()));
- let event_loop = EventLoop::new("scheduler".to_owned(), 10000,
event_action);
- Some(event_loop)
+ let event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>> =
+ if matches!(policy, TaskSchedulingPolicy::PushStaged) {
+ Some(Arc::new(SchedulerServerEventAction::new(state.clone())))
+ } else {
+ None
+ };
+ SchedulerServer::new_with_event_action(scheduler_name, state,
event_action)
+ }
+
+ fn new_with_event_action(
+ scheduler_name: String,
+ state: Arc<SchedulerState<T, U>>,
+ event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>>,
+ ) -> Self {
+ let event_loop = event_action.map(|event_action| {
+ EventLoop::new("scheduler".to_owned(), 10000, event_action)
+ });
+ let policy = if event_loop.is_some() {
+ TaskSchedulingPolicy::PushStaged
} else {
- None
+ TaskSchedulingPolicy::PullStaged
};
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone(), None));
let query_stage_event_loop =
EventLoop::new("query_stage".to_owned(), 10000,
query_stage_scheduler);
Self {
+ scheduler_name,
state,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -126,40 +141,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
policy,
event_loop,
query_stage_event_loop,
- codec,
- // session_builder,
- }
- }
-
- pub fn new_with_event_action(
- config: Arc<dyn StateBackendClient>,
- namespace: String,
- codec: BallistaCodec<T, U>,
- session_builder: SessionBuilder,
- event_action: Arc<dyn EventAction<SchedulerServerEvent>>,
- ) -> Self {
- let state = Arc::new(SchedulerState::new(
- config,
- namespace,
- session_builder,
- codec.clone(),
- ));
-
- let event_loop = EventLoop::new("scheduler".to_owned(), 10000,
event_action);
- let query_stage_scheduler =
- Arc::new(QueryStageScheduler::new(state.clone(), None));
- let query_stage_event_loop =
- EventLoop::new("query_stage".to_owned(), 10000,
query_stage_scheduler);
- Self {
- state,
- start_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
- policy: TaskSchedulingPolicy::PushStaged,
- event_loop: Some(event_loop),
- query_stage_event_loop,
- codec,
}
}
@@ -287,6 +268,7 @@ mod test {
use crate::state::backend::standalone::StandaloneClient;
use crate::state::executor_manager::ExecutorReservation;
+ use crate::state::SchedulerState;
use crate::test_utils::{
await_condition, ExplodingTableProvider, SchedulerEventObserver,
};
@@ -345,15 +327,18 @@ mod test {
.await
.expect("submitting plan");
- loop {
- // Refresh the ExecutionGraph
- let mut graph = scheduler
- .state
- .task_manager
- .get_execution_graph(job_id)
- .await?;
-
- if let Some(task) = graph.pop_next_task("executor-1")? {
+ // Refresh the ExecutionGraph
+ while let Some(graph) = scheduler
+ .state
+ .task_manager
+ .get_active_execution_graph(job_id)
+ .await
+ {
+ let task = {
+ let mut graph = graph.write().await;
+ graph.pop_next_task("executor-1")?
+ };
+ if let Some(task) = task {
let mut partitions: Vec<ShuffleWritePartition> = vec![];
let num_partitions = task
@@ -396,9 +381,11 @@ mod test {
let final_graph = scheduler
.state
.task_manager
- .get_execution_graph(job_id)
- .await?;
+ .get_active_execution_graph(job_id)
+ .await
+ .expect("Fail to find graph in the cache");
+ let final_graph = final_graph.read().await;
assert!(final_graph.complete());
assert_eq!(final_graph.output_locations().len(), 4);
@@ -756,8 +743,8 @@ mod test {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new_with_policy(
+ "localhost:50050".to_owned(),
state_storage.clone(),
- "default".to_owned(),
policy,
BallistaCodec::default(),
default_session_builder,
@@ -771,14 +758,16 @@ mod test {
event_action: Arc<dyn EventAction<SchedulerServerEvent>>,
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-
+ let state = Arc::new(SchedulerState::new_with_default_scheduler_name(
+ state_storage,
+ default_session_builder,
+ BallistaCodec::default(),
+ ));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new_with_event_action(
- state_storage.clone(),
- "default".to_owned(),
- BallistaCodec::default(),
- default_session_builder,
- event_action,
+ "localhost:50050".to_owned(),
+ state,
+ Some(event_action),
);
scheduler.init().await?;
diff --git
a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 8ae059f8..1c403ea6 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -88,8 +88,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.await
{
let msg = format!("Error planning job {}: {:?}",
job_id, e);
- error!("{}", msg);
- QueryStageSchedulerEvent::JobFailed(job_id, msg)
+ error!("{}", &msg);
+ QueryStageSchedulerEvent::JobPlanningFailed(job_id,
msg)
} else {
QueryStageSchedulerEvent::JobSubmitted(job_id)
};
@@ -138,17 +138,25 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
}
}
}
- QueryStageSchedulerEvent::JobFinished(job_id) => {
- info!("Job {} complete", job_id);
- self.state.task_manager.complete_job(&job_id).await?;
- }
- QueryStageSchedulerEvent::JobFailed(job_id, fail_message) => {
+ QueryStageSchedulerEvent::JobPlanningFailed(job_id, fail_message)
=> {
error!("Job {} failed: {}", job_id, fail_message);
self.state
.task_manager
.fail_job(&job_id, fail_message)
.await?;
}
+ QueryStageSchedulerEvent::JobFinished(job_id) => {
+ info!("Job {} complete", job_id);
+ self.state.task_manager.complete_job(&job_id).await?;
+ }
+ QueryStageSchedulerEvent::JobRunningFailed(job_id) => {
+ error!("Job {} running failed", job_id);
+ self.state.task_manager.fail_running_job(&job_id).await?;
+ }
+ QueryStageSchedulerEvent::JobUpdated(job_id) => {
+ error!("Job {} Updated", job_id);
+ self.state.task_manager.update_job(&job_id).await?;
+ }
}
Ok(())
diff --git a/ballista/rust/scheduler/src/standalone.rs
b/ballista/rust/scheduler/src/standalone.rs
index 0de81b85..4abc70a4 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -35,8 +35,8 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr>
{
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
SchedulerServer::new(
+ "localhost:50050".to_owned(),
Arc::new(client),
- "ballista".to_string(),
BallistaCodec::default(),
);
scheduler_server.init().await?;
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs
b/ballista/rust/scheduler/src/state/execution_graph.rs
index b831ffe9..596272d6 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -15,61 +15,40 @@
// specific language governing permissions and limitations
// under the License.
-use crate::display::DisplayableBallistaExecutionPlan;
-use crate::planner::DistributedPlanner;
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{
+ accept, ExecutionPlan, ExecutionPlanVisitor, Partitioning,
+};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{error, info, warn};
+
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
-
use ballista_core::serde::protobuf::{
- self, CompletedJob, JobStatus, OperatorMetricsSet, QueuedJob, TaskStatus,
+ self, execution_graph_stage::StageType, CompletedJob, JobStatus, QueuedJob,
+ TaskStatus,
};
use ballista_core::serde::protobuf::{job_status, FailedJob,
ShuffleWritePartition};
use ballista_core::serde::protobuf::{task_status, RunningTask};
use ballista_core::serde::scheduler::{
ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
};
-use datafusion::physical_plan::{
- accept, ExecutionPlan, ExecutionPlanVisitor, Metric, Partitioning,
-};
-use log::{debug, info};
-use std::collections::HashMap;
-use std::convert::TryInto;
-use std::fmt::{Debug, Formatter};
-
-use
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
-use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use ballista_core::utils::collect_plan_metrics;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use std::sync::Arc;
-/// Represents the basic unit of work for the Ballista executor. Will execute
-/// one partition of one stage on one task slot.
-#[derive(Clone)]
-pub struct Task {
- pub session_id: String,
- pub partition: PartitionId,
- pub plan: Arc<dyn ExecutionPlan>,
- pub output_partitioning: Option<Partitioning>,
-}
+use crate::display::print_stage_metrics;
+use crate::planner::DistributedPlanner;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::state::execution_graph::execution_stage::{
+ CompletedStage, ExecutionStage, FailedStage, ResolvedStage,
UnresolvedStage,
+};
-impl Debug for Task {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
- write!(
- f,
- "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
- self.session_id,
- self.partition.job_id,
- self.partition.stage_id,
- self.partition.partition_id,
- plan
- )
- }
-}
+mod execution_stage;
/// Represents the DAG for a distributed query plan.
///
@@ -96,17 +75,13 @@ impl Debug for Task {
///
///
/// ExecutionGraph[job_id=job, session_id=session, available_tasks=1,
complete=false]
-/// Stage[id=2, partitions=4, children=1, completed_tasks=0, resolved=false,
scheduled_tasks=0, available_tasks=0]
+/// =========UnResolvedStage[id=2, children=1]=========
/// Inputs{1: StageOutput { partition_locations: {}, complete: false }}
-///
/// ShuffleWriterExec: None
/// AggregateExec: mode=FinalPartitioned, gby=[id@0 as id],
aggr=[SUM(?table?.gmv)]
/// CoalesceBatchesExec: target_batch_size=4096
/// UnresolvedShuffleExec
-///
-/// Stage[id=1, partitions=1, children=0, completed_tasks=0, resolved=true,
scheduled_tasks=0, available_tasks=1]
-/// Inputs{}
-///
+/// =========ResolvedStage[id=1, partitions=1]=========
/// ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
/// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
/// TableScan: some_table
@@ -120,6 +95,8 @@ impl Debug for Task {
/// publish its outputs to the `ExecutionGraph`s `output_locations`
representing the final query results.
#[derive(Clone)]
pub struct ExecutionGraph {
+ /// Curator scheduler name
+ scheduler_id: String,
/// ID for this job
job_id: String,
/// Session ID for this job
@@ -136,6 +113,7 @@ pub struct ExecutionGraph {
impl ExecutionGraph {
pub fn new(
+ scheduler_id: &str,
job_id: &str,
session_id: &str,
plan: Arc<dyn ExecutionPlan>,
@@ -150,6 +128,7 @@ impl ExecutionGraph {
let stages = builder.build(shuffle_stages)?;
Ok(Self {
+ scheduler_id: scheduler_id.to_string(),
job_id: job_id.to_string(),
session_id: session_id.to_string(),
status: JobStatus {
@@ -175,7 +154,37 @@ impl ExecutionGraph {
/// An ExecutionGraph is complete if all its stages are complete
pub fn complete(&self) -> bool {
- self.stages.values().all(|s| s.complete())
+ self.stages
+ .values()
+ .all(|s| matches!(s, ExecutionStage::Completed(_)))
+ }
+
+ /// Revive the execution graph by converting the resolved stages to
running stages
+ /// If any stages are converted, return true; else false.
+ pub fn revive(&mut self) -> bool {
+ let running_stages = self
+ .stages
+ .values()
+ .filter_map(|stage| {
+ if let ExecutionStage::Resolved(resolved_stage) = stage {
+ Some(resolved_stage.to_running())
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ if running_stages.is_empty() {
+ false
+ } else {
+ for running_stage in running_stages {
+ self.stages.insert(
+ running_stage.stage_id,
+ ExecutionStage::Running(running_stage),
+ );
+ }
+ true
+ }
}
/// Update task statuses and task metrics in the graph.
@@ -183,138 +192,175 @@ impl ExecutionGraph {
pub fn update_task_status(
&mut self,
executor: &ExecutorMetadata,
- statuses: Vec<TaskStatus>,
- ) -> Result<()> {
- for status in statuses.into_iter() {
- if let TaskStatus {
- task_id:
- Some(protobuf::PartitionId {
- job_id,
- stage_id,
- partition_id,
- }),
- metrics: operator_metrics,
- status: Some(task_status),
- } = status
- {
- if job_id != self.job_id() {
+ task_statuses: Vec<TaskStatus>,
+ ) -> Result<Option<QueryStageSchedulerEvent>> {
+ let job_id = self.job_id().to_owned();
+ // First of all, classify the statuses by stages
+ let mut job_task_statuses: HashMap<usize, Vec<TaskStatus>> =
HashMap::new();
+ for task_status in task_statuses {
+ if let Some(task_id) = task_status.task_id.as_ref() {
+ if task_id.job_id != job_id {
return Err(BallistaError::Internal(format!(
"Error updating job {}: Invalid task status job ID {}",
- self.job_id(),
- job_id
+ job_id, task_id.job_id
)));
}
+ let stage_task_statuses = job_task_statuses
+ .entry(task_id.stage_id as usize)
+ .or_insert_with(Vec::new);
+ stage_task_statuses.push(task_status);
+ } else {
+ error!("There's no task id when updating status");
+ }
+ }
- let stage_id = stage_id as usize;
- let partition = partition_id as usize;
- if let Some(stage) = self.stages.get_mut(&stage_id) {
- stage.update_task_status(partition, task_status.clone());
- let stage_plan = stage.plan.clone();
- let stage_complete = stage.complete();
-
- // TODO Should be able to reschedule this task.
- if let task_status::Status::Failed(failed_task) =
task_status {
- self.status = JobStatus {
- status: Some(job_status::Status::Failed(FailedJob {
- error: format!(
- "Task {}/{}/{} failed: {}",
- job_id, stage_id, partition_id,
failed_task.error
- ),
- })),
- };
- return Ok(());
- } else if let
task_status::Status::Completed(completed_task) =
- task_status
- {
- // update task metrics for completed task
- stage.update_task_metrics(partition,
operator_metrics)?;
-
- // if this stage is completed, we want to combine the
stage metrics to plan's metric set and print out the plan
- if stage_complete &&
stage.stage_metrics.as_ref().is_some() {
- // The plan_metrics collected here is a snapshot
clone from the plan metrics.
- // They are all empty now and need to combine with
the stage metrics in the ExecutionStages
- let mut plan_metrics =
- collect_plan_metrics(stage_plan.as_ref());
- let stage_metrics = stage
- .stage_metrics
- .as_ref()
- .expect("stage metrics should not be None.");
- if plan_metrics.len() != stage_metrics.len() {
- return
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for
stage {}, plan metrics array size {} does not equal \
- to the stage metrics array size {}", stage_id,
plan_metrics.len(), stage_metrics.len())));
+ // Revive before updating due to some updates not saved
+ // It will be refined later
+ self.revive();
+
+ let mut events = vec![];
+ for (stage_id, stage_task_statuses) in job_task_statuses {
+ if let Some(stage) = self.stages.get_mut(&stage_id) {
+ if let ExecutionStage::Running(running_stage) = stage {
+ let mut locations = vec![];
+ for task_status in stage_task_statuses.into_iter() {
+ if let TaskStatus {
+ task_id:
+ Some(protobuf::PartitionId {
+ job_id,
+ stage_id,
+ partition_id,
+ }),
+ metrics: operator_metrics,
+ status: Some(status),
+ } = task_status
+ {
+ let stage_id = stage_id as usize;
+ let partition_id = partition_id as usize;
+
+ running_stage
+ .update_task_status(partition_id,
status.clone());
+
+ // TODO Should be able to reschedule this task.
+ if let task_status::Status::Failed(failed_task) =
status {
+ events.push(StageEvent::StageFailed(
+ stage_id,
+ format!(
+ "Task {}/{}/{} failed: {}",
+ job_id, stage_id, partition_id,
failed_task.error
+ ),
+ ));
+ break;
+ } else if let
task_status::Status::Completed(completed_task) =
+ status
+ {
+ // update task metrics for completed task
+ running_stage.update_task_metrics(
+ partition_id,
+ operator_metrics,
+ )?;
+
+ locations.append(&mut partition_to_location(
+ &job_id,
+ stage_id,
+ executor,
+ completed_task.partitions,
+ ));
+ } else {
+ warn!("The task {}/{}/{} with status {:?} is
invalid for updating", job_id, stage_id, partition_id, status);
}
-
plan_metrics.iter_mut().zip(stage_metrics).for_each(
- |(plan_metric, stage_metric)| {
- stage_metric
- .iter()
- .for_each(|s|
plan_metric.push(s.clone()));
- },
- );
-
- info!(
- "=== [{}/{}/{}] Stage finished, physical plan
with metrics ===\n{}\n",
- job_id,
+ }
+ }
+ let is_completed = running_stage.is_completed();
+ if is_completed {
+ events.push(StageEvent::StageCompleted(stage_id));
+ // if this stage is completed, we want to combine the
stage metrics to plan's metric set and print out the plan
+ if let Some(stage_metrics) =
running_stage.stage_metrics.as_ref()
+ {
+ print_stage_metrics(
+ &job_id,
stage_id,
- partition,
-
DisplayableBallistaExecutionPlan::new(stage_plan.as_ref(),
plan_metrics.as_ref()).indent()
+ running_stage.plan.as_ref(),
+ stage_metrics,
);
}
-
- let locations = partition_to_location(
- self.job_id.as_str(),
- stage_id,
- executor,
- completed_task.partitions,
- );
-
- let output_links = stage.output_links.clone();
- if output_links.is_empty() {
- // If `output_links` is empty, then this is a
final stage
- self.output_locations.extend(locations);
- } else {
- for link in output_links.into_iter() {
- // If this is an intermediate stage, we need
to push its `PartitionLocation`s to the parent stage
- if let Some(linked_stage) =
self.stages.get_mut(&link) {
- linked_stage.add_input_partitions(
- stage_id,
- partition,
- locations.clone(),
- )?;
-
- // If all tasks for this stage are
complete, mark the input complete in the parent stage
- if stage_complete {
- linked_stage.complete_input(stage_id);
- }
-
- // If all input partitions are ready, we
can resolve any UnresolvedShuffleExec in the parent stage plan
- if linked_stage.resolvable() {
- linked_stage.resolve_shuffles()?;
- }
- } else {
- return
Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link
{} for stage {}", job_id, stage_id, link)));
- }
- }
- }
}
+
+ let output_links = running_stage.output_links.clone();
+ events.append(&mut self.update_stage_output_links(
+ stage_id,
+ is_completed,
+ locations,
+ output_links,
+ )?);
} else {
return Err(BallistaError::Internal(format!(
- "Invalid stage ID {} for job {}",
+ "Stage {}/{} is not in running when updating the
status of tasks {:?}",
+ job_id,
stage_id,
- self.job_id()
+ stage_task_statuses.into_iter().map(|task_status|
task_status.task_id.map(|task_id| task_id.partition_id)).collect::<Vec<_>>(),
)));
}
+ } else {
+ return Err(BallistaError::Internal(format!(
+ "Invalid stage ID {} for job {}",
+ stage_id, job_id
+ )));
}
}
- Ok(())
+ self.processing_stage_events(events)
}
- /// Total number of tasks in this plan that are ready for scheduling
- pub fn available_tasks(&self) -> usize {
- self.stages
- .iter()
- .map(|(_, stage)| stage.available_tasks())
- .sum()
+ fn update_stage_output_links(
+ &mut self,
+ stage_id: usize,
+ is_completed: bool,
+ locations: Vec<PartitionLocation>,
+ output_links: Vec<usize>,
+ ) -> Result<Vec<StageEvent>> {
+ let mut ret = vec![];
+ let job_id = &self.job_id;
+ if output_links.is_empty() {
+ // If `output_links` is empty, then this is a final stage
+ self.output_locations.extend(locations);
+ } else {
+ for link in output_links.iter() {
+ // If this is an intermediate stage, we need to push its
`PartitionLocation`s to the parent stage
+ if let Some(linked_stage) = self.stages.get_mut(link) {
+ if let ExecutionStage::UnResolved(linked_unresolved_stage)
=
+ linked_stage
+ {
+ linked_unresolved_stage
+ .add_input_partitions(stage_id,
locations.clone())?;
+
+ // If all tasks for this stage are complete, mark the
input complete in the parent stage
+ if is_completed {
+ linked_unresolved_stage.complete_input(stage_id);
+ }
+
+ // If all input partitions are ready, we can resolve
any UnresolvedShuffleExec in the parent stage plan
+ if linked_unresolved_stage.resolvable() {
+ ret.push(StageEvent::StageResolved(
+ linked_unresolved_stage.stage_id,
+ ));
+ }
+ } else {
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {}: The stage {} as the output
link of stage {} should be unresolved",
+ job_id, link, stage_id
+ )));
+ }
+ } else {
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {}: Invalid output link {} for
stage {}",
+ job_id, stage_id, link
+ )));
+ }
+ }
+ }
+
+ Ok(ret)
}
/// Return all currently running tasks along with the executor ID on which
they are assigned
@@ -322,24 +368,42 @@ impl ExecutionGraph {
self.stages
.iter()
.flat_map(|(_, stage)| {
- stage
- .running_tasks()
- .iter()
- .map(|(stage_id, partition, executor_id)| {
- (
- PartitionId {
- job_id: self.job_id.clone(),
- stage_id: *stage_id,
- partition_id: *partition,
- },
- executor_id.clone(),
- )
- })
- .collect::<Vec<(PartitionId, String)>>()
+ if let ExecutionStage::Running(stage) = stage {
+ stage
+ .running_tasks()
+ .into_iter()
+ .map(|(stage_id, partition_id, executor_id)| {
+ (
+ PartitionId {
+ job_id: self.job_id.clone(),
+ stage_id,
+ partition_id,
+ },
+ executor_id,
+ )
+ })
+ .collect::<Vec<(PartitionId, String)>>()
+ } else {
+ vec![]
+ }
})
.collect::<Vec<(PartitionId, String)>>()
}
+ /// Total number of tasks in this plan that are ready for scheduling
+ pub fn available_tasks(&self) -> usize {
+ self.stages
+ .iter()
+ .map(|(_, stage)| {
+ if let ExecutionStage::Running(stage) = stage {
+ stage.available_tasks()
+ } else {
+ 0
+ }
+ })
+ .sum()
+ }
+
/// Get next task that can be assigned to the given executor.
/// This method should only be called when the resulting task is
immediately
/// being launched as the status will be set to Running and it will not be
@@ -349,39 +413,183 @@ impl ExecutionGraph {
pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<Task>>
{
let job_id = self.job_id.clone();
let session_id = self.session_id.clone();
- self.stages.iter_mut().find(|(_stage_id, stage)| {
- stage.resolved() && stage.available_tasks() > 0
+ let mut next_task = self.stages.iter_mut().find(|(_stage_id, stage)| {
+ if let ExecutionStage::Running(stage) = stage {
+ stage.available_tasks() > 0
+ } else {
+ false
+ }
}).map(|(stage_id, stage)| {
- let (partition_id,_) = stage
- .task_statuses
- .iter()
- .enumerate()
- .find(|(_partition,status)| status.is_none())
- .ok_or_else(|| {
- BallistaError::Internal(format!("Error getting next task
for job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
- })?;
-
- let partition = PartitionId {
- job_id,
- stage_id: *stage_id,
- partition_id
- };
+ if let ExecutionStage::Running(stage) = stage {
+ let (partition_id, _) = stage
+ .task_statuses
+ .iter()
+ .enumerate()
+ .find(|(_partition, status)| status.is_none())
+ .ok_or_else(|| {
+ BallistaError::Internal(format!("Error getting next
task for job {}: Stage {} is ready but has no pending tasks", job_id, stage_id))
+ })?;
+
+ let partition = PartitionId {
+ job_id,
+ stage_id: *stage_id,
+ partition_id,
+ };
+
+ // Set the status to Running
+ stage.task_statuses[partition_id] =
Some(task_status::Status::Running(RunningTask {
+ executor_id: executor_id.to_owned()
+ }));
+
+ Ok(Task {
+ session_id,
+ partition,
+ plan: stage.plan.clone(),
+ output_partitioning: stage.output_partitioning.clone(),
+ })
+ } else {
+ Err(BallistaError::General(format!("Stage {} is not a running
stage", stage_id)))
+ }
+ }).transpose()?;
+
+ // If no available tasks found in the running stage,
+ // try to find a resolved stage and convert it to the running stage
+ if next_task.is_none() {
+ if self.revive() {
+ next_task = self.pop_next_task(executor_id)?;
+ } else {
+ next_task = None;
+ }
+ }
- // Set the status to Running
- stage.task_statuses[partition_id] =
Some(task_status::Status::Running(RunningTask {
- executor_id: executor_id.to_owned()
- }));
+ Ok(next_task)
+ }
- Ok(Task {
- session_id,
- partition,
- plan: stage.plan.clone(),
- output_partitioning: stage.output_partitioning.clone()
- })
- }).transpose()
+ pub fn update_status(&mut self, status: JobStatus) {
+ self.status = status;
+ }
+
+ /// Reset the status for the given task. This should be called is a task
failed to
+ /// launch and it needs to be returned to the set of available tasks and be
+ /// re-scheduled.
+ pub fn reset_task_status(&mut self, task: Task) {
+ let stage_id = task.partition.stage_id;
+ let partition = task.partition.partition_id;
+
+ if let Some(ExecutionStage::Running(stage)) =
self.stages.get_mut(&stage_id) {
+ stage.task_statuses[partition] = None;
+ }
+ }
+
+ pub fn output_locations(&self) -> Vec<PartitionLocation> {
+ self.output_locations.clone()
+ }
+
+ /// Processing stage events for stage state changing
+ fn processing_stage_events(
+ &mut self,
+ events: Vec<StageEvent>,
+ ) -> Result<Option<QueryStageSchedulerEvent>> {
+ let mut has_resolved = false;
+ let mut job_err_msg = "".to_owned();
+ for event in events {
+ match event {
+ StageEvent::StageResolved(stage_id) => {
+ self.resolve_stage(stage_id)?;
+ has_resolved = true;
+ }
+ StageEvent::StageCompleted(stage_id) => {
+ self.complete_stage(stage_id);
+ }
+ StageEvent::StageFailed(stage_id, err_msg) => {
+ job_err_msg = format!("{}{}\n", job_err_msg, &err_msg);
+ self.fail_stage(stage_id, err_msg);
+ }
+ }
+ }
+
+ let event = if !job_err_msg.is_empty() {
+ // If this ExecutionGraph is complete, fail it
+ info!("Job {} is failed", self.job_id());
+ self.fail_job(job_err_msg);
+
+ Some(QueryStageSchedulerEvent::JobRunningFailed(
+ self.job_id.clone(),
+ ))
+ } else if self.complete() {
+ // If this ExecutionGraph is complete, finalize it
+ info!(
+ "Job {} is complete, finalizing output partitions",
+ self.job_id()
+ );
+ self.complete_job()?;
+ Some(QueryStageSchedulerEvent::JobFinished(self.job_id.clone()))
+ } else if has_resolved {
+ Some(QueryStageSchedulerEvent::JobUpdated(self.job_id.clone()))
+ } else {
+ None
+ };
+
+ Ok(event)
+ }
+
+ /// Convert unresolved stage to be resolved
+ fn resolve_stage(&mut self, stage_id: usize) -> Result<bool> {
+ if let Some(ExecutionStage::UnResolved(stage)) =
self.stages.remove(&stage_id) {
+ self.stages
+ .insert(stage_id,
ExecutionStage::Resolved(stage.to_resolved()?));
+ Ok(true)
+ } else {
+ warn!(
+ "Fail to find a unresolved stage {}/{} to resolve",
+ self.job_id(),
+ stage_id
+ );
+ Ok(false)
+ }
+ }
+
+ /// Convert running stage to be completed
+ fn complete_stage(&mut self, stage_id: usize) -> bool {
+ if let Some(ExecutionStage::Running(stage)) =
self.stages.remove(&stage_id) {
+ self.stages
+ .insert(stage_id,
ExecutionStage::Completed(stage.to_completed()));
+ true
+ } else {
+ warn!(
+ "Fail to find a running stage {}/{} to complete",
+ self.job_id(),
+ stage_id
+ );
+ false
+ }
+ }
+
+ /// Convert running stage to be failed
+ fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool {
+ if let Some(ExecutionStage::Running(stage)) =
self.stages.remove(&stage_id) {
+ self.stages
+ .insert(stage_id,
ExecutionStage::Failed(stage.to_failed(err_msg)));
+ true
+ } else {
+ warn!(
+ "Fail to find a running stage {}/{} to fail",
+ self.job_id(),
+ stage_id
+ );
+ false
+ }
+ }
+
+ /// fail job with error message
+ pub fn fail_job(&mut self, error: String) {
+ self.status = JobStatus {
+ status: Some(job_status::Status::Failed(FailedJob { error })),
+ };
}
- pub fn finalize(&mut self) -> Result<()> {
+ /// finalize job as completed
+ fn complete_job(&mut self) -> Result<()> {
if !self.complete() {
return Err(BallistaError::Internal(format!(
"Attempt to finalize an incomplete job {}",
@@ -404,26 +612,6 @@ impl ExecutionGraph {
Ok(())
}
- pub fn update_status(&mut self, status: JobStatus) {
- self.status = status;
- }
-
- /// Reset the status for the given task. This should be called is a task
failed to
- /// launch and it needs to be returned to the set of available tasks and be
- /// re-scheduled.
- pub fn reset_task_status(&mut self, task: Task) {
- let stage_id = task.partition.stage_id;
- let partition = task.partition.partition_id;
-
- if let Some(stage) = self.stages.get_mut(&stage_id) {
- stage.task_statuses[partition] = None;
- }
- }
-
- pub fn output_locations(&self) -> Vec<PartitionLocation> {
- self.output_locations.clone()
- }
-
pub(crate) async fn decode_execution_graph<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
@@ -433,87 +621,33 @@ impl ExecutionGraph {
session_ctx: &SessionContext,
) -> Result<ExecutionGraph> {
let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
- for stage in proto.stages {
- let plan_proto = U::try_decode(stage.plan.as_slice())?;
- let plan = plan_proto.try_into_physical_plan(
- session_ctx,
- session_ctx.runtime_env().as_ref(),
- codec.physical_extension_codec(),
- )?;
-
- let stage_id = stage.stage_id as usize;
- let partitions: usize = stage.partitions as usize;
-
- let mut task_statuses: Vec<Option<task_status::Status>> =
- vec![None; partitions];
-
- for status in stage.task_statuses {
- if let Some(task_id) = status.task_id.as_ref() {
- task_statuses[task_id.partition_id as usize] =
status.status
+ for graph_stage in proto.stages {
+ let stage_type = graph_stage.stage_type.expect("Unexpected empty
stage");
+
+ let execution_stage = match stage_type {
+ StageType::UnresolvedStage(stage) => {
+ let stage: UnresolvedStage =
+ UnresolvedStage::decode(stage, codec, session_ctx)?;
+ (stage.stage_id, ExecutionStage::UnResolved(stage))
+ }
+ StageType::ResolvedStage(stage) => {
+ let stage: ResolvedStage =
+ ResolvedStage::decode(stage, codec, session_ctx)?;
+ (stage.stage_id, ExecutionStage::Resolved(stage))
+ }
+ StageType::CompletedStage(stage) => {
+ let stage: CompletedStage =
+ CompletedStage::decode(stage, codec, session_ctx)?;
+ (stage.stage_id, ExecutionStage::Completed(stage))
+ }
+ StageType::FailedStage(stage) => {
+ let stage: FailedStage =
+ FailedStage::decode(stage, codec, session_ctx)?;
+ (stage.stage_id, ExecutionStage::Failed(stage))
}
- }
-
- let output_partitioning: Option<Partitioning> =
- parse_protobuf_hash_partitioning(
- stage.output_partitioning.as_ref(),
- session_ctx,
- plan.schema().as_ref(),
- )?;
-
- let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-
- for input in stage.inputs {
- let stage_id = input.stage_id as usize;
-
- let outputs = input
- .partition_locations
- .into_iter()
- .map(|loc| {
- let partition = loc.partition as usize;
- let locations = loc
- .partition_location
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?;
- Ok((partition, locations))
- })
- .collect::<Result<HashMap<usize,
Vec<PartitionLocation>>>>()?;
-
- inputs.insert(
- stage_id,
- StageOutput {
- partition_locations: outputs,
- complete: input.complete,
- },
- );
- }
- let stage_metrics = if stage.stage_metrics.is_empty() {
- None
- } else {
- let ms = stage
- .stage_metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
- Some(ms)
};
- let execution_stage = ExecutionStage {
- stage_id: stage.stage_id as usize,
- partitions,
- output_partitioning,
- inputs,
- plan,
- task_statuses,
- output_links: stage
- .output_links
- .into_iter()
- .map(|l| l as usize)
- .collect(),
- resolved: stage.resolved,
- stage_metrics,
- };
- stages.insert(stage_id, execution_stage);
+ stages.insert(execution_stage.0, execution_stage.1);
}
let output_locations: Vec<PartitionLocation> = proto
@@ -523,6 +657,7 @@ impl ExecutionGraph {
.collect::<Result<Vec<_>>>()?;
Ok(ExecutionGraph {
+ scheduler_id: proto.scheduler_id,
job_id: proto.job_id,
session_id: proto.session_id,
status: proto.status.ok_or_else(|| {
@@ -536,6 +671,8 @@ impl ExecutionGraph {
})
}
+ /// Running stages will not be persisted so that will not be encoded.
+ /// Running stages will be convert back to the resolved stages to be
encoded and persisted
pub(crate) fn encode_execution_graph<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
@@ -547,76 +684,27 @@ impl ExecutionGraph {
let stages = graph
.stages
- .into_iter()
- .map(|(stage_id, stage)| {
- let mut plan: Vec<u8> = vec![];
-
- U::try_from_physical_plan(stage.plan,
codec.physical_extension_codec())
- .and_then(|proto| proto.try_encode(&mut plan))?;
-
- let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
-
- for (stage, output) in stage.inputs.into_iter() {
- inputs.push(protobuf::GraphStageInput {
- stage_id: stage as u32,
- partition_locations: output
- .partition_locations
- .into_iter()
- .map(|(partition, locations)| {
- Ok(protobuf::TaskInputPartitions {
- partition: partition as u32,
- partition_location: locations
- .into_iter()
- .map(|l| l.try_into())
- .collect::<Result<Vec<_>>>()?,
- })
- })
- .collect::<Result<Vec<_>>>()?,
- complete: output.complete,
- });
- }
-
- let task_statuses: Vec<protobuf::TaskStatus> = stage
- .task_statuses
- .into_iter()
- .enumerate()
- .filter_map(|(partition, status)| {
- status.map(|status| protobuf::TaskStatus {
- task_id: Some(protobuf::PartitionId {
- job_id: job_id.clone(),
- stage_id: stage_id as u32,
- partition_id: partition as u32,
- }),
- // task metrics should not persist.
- metrics: vec![],
- status: Some(status),
- })
- })
- .collect();
-
- let output_partitioning =
-
hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
-
- let stage_metrics = stage
- .stage_metrics
- .unwrap_or_default()
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()?;
+ .into_values()
+ .map(|stage| {
+ let stage_type = match stage {
+ ExecutionStage::UnResolved(stage) => {
+
StageType::UnresolvedStage(UnresolvedStage::encode(stage, codec)?)
+ }
+ ExecutionStage::Resolved(stage) => {
+ StageType::ResolvedStage(ResolvedStage::encode(stage,
codec)?)
+ }
+ ExecutionStage::Running(stage) => StageType::ResolvedStage(
+ ResolvedStage::encode(stage.to_resolved(), codec)?,
+ ),
+ ExecutionStage::Completed(stage) =>
StageType::CompletedStage(
+ CompletedStage::encode(job_id.clone(), stage, codec)?,
+ ),
+ ExecutionStage::Failed(stage) => StageType::FailedStage(
+ FailedStage::encode(job_id.clone(), stage, codec)?,
+ ),
+ };
Ok(protobuf::ExecutionGraphStage {
- stage_id: stage_id as u64,
- partitions: stage.partitions as u32,
- output_partitioning,
- inputs,
- plan,
- task_statuses,
- output_links: stage
- .output_links
- .into_iter()
- .map(|l| l as u32)
- .collect(),
- resolved: stage.resolved,
- stage_metrics,
+ stage_type: Some(stage_type),
})
})
.collect::<Result<Vec<_>>>()?;
@@ -634,6 +722,7 @@ impl ExecutionGraph {
stages,
output_partitions: graph.output_partitions as u64,
output_locations,
+ scheduler_id: graph.scheduler_id,
})
}
}
@@ -645,294 +734,9 @@ impl Debug for ExecutionGraph {
.iter()
.map(|(_, stage)| format!("{:?}", stage))
.collect::<Vec<String>>()
- .join("\n");
- write!(f, "ExecutionGraph[job_id={}, session_id={},
available_tasks={}, complete={}]\n{}", self.job_id, self.session_id,
self.available_tasks(), self.complete(), stages)
- }
-}
-
-/// This data structure collects the partition locations for an
`ExecutionStage`.
-/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child
stages.
-/// When all tasks for the child stage are complete, it will mark the
`StageOutput`
-#[derive(Clone, Debug, Default)]
-struct StageOutput {
- /// Map from partition -> partition locations
- partition_locations: HashMap<usize, Vec<PartitionLocation>>,
- /// Flag indicating whether all tasks are complete
- complete: bool,
-}
-
-impl StageOutput {
- pub fn new() -> Self {
- Self {
- partition_locations: HashMap::new(),
- complete: false,
- }
- }
-
- /// Add a `PartitionLocation` to the `StageOutput`
- pub fn add_partition(&mut self, partition_location: PartitionLocation) {
- if let Some(parts) = self
- .partition_locations
- .get_mut(&partition_location.partition_id.partition_id)
- {
- parts.push(partition_location)
- } else {
- self.partition_locations.insert(
- partition_location.partition_id.partition_id,
- vec![partition_location],
- );
- }
- }
-
- pub fn is_complete(&self) -> bool {
- self.complete
- }
-}
-
-/// A stage in the ExecutionGraph.
-///
-/// This represents a set of tasks (one per each `partition`) which can
-/// be executed concurrently.
-#[derive(Clone)]
-struct ExecutionStage {
- /// Stage ID
- stage_id: usize,
- /// Total number of output partitions for this stage.
- /// This stage will produce on task for partition.
- partitions: usize,
- /// Output partitioning for this stage.
- output_partitioning: Option<Partitioning>,
- /// Represents the outputs from this stage's child stages.
- /// This stage can only be resolved an executed once all child stages are
completed.
- inputs: HashMap<usize, StageOutput>,
- // `ExecutionPlan` for this stage
- plan: Arc<dyn ExecutionPlan>,
- /// Status of each already scheduled task. If status is None, the
partition has not yet been scheduled
- task_statuses: Vec<Option<task_status::Status>>,
- /// Stage ID of the stage that will take this stages outputs as inputs.
- /// If `output_links` is empty then this the final stage in the
`ExecutionGraph`
- output_links: Vec<usize>,
- /// Flag indicating whether all input partitions have been resolved and
the plan
- /// has UnresovledShuffleExec operators resolved to ShuffleReadExec
operators.
- resolved: bool,
- /// Combined metrics of the already finished tasks in the stage, If it is
None, no task is finished yet.
- stage_metrics: Option<Vec<MetricsSet>>,
-}
-
-impl Debug for ExecutionStage {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
- let scheduled_tasks = self.task_statuses.iter().filter(|t|
t.is_some()).count();
-
- write!(
- f,
- "Stage[id={}, partitions={:?}, children={}, completed_tasks={},
resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
- self.stage_id,
- self.partitions,
- self.inputs.len(),
- self.completed_tasks(),
- self.resolved,
- scheduled_tasks,
- self.available_tasks(),
- self.inputs,
- plan
- )
- }
-}
-
-impl ExecutionStage {
- pub fn new(
- stage_id: usize,
- plan: Arc<dyn ExecutionPlan>,
- output_partitioning: Option<Partitioning>,
- output_links: Vec<usize>,
- child_stages: Vec<usize>,
- ) -> Self {
- let num_tasks = plan.output_partitioning().partition_count();
-
- let resolved = child_stages.is_empty();
-
- let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
-
- for input_stage_id in &child_stages {
- inputs.insert(*input_stage_id, StageOutput::new());
- }
-
- Self {
- stage_id,
- partitions: num_tasks,
- output_partitioning,
- inputs,
- plan,
- task_statuses: vec![None; num_tasks],
- output_links,
- resolved,
- stage_metrics: None,
- }
- }
-
- /// Returns a vector of currently running tasks in this stage
- pub fn running_tasks(&self) -> Vec<(usize, usize, String)> {
- if self.resolved {
- self.task_statuses
- .iter()
- .enumerate()
- .filter_map(|(partition, status)| match status {
- Some(task_status::Status::Running(RunningTask {
executor_id })) => {
- Some((self.stage_id, partition, executor_id.clone()))
- }
- _ => None,
- })
- .collect()
- } else {
- vec![]
- }
- }
-
- /// Returns true if all inputs are complete and we can resolve all
- /// UnresolvedShuffleExec operators to ShuffleReadExec
- pub fn resolvable(&self) -> bool {
- self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
- }
-
- /// Returns `true` if all tasks for this stage are complete
- pub fn complete(&self) -> bool {
- self.task_statuses
- .iter()
- .all(|status| matches!(status,
Some(task_status::Status::Completed(_))))
- }
-
- /// Returns the number of tasks
- pub fn completed_tasks(&self) -> usize {
- self.task_statuses
- .iter()
- .filter(|status| matches!(status,
Some(task_status::Status::Completed(_))))
- .count()
- }
-
- /// Marks the input stage ID as complete.
- pub fn complete_input(&mut self, stage_id: usize) {
- if let Some(input) = self.inputs.get_mut(&stage_id) {
- input.complete = true;
- }
- }
-
- /// Returns true if the stage plan has all UnresolvedShuffleExec operators
resolved to
- /// ShuffleReadExec
- pub fn resolved(&self) -> bool {
- self.resolved
- }
-
- /// Returns the number of tasks in this stage which are available for
scheduling.
- /// If the stage is not yet resolved, then this will return `0`, otherwise
it will
- /// return the number of tasks where the task status is not yet set.
- pub fn available_tasks(&self) -> usize {
- if self.resolved {
- self.task_statuses.iter().filter(|s| s.is_none()).count()
- } else {
- 0
- }
- }
-
- /// Resolve any UnresolvedShuffleExec operators within this stage's plan
- pub fn resolve_shuffles(&mut self) -> Result<()> {
- println!("Resolving shuffles\n{:?}", self);
- if self.resolved {
- // If this stage has no input shuffles, then it is already resolved
- Ok(())
- } else {
- let input_locations = self
- .inputs
- .iter()
- .map(|(stage, outputs)| (*stage,
outputs.partition_locations.clone()))
- .collect();
- // Otherwise, rewrite the plan to replace UnresolvedShuffleExec
with ShuffleReadExec
- let new_plan = crate::planner::remove_unresolved_shuffles(
- self.plan.clone(),
- &input_locations,
- )?;
- self.plan = new_plan;
- self.resolved = true;
- Ok(())
- }
- }
-
- /// Update the status for task partition
- pub fn update_task_status(&mut self, partition: usize, status:
task_status::Status) {
- debug!("Updating task status for partition {}", partition);
- self.task_statuses[partition] = Some(status);
- }
-
- /// update and combine the task metrics to the stage metrics
- pub fn update_task_metrics(
- &mut self,
- partition: usize,
- metrics: Vec<OperatorMetricsSet>,
- ) -> Result<()> {
- if let Some(combined_metrics) = &mut self.stage_metrics {
- if metrics.len() != combined_metrics.len() {
- return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
- with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
- }
- let metrics_values_array = metrics
- .into_iter()
- .map(|ms| {
- ms.metrics
- .into_iter()
- .map(|m| m.try_into())
- .collect::<Result<Vec<_>>>()
- })
- .collect::<Result<Vec<_>>>()?;
-
- let new_metrics_set = combined_metrics
- .iter_mut()
- .zip(metrics_values_array)
- .map(|(first, second)| {
- Self::combine_metrics_set(first, second, partition)
- })
- .collect();
- self.stage_metrics = Some(new_metrics_set)
- } else {
- let new_metrics_set = metrics
- .into_iter()
- .map(|ms| ms.try_into())
- .collect::<Result<Vec<_>>>()?;
- if !new_metrics_set.is_empty() {
- self.stage_metrics = Some(new_metrics_set)
- }
- }
- Ok(())
- }
-
- pub fn combine_metrics_set(
- first: &mut MetricsSet,
- second: Vec<MetricValue>,
- partition: usize,
- ) -> MetricsSet {
- for metric_value in second {
- // TODO recheck the lable logic
- let new_metric = Arc::new(Metric::new(metric_value,
Some(partition)));
- first.push(new_metric);
- }
- first.aggregate_by_partition()
- }
-
- /// Add input partitions published from an input stage.
- pub fn add_input_partitions(
- &mut self,
- stage_id: usize,
- _partition_id: usize,
- locations: Vec<PartitionLocation>,
- ) -> Result<()> {
- if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
- for partition in locations {
- stage_inputs.add_partition(partition);
- }
- } else {
- return Err(BallistaError::Internal(format!("Error adding input
partitions to stage {}, {} is not a valid child stage ID", self.stage_id,
stage_id)));
- }
-
- Ok(())
+ .join("");
+ write!(f, "ExecutionGraph[job_id={}, session_id={},
available_tasks={}, complete={}]\n{}",
+ self.job_id, self.session_id, self.available_tasks(),
self.complete(), stages)
}
}
@@ -980,16 +784,23 @@ impl ExecutionStageBuilder {
.remove(&stage_id)
.unwrap_or_default();
- execution_stages.insert(
- stage_id,
- ExecutionStage::new(
+ let stage = if child_stages.is_empty() {
+ ExecutionStage::Resolved(ResolvedStage::new(
+ stage_id,
+ stage,
+ partitioning,
+ output_links,
+ ))
+ } else {
+ ExecutionStage::UnResolved(UnresolvedStage::new(
stage_id,
stage,
partitioning,
output_links,
child_stages,
- ),
- );
+ ))
+ };
+ execution_stages.insert(stage_id, stage);
}
Ok(execution_stages)
@@ -1032,6 +843,38 @@ impl ExecutionPlanVisitor for ExecutionStageBuilder {
}
}
+#[derive(Clone)]
+pub enum StageEvent {
+ StageResolved(usize),
+ StageCompleted(usize),
+ StageFailed(usize, String),
+}
+
+/// Represents the basic unit of work for the Ballista executor. Will execute
+/// one partition of one stage on one task slot.
+#[derive(Clone)]
+pub struct Task {
+ pub session_id: String,
+ pub partition: PartitionId,
+ pub plan: Arc<dyn ExecutionPlan>,
+ pub output_partitioning: Option<Partitioning>,
+}
+
+impl Debug for Task {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ write!(
+ f,
+ "Task[session_id: {}, job: {}, stage: {}, partition: {}]\n{}",
+ self.session_id,
+ self.partition.job_id,
+ self.partition.stage_id,
+ self.partition.partition_id,
+ plan
+ )
+ }
+}
+
fn partition_to_location(
job_id: &str,
stage_id: usize,
@@ -1059,19 +902,20 @@ fn partition_to_location(
#[cfg(test)]
mod test {
- use crate::state::execution_graph::ExecutionGraph;
- use ballista_core::error::Result;
- use ballista_core::serde::protobuf::{self, job_status, task_status};
- use ballista_core::serde::scheduler::{ExecutorMetadata,
ExecutorSpecification};
+ use std::sync::Arc;
+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, Expr};
-
use datafusion::logical_plan::JoinType;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::scan_empty;
- use std::sync::Arc;
+ use ballista_core::error::Result;
+ use ballista_core::serde::protobuf::{self, job_status, task_status};
+ use ballista_core::serde::scheduler::{ExecutorMetadata,
ExecutorSpecification};
+
+ use crate::state::execution_graph::ExecutionGraph;
#[tokio::test]
async fn test_drain_tasks() -> Result<()> {
@@ -1124,7 +968,6 @@ mod test {
let mut agg_graph = test_aggregation_plan(4).await;
drain_tasks(&mut agg_graph)?;
- agg_graph.finalize()?;
let status = agg_graph.status();
@@ -1214,7 +1057,7 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- ExecutionGraph::new("job", "session", plan).unwrap()
+ ExecutionGraph::new("localhost:50050", "job", "session", plan).unwrap()
}
async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
@@ -1237,7 +1080,7 @@ mod test {
let plan = ctx.create_physical_plan(&optimized_plan).await.unwrap();
- ExecutionGraph::new("job", "session", plan).unwrap()
+ ExecutionGraph::new("localhost:50050", "job", "session", plan).unwrap()
}
async fn test_join_plan(partition: usize) -> ExecutionGraph {
@@ -1278,7 +1121,8 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- let graph = ExecutionGraph::new("job", "session", plan).unwrap();
+ let graph =
+ ExecutionGraph::new("localhost:50050", "job", "session",
plan).unwrap();
println!("{:?}", graph);
@@ -1302,7 +1146,8 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- let graph = ExecutionGraph::new("job", "session", plan).unwrap();
+ let graph =
+ ExecutionGraph::new("localhost:50050", "job", "session",
plan).unwrap();
println!("{:?}", graph);
@@ -1326,7 +1171,8 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- let graph = ExecutionGraph::new("job", "session", plan).unwrap();
+ let graph =
+ ExecutionGraph::new("localhost:50050", "job", "session",
plan).unwrap();
println!("{:?}", graph);
diff --git
a/ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs
b/ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs
new file mode 100644
index 00000000..3e3aee00
--- /dev/null
+++ b/ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs
@@ -0,0 +1,928 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph,
+/// represents a set of tasks (one per `partition`) which can be executed
concurrently.
+/// For a stage, there are five states. And the state machine is as follows:
+///
+/// UnResolvedStage FailedStage
+/// ↓ ↙ ↑
+/// ResolvedStage → RunningStage
+/// ↓
+/// CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+ UnResolved(UnresolvedStage),
+ Resolved(ResolvedStage),
+ Running(RunningStage),
+ Completed(CompletedStage),
+ Failed(FailedStage),
+}
+
+impl Debug for ExecutionStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ ExecutionStage::UnResolved(unresolved_stage) =>
unresolved_stage.fmt(f),
+ ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f),
+ ExecutionStage::Running(running_stage) => running_stage.fmt(f),
+ ExecutionStage::Completed(completed_stage) =>
completed_stage.fmt(f),
+ ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f),
+ }
+ }
+}
+
+/// For a stage whose input stages are not all completed, we say it's an
unresolved stage
+#[derive(Clone)]
+pub(super) struct UnresolvedStage {
+ /// Stage ID
+ pub(super) stage_id: usize,
+ /// Output partitioning for this stage.
+ pub(super) output_partitioning: Option<Partitioning>,
+ /// Stage IDs of the stages that will take this stage's outputs as inputs.
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ pub(super) output_links: Vec<usize>,
+ /// Represents the outputs from this stage's child stages.
+ /// This stage can only be resolved and executed once all child stages are
completed.
+ pub(super) inputs: HashMap<usize, StageOutput>,
+ /// `ExecutionPlan` for this stage
+ pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// For a stage, if it has no inputs or all of its input stages are completed,
+/// then we call it a resolved stage
+#[derive(Clone)]
+pub(super) struct ResolvedStage {
+ /// Stage ID
+ pub(super) stage_id: usize,
+ /// Total number of output partitions for this stage.
+ /// This stage will produce one task per partition.
+ pub(super) partitions: usize,
+ /// Output partitioning for this stage.
+ pub(super) output_partitioning: Option<Partitioning>,
+ /// Stage IDs of the stages that will take this stage's outputs as inputs.
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ pub(super) output_links: Vec<usize>,
+ /// `ExecutionPlan` for this stage
+ pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// Different from the resolved stage, a running stage will
+/// 1. save the execution plan as encoded one to avoid serialization cost for
creating task definition
+/// 2. manage the task statuses
+/// 3. manage the stage-level combined metrics
+/// Running stages will only be maintained in memory and will not be saved to
the backend storage
+#[derive(Clone)]
+pub(super) struct RunningStage {
+ /// Stage ID
+ pub(super) stage_id: usize,
+ /// Total number of output partitions for this stage.
+ /// This stage will produce one task per partition.
+ pub(super) partitions: usize,
+ /// Output partitioning for this stage.
+ pub(super) output_partitioning: Option<Partitioning>,
+ /// Stage IDs of the stages that will take this stage's outputs as inputs.
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ pub(super) output_links: Vec<usize>,
+ /// `ExecutionPlan` for this stage
+ pub(super) plan: Arc<dyn ExecutionPlan>,
+ /// Status of each already scheduled task. If status is None, the
partition has not yet been scheduled
+ pub(super) task_statuses: Vec<Option<task_status::Status>>,
+ /// Combined metrics of the already finished tasks in the stage, If it is
None, no task is finished yet.
+ pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+}
+
+/// If a stage finishes successfully, its task statuses and metrics will be
finalized
+#[derive(Clone)]
+pub(super) struct CompletedStage {
+ /// Stage ID
+ pub(super) stage_id: usize,
+ /// Total number of output partitions for this stage.
+ /// This stage will produce one task per partition.
+ pub(super) partitions: usize,
+ /// Output partitioning for this stage.
+ pub(super) output_partitioning: Option<Partitioning>,
+ /// Stage IDs of the stages that will take this stage's outputs as inputs.
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ pub(super) output_links: Vec<usize>,
+ /// `ExecutionPlan` for this stage
+ pub(super) plan: Arc<dyn ExecutionPlan>,
+ /// Status of each already scheduled task.
+ pub(super) task_statuses: Vec<task_status::Status>,
+ /// Combined metrics of the already finished tasks in the stage.
+ pub(super) stage_metrics: Vec<MetricsSet>,
+}
+
+/// If a stage fails, it will be with an error message
+#[derive(Clone)]
+pub(super) struct FailedStage {
+ /// Stage ID
+ pub(super) stage_id: usize,
+ /// Total number of output partitions for this stage.
+ /// This stage will produce one task per partition.
+ pub(super) partitions: usize,
+ /// Output partitioning for this stage.
+ pub(super) output_partitioning: Option<Partitioning>,
+ /// Stage IDs of the stages that will take this stage's outputs as inputs.
+ /// If `output_links` is empty then this is the final stage in the
`ExecutionGraph`
+ pub(super) output_links: Vec<usize>,
+ /// `ExecutionPlan` for this stage
+ pub(super) plan: Arc<dyn ExecutionPlan>,
+ /// Status of each already scheduled task. If status is None, the
partition has not yet been scheduled
+ pub(super) task_statuses: Vec<Option<task_status::Status>>,
+ /// Combined metrics of the already finished tasks in the stage, If it is
None, no task is finished yet.
+ pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+ /// Error message
+ pub(super) error_message: String,
+}
+
+impl UnresolvedStage {
+ pub(super) fn new(
+ stage_id: usize,
+ plan: Arc<dyn ExecutionPlan>,
+ output_partitioning: Option<Partitioning>,
+ output_links: Vec<usize>,
+ child_stages: Vec<usize>,
+ ) -> Self {
+ let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+ for input_stage_id in child_stages {
+ inputs.insert(input_stage_id, StageOutput::new());
+ }
+
+ Self {
+ stage_id,
+ output_partitioning,
+ output_links,
+ inputs,
+ plan,
+ }
+ }
+
+ /// Add input partitions published from an input stage.
+ pub(super) fn add_input_partitions(
+ &mut self,
+ stage_id: usize,
+ locations: Vec<PartitionLocation>,
+ ) -> Result<()> {
+ if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
+ for partition in locations {
+ stage_inputs.add_partition(partition);
+ }
+ } else {
+ return Err(BallistaError::Internal(format!("Error adding input
partitions to stage {}, {} is not a valid child stage ID", self.stage_id,
stage_id)));
+ }
+
+ Ok(())
+ }
+
+ /// Marks the input stage ID as complete.
+ pub(super) fn complete_input(&mut self, stage_id: usize) {
+ if let Some(input) = self.inputs.get_mut(&stage_id) {
+ input.complete = true;
+ }
+ }
+
+ /// Returns true if all inputs are complete and we can resolve all
+ /// UnresolvedShuffleExec operators to ShuffleReadExec
+ pub(super) fn resolvable(&self) -> bool {
+ self.inputs.iter().all(|(_, input)| input.is_complete())
+ }
+
+ /// Change to the resolved state
+ pub(super) fn to_resolved(&self) -> Result<ResolvedStage> {
+ let input_locations = self
+ .inputs
+ .iter()
+ .map(|(stage, input)| (*stage, input.partition_locations.clone()))
+ .collect();
+ let plan = crate::planner::remove_unresolved_shuffles(
+ self.plan.clone(),
+ &input_locations,
+ )?;
+ Ok(ResolvedStage::new(
+ self.stage_id,
+ plan,
+ self.output_partitioning.clone(),
+ self.output_links.clone(),
+ ))
+ }
+
+ pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ stage: protobuf::UnResolvedStage,
+ codec: &BallistaCodec<T, U>,
+ session_ctx: &SessionContext,
+ ) -> Result<UnresolvedStage> {
+ let plan_proto = U::try_decode(&stage.plan)?;
+ let plan = plan_proto.try_into_physical_plan(
+ session_ctx,
+ session_ctx.runtime_env().as_ref(),
+ codec.physical_extension_codec(),
+ )?;
+
+ let output_partitioning: Option<Partitioning> =
parse_protobuf_hash_partitioning(
+ stage.output_partitioning.as_ref(),
+ session_ctx,
+ plan.schema().as_ref(),
+ )?;
+
+ let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+ for input in stage.inputs {
+ let stage_id = input.stage_id as usize;
+
+ let outputs = input
+ .partition_locations
+ .into_iter()
+ .map(|loc| {
+ let partition = loc.partition as usize;
+ let locations = loc
+ .partition_location
+ .into_iter()
+ .map(|l| l.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ Ok((partition, locations))
+ })
+ .collect::<Result<HashMap<usize, Vec<PartitionLocation>>>>()?;
+
+ inputs.insert(
+ stage_id,
+ StageOutput {
+ partition_locations: outputs,
+ complete: input.complete,
+ },
+ );
+ }
+
+ Ok(UnresolvedStage {
+ stage_id: stage.stage_id as usize,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
+ plan,
+ inputs,
+ })
+ }
+
+ pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ stage: UnresolvedStage,
+ codec: &BallistaCodec<T, U>,
+ ) -> Result<protobuf::UnResolvedStage> {
+ let mut plan: Vec<u8> = vec![];
+ U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+ .and_then(|proto| proto.try_encode(&mut plan))?;
+
+ let mut inputs: Vec<protobuf::GraphStageInput> = vec![];
+ for (stage_id, output) in stage.inputs.into_iter() {
+ inputs.push(protobuf::GraphStageInput {
+ stage_id: stage_id as u32,
+ partition_locations: output
+ .partition_locations
+ .into_iter()
+ .map(|(partition, locations)| {
+ Ok(protobuf::TaskInputPartitions {
+ partition: partition as u32,
+ partition_location: locations
+ .into_iter()
+ .map(|l| l.try_into())
+ .collect::<Result<Vec<_>>>()?,
+ })
+ })
+ .collect::<Result<Vec<_>>>()?,
+ complete: output.complete,
+ });
+ }
+
+ let output_partitioning =
+ hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+ Ok(protobuf::UnResolvedStage {
+ stage_id: stage.stage_id as u64,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
+ inputs,
+ plan,
+ })
+ }
+}
+
+impl Debug for UnresolvedStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+ write!(
+ f,
+ "=========UnResolvedStage[id={},
children={}]=========\nInputs{:?}\n{}",
+ self.stage_id,
+ self.inputs.len(),
+ self.inputs,
+ plan
+ )
+ }
+}
+
+impl ResolvedStage {
+ pub(super) fn new(
+ stage_id: usize,
+ plan: Arc<dyn ExecutionPlan>,
+ output_partitioning: Option<Partitioning>,
+ output_links: Vec<usize>,
+ ) -> Self {
+ let partitions = plan.output_partitioning().partition_count();
+
+ Self {
+ stage_id,
+ partitions,
+ output_partitioning,
+ output_links,
+ plan,
+ }
+ }
+
+ /// Change to the running state
+ pub(super) fn to_running(&self) -> RunningStage {
+ RunningStage::new(
+ self.stage_id,
+ self.plan.clone(),
+ self.partitions,
+ self.output_partitioning.clone(),
+ self.output_links.clone(),
+ )
+ }
+
+ pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ stage: protobuf::ResolvedStage,
+ codec: &BallistaCodec<T, U>,
+ session_ctx: &SessionContext,
+ ) -> Result<ResolvedStage> {
+ let plan_proto = U::try_decode(&stage.plan)?;
+ let plan = plan_proto.try_into_physical_plan(
+ session_ctx,
+ session_ctx.runtime_env().as_ref(),
+ codec.physical_extension_codec(),
+ )?;
+
+ let output_partitioning: Option<Partitioning> =
parse_protobuf_hash_partitioning(
+ stage.output_partitioning.as_ref(),
+ session_ctx,
+ plan.schema().as_ref(),
+ )?;
+
+ Ok(ResolvedStage {
+ stage_id: stage.stage_id as usize,
+ partitions: stage.partitions as usize,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
+ plan,
+ })
+ }
+
+ pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ stage: ResolvedStage,
+ codec: &BallistaCodec<T, U>,
+ ) -> Result<protobuf::ResolvedStage> {
+ let mut plan: Vec<u8> = vec![];
+ U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+ .and_then(|proto| proto.try_encode(&mut plan))?;
+
+ let output_partitioning =
+ hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+ Ok(protobuf::ResolvedStage {
+ stage_id: stage.stage_id as u64,
+ partitions: stage.partitions as u32,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
+ plan,
+ })
+ }
+}
+
+impl Debug for ResolvedStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+ write!(
+ f,
+ "=========ResolvedStage[id={}, partitions={}]=========\n{}",
+ self.stage_id, self.partitions, plan
+ )
+ }
+}
+
+impl RunningStage {
+ pub(super) fn new(
+ stage_id: usize,
+ plan: Arc<dyn ExecutionPlan>,
+ partitions: usize,
+ output_partitioning: Option<Partitioning>,
+ output_links: Vec<usize>,
+ ) -> Self {
+ Self {
+ stage_id,
+ partitions,
+ output_partitioning,
+ output_links,
+ plan,
+ task_statuses: vec![None; partitions],
+ stage_metrics: None,
+ }
+ }
+
+ pub(super) fn to_completed(&self) -> CompletedStage {
+ let task_statuses = self
+ .task_statuses
+ .iter()
+ .enumerate()
+ .map(|(task_id, status)| {
+ status.clone().unwrap_or_else(|| {
+ panic!(
+ "The status of task {}/{} should not be none",
+ self.stage_id, task_id
+ )
+ })
+ })
+ .collect();
+ let stage_metrics = self.stage_metrics.clone().unwrap_or_else(|| {
+ warn!("The metrics for stage {} should not be none",
self.stage_id);
+ vec![]
+ });
+ CompletedStage {
+ stage_id: self.stage_id,
+ partitions: self.partitions,
+ output_partitioning: self.output_partitioning.clone(),
+ output_links: self.output_links.clone(),
+ plan: self.plan.clone(),
+ task_statuses,
+ stage_metrics,
+ }
+ }
+
+ pub(super) fn to_failed(&self, error_message: String) -> FailedStage {
+ FailedStage {
+ stage_id: self.stage_id,
+ partitions: self.partitions,
+ output_partitioning: self.output_partitioning.clone(),
+ output_links: self.output_links.clone(),
+ plan: self.plan.clone(),
+ task_statuses: self.task_statuses.clone(),
+ stage_metrics: self.stage_metrics.clone(),
+ error_message,
+ }
+ }
+
+ pub(super) fn to_resolved(&self) -> ResolvedStage {
+ ResolvedStage::new(
+ self.stage_id,
+ self.plan.clone(),
+ self.output_partitioning.clone(),
+ self.output_links.clone(),
+ )
+ }
+
+ /// Returns `true` if all tasks for this stage are complete
+ pub(super) fn is_completed(&self) -> bool {
+ self.task_statuses
+ .iter()
+ .all(|status| matches!(status,
Some(task_status::Status::Completed(_))))
+ }
+
+ /// Returns the number of completed tasks
+ pub(super) fn completed_tasks(&self) -> usize {
+ self.task_statuses
+ .iter()
+ .filter(|status| matches!(status,
Some(task_status::Status::Completed(_))))
+ .count()
+ }
+
+ /// Returns the number of scheduled tasks
+ pub(super) fn scheduled_tasks(&self) -> usize {
+ self.task_statuses.iter().filter(|s| s.is_some()).count()
+ }
+
+ /// Returns a vector of currently running tasks in this stage
+ pub(super) fn running_tasks(&self) -> Vec<(usize, usize, String)> {
+ self.task_statuses
+ .iter()
+ .enumerate()
+ .filter_map(|(partition, status)| match status {
+ Some(task_status::Status::Running(RunningTask { executor_id
})) => {
+ Some((self.stage_id, partition, executor_id.clone()))
+ }
+ _ => None,
+ })
+ .collect()
+ }
+
+ /// Returns the number of tasks in this stage which are available for
scheduling.
+ /// If the stage is not yet resolved, then this will return `0`, otherwise
it will
+ /// return the number of tasks where the task status is not yet set.
+ pub(super) fn available_tasks(&self) -> usize {
+ self.task_statuses.iter().filter(|s| s.is_none()).count()
+ }
+
+ /// Update the status for task partition
+ pub(super) fn update_task_status(
+ &mut self,
+ partition_id: usize,
+ status: task_status::Status,
+ ) {
+ debug!("Updating task status for partition {}", partition_id);
+ self.task_statuses[partition_id] = Some(status);
+ }
+
+ /// update and combine the task metrics to the stage metrics
+ pub(super) fn update_task_metrics(
+ &mut self,
+ partition: usize,
+ metrics: Vec<OperatorMetricsSet>,
+ ) -> Result<()> {
+ if let Some(combined_metrics) = &mut self.stage_metrics {
+ if metrics.len() != combined_metrics.len() {
+ return Err(BallistaError::Internal(format!("Error updating
task metrics to stage {}, task metrics array size {} does not equal \
+ with the stage metrics array size {} for task {}",
self.stage_id, metrics.len(), combined_metrics.len(), partition)));
+ }
+ let metrics_values_array = metrics
+ .into_iter()
+ .map(|ms| {
+ ms.metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let new_metrics_set = combined_metrics
+ .iter_mut()
+ .zip(metrics_values_array)
+ .map(|(first, second)| {
+ Self::combine_metrics_set(first, second, partition)
+ })
+ .collect();
+ self.stage_metrics = Some(new_metrics_set)
+ } else {
+ let new_metrics_set = metrics
+ .into_iter()
+ .map(|ms| ms.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ if !new_metrics_set.is_empty() {
+ self.stage_metrics = Some(new_metrics_set)
+ }
+ }
+ Ok(())
+ }
+
+ pub(super) fn combine_metrics_set(
+ first: &mut MetricsSet,
+ second: Vec<MetricValue>,
+ partition: usize,
+ ) -> MetricsSet {
+ for metric_value in second {
+ // TODO recheck the label logic
+ let new_metric = Arc::new(Metric::new(metric_value,
Some(partition)));
+ first.push(new_metric);
+ }
+ first.aggregate_by_partition()
+ }
+}
+
+impl Debug for RunningStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+ write!(
+ f,
+ "=========RunningStage[id={}, partitions={}, completed_tasks={},
scheduled_tasks={}, available_tasks={}]=========\n{}",
+ self.stage_id,
+ self.partitions,
+ self.completed_tasks(),
+ self.scheduled_tasks(),
+ self.available_tasks(),
+ plan
+ )
+ }
+}
+
+impl CompletedStage {
+ pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ stage: protobuf::CompletedStage,
+ codec: &BallistaCodec<T, U>,
+ session_ctx: &SessionContext,
+ ) -> Result<CompletedStage> {
+ let plan_proto = U::try_decode(&stage.plan)?;
+ let plan = plan_proto.try_into_physical_plan(
+ session_ctx,
+ session_ctx.runtime_env().as_ref(),
+ codec.physical_extension_codec(),
+ )?;
+
+ let output_partitioning: Option<Partitioning> =
parse_protobuf_hash_partitioning(
+ stage.output_partitioning.as_ref(),
+ session_ctx,
+ plan.schema().as_ref(),
+ )?;
+
+ let task_statuses = stage
+ .task_statuses
+ .into_iter()
+ .enumerate()
+ .map(|(task_id, status)| {
+ status.status.unwrap_or_else(|| {
+ panic!("Status for task {} should not be none", task_id)
+ })
+ })
+ .collect();
+
+ let stage_metrics = stage
+ .stage_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(CompletedStage {
+ stage_id: stage.stage_id as usize,
+ partitions: stage.partitions as usize,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
+ plan,
+ task_statuses,
+ stage_metrics,
+ })
+ }
+
+ pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ job_id: String,
+ stage: CompletedStage,
+ codec: &BallistaCodec<T, U>,
+ ) -> Result<protobuf::CompletedStage> {
+ let stage_id = stage.stage_id;
+
+ let mut plan: Vec<u8> = vec![];
+ U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+ .and_then(|proto| proto.try_encode(&mut plan))?;
+
+ let output_partitioning =
+ hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+ let task_statuses: Vec<protobuf::TaskStatus> = stage
+ .task_statuses
+ .into_iter()
+ .enumerate()
+ .map(|(partition, status)| {
+ protobuf::TaskStatus {
+ task_id: Some(protobuf::PartitionId {
+ job_id: job_id.clone(),
+ stage_id: stage_id as u32,
+ partition_id: partition as u32,
+ }),
+ // task metrics should not persist.
+ metrics: vec![],
+ status: Some(status),
+ }
+ })
+ .collect();
+
+ let stage_metrics = stage
+ .stage_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(protobuf::CompletedStage {
+ stage_id: stage_id as u64,
+ partitions: stage.partitions as u32,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
+ plan,
+ task_statuses,
+ stage_metrics,
+ })
+ }
+}
+
+impl Debug for CompletedStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableBallistaExecutionPlan::new(
+ self.plan.as_ref(),
+ &self.stage_metrics,
+ )
+ .indent();
+
+ write!(
+ f,
+ "=========CompletedStage[id={}, partitions={}]=========\n{}",
+ self.stage_id, self.partitions, plan
+ )
+ }
+}
+
+impl FailedStage {
+ /// Returns the number of completed tasks
+ pub(super) fn completed_tasks(&self) -> usize {
+ self.task_statuses
+ .iter()
+ .filter(|status| matches!(status,
Some(task_status::Status::Completed(_))))
+ .count()
+ }
+
+ /// Returns the number of scheduled tasks
+ pub(super) fn scheduled_tasks(&self) -> usize {
+ self.task_statuses.iter().filter(|s| s.is_some()).count()
+ }
+
+ /// Returns the number of tasks in this stage which are available for
scheduling.
+ /// If the stage is not yet resolved, then this will return `0`, otherwise
it will
+ /// return the number of tasks where the task status is not yet set.
+ pub(super) fn available_tasks(&self) -> usize {
+ self.task_statuses.iter().filter(|s| s.is_none()).count()
+ }
+
+ pub(super) fn decode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ stage: protobuf::FailedStage,
+ codec: &BallistaCodec<T, U>,
+ session_ctx: &SessionContext,
+ ) -> Result<FailedStage> {
+ let plan_proto = U::try_decode(&stage.plan)?;
+ let plan = plan_proto.try_into_physical_plan(
+ session_ctx,
+ session_ctx.runtime_env().as_ref(),
+ codec.physical_extension_codec(),
+ )?;
+
+ let output_partitioning: Option<Partitioning> =
parse_protobuf_hash_partitioning(
+ stage.output_partitioning.as_ref(),
+ session_ctx,
+ plan.schema().as_ref(),
+ )?;
+
+ let mut task_statuses: Vec<Option<task_status::Status>> =
+ vec![None; stage.partitions as usize];
+ for status in stage.task_statuses {
+ if let Some(task_id) = status.task_id.as_ref() {
+ task_statuses[task_id.partition_id as usize] = status.status
+ }
+ }
+
+ let stage_metrics = if stage.stage_metrics.is_empty() {
+ None
+ } else {
+ let ms = stage
+ .stage_metrics
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+ Some(ms)
+ };
+
+ Ok(FailedStage {
+ stage_id: stage.stage_id as usize,
+ partitions: stage.partitions as usize,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
usize).collect(),
+ plan,
+ task_statuses,
+ stage_metrics,
+ error_message: stage.error_message,
+ })
+ }
+
+ pub(super) fn encode<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
+ job_id: String,
+ stage: FailedStage,
+ codec: &BallistaCodec<T, U>,
+ ) -> Result<protobuf::FailedStage> {
+ let stage_id = stage.stage_id;
+
+ let mut plan: Vec<u8> = vec![];
+ U::try_from_physical_plan(stage.plan, codec.physical_extension_codec())
+ .and_then(|proto| proto.try_encode(&mut plan))?;
+
+ let output_partitioning =
+ hash_partitioning_to_proto(stage.output_partitioning.as_ref())?;
+
+ let task_statuses: Vec<protobuf::TaskStatus> = stage
+ .task_statuses
+ .into_iter()
+ .enumerate()
+ .filter_map(|(partition, status)| {
+ status.map(|status| protobuf::TaskStatus {
+ task_id: Some(protobuf::PartitionId {
+ job_id: job_id.clone(),
+ stage_id: stage_id as u32,
+ partition_id: partition as u32,
+ }),
+ // task metrics should not persist.
+ metrics: vec![],
+ status: Some(status),
+ })
+ })
+ .collect();
+
+ let stage_metrics = stage
+ .stage_metrics
+ .unwrap_or_default()
+ .into_iter()
+ .map(|m| m.try_into())
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(protobuf::FailedStage {
+ stage_id: stage_id as u64,
+ partitions: stage.partitions as u32,
+ output_partitioning,
+ output_links: stage.output_links.into_iter().map(|l| l as
u32).collect(),
+ plan,
+ task_statuses,
+ stage_metrics,
+ error_message: stage.error_message,
+ })
+ }
+}
+
+impl Debug for FailedStage {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+
+ write!(
+ f,
+ "=========FailedStage[id={}, partitions={}, completed_tasks={},
scheduled_tasks={}, available_tasks={}, error_message={}]=========\n{}",
+ self.stage_id,
+ self.partitions,
+ self.completed_tasks(),
+ self.scheduled_tasks(),
+ self.available_tasks(),
+ self.error_message,
+ plan
+ )
+ }
+}
+
+/// This data structure collects the partition locations for an
`ExecutionStage`.
+/// Each `ExecutionStage` will hold a `StageOutput`s for each of its child
stages.
+/// When all tasks for the child stage are complete, it will mark the
`StageOutput` as complete.
+#[derive(Clone, Debug, Default)]
+pub(super) struct StageOutput {
+ /// Map from partition -> partition locations
+ partition_locations: HashMap<usize, Vec<PartitionLocation>>,
+ /// Flag indicating whether all tasks are complete
+ complete: bool,
+}
+
+impl StageOutput {
+ pub(super) fn new() -> Self {
+ Self {
+ partition_locations: HashMap::new(),
+ complete: false,
+ }
+ }
+
+ /// Add a `PartitionLocation` to the `StageOutput`
+ pub(super) fn add_partition(&mut self, partition_location:
PartitionLocation) {
+ if let Some(parts) = self
+ .partition_locations
+ .get_mut(&partition_location.partition_id.partition_id)
+ {
+ parts.push(partition_location)
+ } else {
+ self.partition_locations.insert(
+ partition_location.partition_id.partition_id,
+ vec![partition_location],
+ );
+ }
+ }
+
+ pub(super) fn is_complete(&self) -> bool {
+ self.complete
+ }
+}
diff --git a/ballista/rust/scheduler/src/state/mod.rs
b/ballista/rust/scheduler/src/state/mod.rs
index 12546a8e..5668c6b1 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -82,15 +82,29 @@ pub(super) struct SchedulerState<T: 'static +
AsLogicalPlan, U: 'static + AsExec
pub executor_manager: ExecutorManager,
pub task_manager: TaskManager<T, U>,
pub session_manager: SessionManager,
- _codec: BallistaCodec<T, U>,
+ pub codec: BallistaCodec<T, U>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerState<T, U> {
+ #[cfg(test)]
+ pub fn new_with_default_scheduler_name(
+ config_client: Arc<dyn StateBackendClient>,
+ session_builder: SessionBuilder,
+ codec: BallistaCodec<T, U>,
+ ) -> Self {
+ SchedulerState::new(
+ config_client,
+ session_builder,
+ codec,
+ "localhost:50050".to_owned(),
+ )
+ }
+
pub fn new(
config_client: Arc<dyn StateBackendClient>,
- _namespace: String,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
+ scheduler_name: String,
) -> Self {
Self {
executor_manager: ExecutorManager::new(config_client.clone()),
@@ -98,9 +112,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
config_client.clone(),
session_builder,
codec.clone(),
+ scheduler_name,
),
session_manager: SessionManager::new(config_client,
session_builder),
- _codec: codec,
+ codec,
}
}
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs
b/ballista/rust/scheduler/src/state/task_manager.rs
index 58c49a3b..480f9369 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -40,13 +40,14 @@ use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, error, info, warn};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
use std::default::Default;
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::transport::Channel;
type ExecutorClients = Arc<RwLock<HashMap<String,
ExecutorGrpcClient<Channel>>>>;
+type ExecutionGraphCache = Arc<RwLock<HashMap<String,
Arc<RwLock<ExecutionGraph>>>>>;
#[derive(Clone)]
pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> {
@@ -55,6 +56,9 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>
clients: ExecutorClients,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
+ scheduler_id: String,
+ // Cache for active execution graphs curated by this scheduler
+ active_job_cache: ExecutionGraphCache,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T,
U> {
@@ -62,38 +66,53 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
state: Arc<dyn StateBackendClient>,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
+ scheduler_id: String,
) -> Self {
Self {
state,
clients: Default::default(),
session_builder,
codec,
+ scheduler_id,
+ active_job_cache: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Generate an ExecutionGraph for the job and save it to the persistent
state.
+ /// By default, this job will be curated by the scheduler which receives
it.
+ /// It will also be saved to the active execution graph cache
pub async fn submit_job(
&self,
job_id: &str,
session_id: &str,
plan: Arc<dyn ExecutionPlan>,
) -> Result<()> {
- let graph = ExecutionGraph::new(job_id, session_id, plan)?;
+ let mut graph =
+ ExecutionGraph::new(&self.scheduler_id, job_id, session_id, plan)?;
info!("Submitting execution graph: {:?}", graph);
self.state
.put(
Keyspace::ActiveJobs,
job_id.to_owned(),
- self.encode_execution_graph(graph)?,
+ self.encode_execution_graph(graph.clone())?,
)
.await?;
+ graph.revive();
+
+ let mut active_graph_cache = self.active_job_cache.write().await;
+ active_graph_cache.insert(job_id.to_owned(),
Arc::new(RwLock::new(graph)));
+
Ok(())
}
- /// Get the status of of a job. First look in Active/Completed jobs, and
then in Failed jobs
+ /// Get the status of a job. First look in the active cache.
+ /// If it is not found there, then look in the Active/Completed jobs, and then in Failed
jobs
pub async fn get_job_status(&self, job_id: &str) ->
Result<Option<JobStatus>> {
- if let Ok(graph) = self.get_execution_graph(job_id).await {
+ if let Some(graph) = self.get_active_execution_graph(job_id).await {
+ let status = graph.read().await.status();
+ Ok(Some(status))
+ } else if let Ok(graph) = self.get_execution_graph(job_id).await {
Ok(Some(graph.status()))
} else {
let value = self.state.get(Keyspace::FailedJobs, job_id).await?;
@@ -107,123 +126,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
}
- /// Generate a new random Job ID
- pub fn generate_job_id(&self) -> String {
- let mut rng = thread_rng();
- std::iter::repeat(())
- .map(|()| rng.sample(Alphanumeric))
- .map(char::from)
- .take(7)
- .collect()
- }
-
- /// Atomically update given task statuses in the respective job and return
a tuple containing:
+ /// Update given task statuses in the respective job and return a tuple
containing:
/// 1. A list of QueryStageSchedulerEvent to publish.
/// 2. A list of reservations that can now be offered.
- ///
- /// When a task is updated, there may or may not be more tasks pending for
its job. If there are more
- /// tasks pending then we want to reschedule one of those tasks on the
same task slot. In that case
- /// we will set the `job_id` on the `ExecutorReservation` so the scheduler
attempts to assign tasks from
- /// the same job. Note that when the scheduler attempts to fill the
reservation, there is no guarantee
- /// that the available task is still available.
pub(crate) async fn update_task_statuses(
&self,
executor: &ExecutorMetadata,
task_status: Vec<TaskStatus>,
) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
- let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
-
- with_lock(lock, async {
- let mut events: Vec<QueryStageSchedulerEvent> = vec![];
- let mut reservation: Vec<ExecutorReservation> = vec![];
+ let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
+ for status in task_status {
+ debug!("Task Update\n{:?}", status);
+ if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id)
{
+ let job_task_statuses =
+ job_updates.entry(job_id.clone()).or_insert_with(Vec::new);
+ job_task_statuses.push(status);
+ } else {
+ warn!("Received task with no job ID");
+ }
+ }
- let mut job_updates: HashMap<String, Vec<TaskStatus>> =
HashMap::new();
+ let mut events: Vec<QueryStageSchedulerEvent> = vec![];
+ let mut total_num_tasks = 0;
+ for (job_id, statuses) in job_updates {
+ let num_tasks = statuses.len();
+ debug!("Updating {} tasks in job {}", num_tasks, job_id);
- for status in task_status {
- debug!("Task Update\n{:?}", status);
- if let Some(job_id) = status.task_id.as_ref().map(|id|
&id.job_id) {
- if let Some(statuses) = job_updates.get_mut(job_id) {
- statuses.push(status)
- } else {
- job_updates.insert(job_id.clone(), vec![status]);
- }
- } else {
- warn!("Received task with no job ID");
- }
- }
+ total_num_tasks += num_tasks;
- let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
- for (job_id, statuses) in job_updates {
- let num_tasks = statuses.len();
- debug!("Updating {} tasks in job {}", num_tasks, job_id);
-
- let mut graph = self.get_execution_graph(&job_id).await?;
-
- graph.update_task_status(executor, statuses)?;
-
- if graph.complete() {
- // If this ExecutionGraph is complete, finalize it
- info!(
- "Job {} is complete, finalizing output partitions",
- graph.job_id()
- );
- graph.finalize()?;
-
events.push(QueryStageSchedulerEvent::JobFinished(job_id.clone()));
-
- for _ in 0..num_tasks {
- reservation
-
.push(ExecutorReservation::new_free(executor.id.to_owned()));
- }
- } else if let Some(job_status::Status::Failed(failure)) =
- graph.status().status
- {
- events.push(QueryStageSchedulerEvent::JobFailed(
- job_id.clone(),
- failure.error,
- ));
-
- for _ in 0..num_tasks {
- reservation
-
.push(ExecutorReservation::new_free(executor.id.to_owned()));
- }
- } else {
- // Otherwise keep the task slots reserved for this job
- for _ in 0..num_tasks {
- reservation.push(ExecutorReservation::new_assigned(
- executor.id.to_owned(),
- job_id.clone(),
- ));
- }
- }
+ let graph = self.get_active_execution_graph(&job_id).await;
+ let job_event = if let Some(graph) = graph {
+ let mut graph = graph.write().await;
+ graph.update_task_status(executor, statuses)?
+ } else {
+ // TODO Deal with curator changed case
+ error!("Fail to find job {} in the active cache and it may not
be curated by this scheduler", job_id);
+ None
+ };
- txn_ops.push((
- Keyspace::ActiveJobs,
- job_id.clone(),
- self.encode_execution_graph(graph)?,
- ));
+ if let Some(event) = job_event {
+ events.push(event);
}
+ }
- self.state.put_txn(txn_ops).await?;
-
- Ok((events, reservation))
- })
- .await
+ let reservation = (0..total_num_tasks)
+ .into_iter()
+ .map(|_| ExecutorReservation::new_free(executor.id.to_owned()))
+ .collect();
+ Ok((events, reservation))
}
/// Take a list of executor reservations and fill them with tasks that are
ready
- /// to be scheduled. When the reservation is filled, the underlying stage
task in the
- /// `ExecutionGraph` will be set to a status of Running, so if the task is
not subsequently launched
- /// we must ensure that the task status is reset.
+ /// to be scheduled.
///
/// Here we use the following algorithm:
///
- /// 1. For each reservation with a `job_id` assigned try and assign
another task from the same job.
- /// 2. If a reservation either does not have a `job_id` or there are no
available tasks for its `job_id`,
- /// add it to a list of "free" reservations.
- /// 3. For each free reservation, try to assign a task from one of the
jobs we have already considered.
- /// 4. If we cannot find a task, then looks for a task among all active
jobs
- /// 5. If we cannot find a task in all active jobs, then add the
reservation to the list of unassigned reservations
+ /// 1. For each free reservation, try to assign a task from one of the
active jobs
+ /// 2. If we cannot find a task in all active jobs, then add the
reservation to the list of unassigned reservations
///
/// Finally, we return:
/// 1. A list of assignments which is a (Executor ID, Task) tuple
@@ -241,96 +200,55 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
})
.collect();
- let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
- with_lock(lock, async {
- let mut jobs: Vec<String> =
- self.get_active_jobs().await?.into_iter().collect();
-
- let mut assignments: Vec<(String, Task)> = vec![];
- let mut unassigned: Vec<ExecutorReservation> = vec![];
- // Need to collect graphs we update so we can update them in
storage when we are done
- let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
- // Now try and find tasks for free reservations from current set
of graphs
- for reservation in free_reservations {
- debug!(
- "Filling free reservation for executor {}",
- reservation.executor_id
- );
- let mut assigned = false;
- let executor_id = reservation.executor_id.clone();
-
- // Try and find a task in the graphs we already have locks on
- if let Ok(Some(assignment)) = find_next_task(&executor_id,
&mut graphs) {
- debug!(
- "Filled free reservation for executor {} with task
{:?}",
- reservation.executor_id, assignment.1
- );
- // First check if we can find another task
- assignments.push(assignment);
- assigned = true;
+ let mut assignments: Vec<(String, Task)> = vec![];
+ let mut pending_tasks = 0usize;
+ let mut assign_tasks = 0usize;
+ let job_cache = self.active_job_cache.read().await;
+ for (_job_id, graph) in job_cache.iter() {
+ let mut graph = graph.write().await;
+ for reservation in free_reservations.iter().skip(assign_tasks) {
+ if let Some(task) =
graph.pop_next_task(&reservation.executor_id)? {
+ assignments.push((reservation.executor_id.clone(), task));
+ assign_tasks += 1;
} else {
- // Otherwise start searching through other active jobs.
- debug!(
- "Filling free reservation for executor {} from active
jobs {:?}",
- reservation.executor_id, jobs
- );
- while let Some(job_id) = jobs.pop() {
- if graphs.get(&job_id).is_none() {
- let mut graph =
self.get_execution_graph(&job_id).await?;
-
- if let Ok(Some(task)) =
graph.pop_next_task(&executor_id) {
- debug!(
- "Filled free reservation for executor {}
with task {:?}",
- reservation.executor_id, task
- );
- assignments.push((executor_id.clone(), task));
- graphs.insert(job_id, graph);
- assigned = true;
- break;
- } else {
- debug!("No available tasks for job {}",
job_id);
- }
- }
- }
- }
-
- if !assigned {
- debug!(
- "Unable to fill reservation for executor {}, no tasks
available",
- executor_id
- );
- unassigned.push(reservation);
+ break;
}
}
+ if assign_tasks >= free_reservations.len() {
+ pending_tasks = graph.available_tasks();
+ break;
+ }
+ }
- let mut pending_tasks = 0;
-
- // Transactional update graphs now that we have assigned tasks
- let txn_ops: Vec<(Keyspace, String, Vec<u8>)> = graphs
- .into_iter()
- .map(|(job_id, graph)| {
- pending_tasks += graph.available_tasks();
- let value = self.encode_execution_graph(graph)?;
- Ok((Keyspace::ActiveJobs, job_id, value))
- })
- .collect::<Result<Vec<_>>>()?;
-
- self.state.put_txn(txn_ops).await?;
-
- Ok((assignments, unassigned, pending_tasks))
- }).await
+ let mut unassigned = vec![];
+ for reservation in free_reservations.iter().skip(assign_tasks) {
+ unassigned.push(reservation.clone());
+ }
+ Ok((assignments, unassigned, pending_tasks))
}
- /// Move the given job to the CompletedJobs keyspace in persistent storage.
+ /// Mark a job as completed. This will create a key under the
CompletedJobs keyspace
+ /// and remove the job from ActiveJobs
pub async fn complete_job(&self, job_id: &str) -> Result<()> {
debug!("Moving job {} from Active to Completed", job_id);
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
- with_lock(
- lock,
- self.state
- .mv(Keyspace::ActiveJobs, Keyspace::CompletedJobs, job_id),
- )
- .await
+ with_lock(lock, self.state.delete(Keyspace::ActiveJobs,
job_id)).await?;
+
+ if let Some(graph) = self.get_active_execution_graph(job_id).await {
+ let graph = graph.read().await.clone();
+ if graph.complete() {
+ let value = self.encode_execution_graph(graph)?;
+ self.state
+ .put(Keyspace::CompletedJobs, job_id.to_owned(), value)
+ .await?;
+ } else {
+ error!("Job {} has not finished and cannot be completed",
job_id);
+ }
+ } else {
+ warn!("Fail to find job {} in the cache", job_id);
+ }
+
+ Ok(())
}
pub(crate) async fn cancel_job(
@@ -395,6 +313,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
/// and remove the job from ActiveJobs or QueuedJobs
/// TODO this should be atomic
pub async fn fail_job(&self, job_id: &str, error_message: String) ->
Result<()> {
+ debug!("Moving job {} from Active or Queue to Failed", job_id);
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
self.fail_job_inner(lock, job_id, error_message).await
}
@@ -407,16 +326,66 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
) -> Result<()> {
with_lock(lock, self.state.delete(Keyspace::ActiveJobs,
job_id)).await?;
- let status = JobStatus {
- status: Some(job_status::Status::Failed(FailedJob {
- error: error_message,
- })),
+ let value = if let Some(graph) =
self.get_active_execution_graph(job_id).await {
+ let mut graph = graph.write().await;
+ graph.fail_job(error_message);
+ let graph = graph.clone();
+
+ self.encode_execution_graph(graph)?
+ } else {
+ warn!("Fail to find job {} in the cache", job_id);
+
+ let status = JobStatus {
+ status: Some(job_status::Status::Failed(FailedJob {
+ error: error_message.clone(),
+ })),
+ };
+ encode_protobuf(&status)?
};
- let value = encode_protobuf(&status)?;
self.state
.put(Keyspace::FailedJobs, job_id.to_owned(), value)
- .await
+ .await?;
+
+ Ok(())
+ }
+
+ /// Mark a job as failed. This will create a key under the FailedJobs
keyspace
+ /// and remove the job from ActiveJobs
+ /// TODO this should be atomic
+ pub async fn fail_running_job(&self, job_id: &str) -> Result<()> {
+ if let Some(graph) = self.get_active_execution_graph(job_id).await {
+ let graph = graph.read().await.clone();
+ let value = self.encode_execution_graph(graph)?;
+
+ debug!("Moving job {} from Active to Failed", job_id);
+ let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
+ with_lock(lock, self.state.delete(Keyspace::ActiveJobs,
job_id)).await?;
+ self.state
+ .put(Keyspace::FailedJobs, job_id.to_owned(), value)
+ .await?;
+ } else {
+ warn!("Fail to find job {} in the cache", job_id);
+ }
+
+ Ok(())
+ }
+
+ pub async fn update_job(&self, job_id: &str) -> Result<()> {
+ debug!("Update job {} in Active", job_id);
+ if let Some(graph) = self.get_active_execution_graph(job_id).await {
+ let mut graph = graph.write().await;
+ graph.revive();
+ let graph = graph.clone();
+ let value = self.encode_execution_graph(graph)?;
+ self.state
+ .put(Keyspace::ActiveJobs, job_id.to_owned(), value)
+ .await?;
+ } else {
+ warn!("Fail to find job {} in the cache", job_id);
+ }
+
+ Ok(())
}
#[cfg(not(test))]
@@ -476,9 +445,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
/// Retrieve the number of available tasks for the given job. The value
returned
/// is strictly a point-in-time snapshot
pub async fn get_available_task_count(&self, job_id: &str) ->
Result<usize> {
- let graph = self.get_execution_graph(job_id).await?;
-
- Ok(graph.available_tasks())
+ if let Some(graph) = self.get_active_execution_graph(job_id).await {
+ let available_tasks = graph.read().await.available_tasks();
+ Ok(available_tasks)
+ } else {
+ warn!("Fail to find job {} in the cache", job_id);
+ Ok(0)
+ }
}
#[allow(dead_code)]
@@ -506,12 +479,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
Ok(task_definition)
}
- /// Return a set of active job IDs. This will return all keys
- /// in the `ActiveJobs` keyspace stripped of any prefixes used for
- /// the storage layer (i.e. just the Job IDs).
- async fn get_active_jobs(&self) -> Result<HashSet<String>> {
- debug!("Scanning for active job IDs");
- self.state.scan_keys(Keyspace::ActiveJobs).await
+ /// Get the `ExecutionGraph` for the given job ID from cache
+ pub(crate) async fn get_active_execution_graph(
+ &self,
+ job_id: &str,
+ ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+ let active_graph_cache = self.active_job_cache.read().await;
+ active_graph_cache.get(job_id).cloned()
}
 /// Get the `ExecutionGraph` for the given job ID. This will search first
in the `ActiveJobs`
@@ -559,17 +533,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
encode_protobuf(&proto)
}
-}
-/// Find the next available task in a set of `ExecutionGraph`s
-fn find_next_task(
- executor_id: &str,
- graphs: &mut HashMap<String, ExecutionGraph>,
-) -> Result<Option<(String, Task)>> {
- for graph in graphs.values_mut() {
- if let Ok(Some(task)) = graph.pop_next_task(executor_id) {
- return Ok(Some((executor_id.to_owned(), task)));
- }
+ /// Generate a new random Job ID
+ pub fn generate_job_id(&self) -> String {
+ let mut rng = thread_rng();
+ std::iter::repeat(())
+ .map(|()| rng.sample(Alphanumeric))
+ .map(char::from)
+ .take(7)
+ .collect()
}
- Ok(None)
}