This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 0c22e52a Add Prometheus metrics endpoint (#511)
0c22e52a is described below
commit 0c22e52aecae525af662687ceafff549bfa6a12d
Author: Dan Harris <[email protected]>
AuthorDate: Wed Nov 16 12:30:16 2022 -0500
Add Prometheus metrics endpoint (#511)
* Add prometheus metrics for scheduler
* Add SchedulerMetricsCollector, implementation for Prometheus and some
refactoring for testability
* Linting
* apache header
* Linting again
* Make sure collector is only initialized once
* Do not rely on accept header for API routing
* Add tracking of pending tasks in scheduler
* Empty commit
* Make the metrics API pluggable through the collector trait
* Add rudimentary user guide for prometheus metrics
* Prettier
* Fix bug in pending task count
* Remove println
* Do not try and keep track in the event loop
---
ballista/core/proto/ballista.proto | 1 +
ballista/scheduler/Cargo.toml | 5 +-
ballista/scheduler/src/api/handlers.rs | 26 +-
ballista/scheduler/src/api/mod.rs | 9 +-
ballista/scheduler/src/lib.rs | 1 +
ballista/scheduler/src/main.rs | 8 +-
ballista/scheduler/src/metrics/mod.rs | 85 ++++
ballista/scheduler/src/metrics/prometheus.rs | 176 +++++++
ballista/scheduler/src/scheduler_server/event.rs | 27 +-
.../src/scheduler_server/external_scaler.rs | 2 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 11 +-
ballista/scheduler/src/scheduler_server/mod.rs | 489 +++++++------------
.../src/scheduler_server/query_stage_scheduler.rs | 248 +++++++++-
ballista/scheduler/src/standalone.rs | 4 +
ballista/scheduler/src/state/execution_graph.rs | 48 +-
.../scheduler/src/state/execution_graph_dot.rs | 2 +-
ballista/scheduler/src/state/mod.rs | 150 +++---
ballista/scheduler/src/state/task_manager.rs | 197 ++++----
ballista/scheduler/src/test_utils.rs | 519 ++++++++++++++++++++-
docs/source/user-guide/metrics.md | 41 ++
docs/source/user-guide/scheduler.md | 1 +
21 files changed, 1527 insertions(+), 523 deletions(-)
diff --git a/ballista/core/proto/ballista.proto
b/ballista/core/proto/ballista.proto
index a38bf0ad..5113a6a7 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -440,6 +440,7 @@ message ExecutionGraph {
string job_name = 10;
uint64 start_time = 11;
uint64 end_time = 12;
+ uint64 queued_at = 13;
}
message StageAttempts {
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index e95c9ee0..7a12fa16 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -30,9 +30,10 @@ edition = "2018"
scheduler = "scheduler_config_spec.toml"
[features]
-default = ["etcd", "sled"]
+default = ["etcd", "sled", "prometheus-metrics"]
etcd = ["etcd-client"]
flight-sql = []
+prometheus-metrics = ["prometheus", "once_cell"]
sled = ["sled_package", "tokio-stream"]
@@ -58,8 +59,10 @@ hyper = "0.14.4"
itertools = "0.10.3"
log = "0.4"
object_store = "0.5.0"
+once_cell = { version = "1.16.0", optional = true }
parking_lot = "0.12"
parse_arg = "0.1.3"
+prometheus = { version = "0.13", features = ["process"], optional = true }
prost = "0.11"
prost-types = { version = "0.11.0" }
rand = "0.8"
diff --git a/ballista/scheduler/src/api/handlers.rs
b/ballista/scheduler/src/api/handlers.rs
index 71cb9ea9..7e77aa34 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SchedulerServer;
use crate::state::execution_graph::ExecutionStage;
use crate::state::execution_graph_dot::ExecutionGraphDot;
@@ -21,6 +22,8 @@ use datafusion_proto::logical_plan::AsLogicalPlan;
use graphviz_rust::cmd::{CommandArg, Format};
use graphviz_rust::exec;
use graphviz_rust::printer::PrinterContext;
+use http::header::CONTENT_TYPE;
+
use std::time::Duration;
use warp::Rejection;
@@ -176,13 +179,15 @@ pub(crate) async fn cancel_job<T: AsLogicalPlan, U:
AsExecutionPlan>(
.map_err(|_| warp::reject())?
.ok_or_else(warp::reject)?;
- let cancelled = data_server
- .state
- .cancel_job(&job_id)
+ data_server
+ .query_stage_event_loop
+ .get_sender()
+ .map_err(|_| warp::reject())?
+ .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
.await
.map_err(|_| warp::reject())?;
- Ok(warp::reply::json(&CancelJobResponse { cancelled }))
+ Ok(warp::reply::json(&CancelJobResponse { cancelled: true }))
}
#[derive(Debug, serde::Serialize)]
@@ -347,3 +352,16 @@ pub(crate) async fn get_job_svg_graph<T: AsLogicalPlan, U:
AsExecutionPlan>(
_ => Ok("Not Found".to_string()),
}
}
+
+pub(crate) async fn get_scheduler_metrics<T: AsLogicalPlan, U:
AsExecutionPlan>(
+ data_server: SchedulerServer<T, U>,
+) -> Result<impl warp::Reply, Rejection> {
+ Ok(data_server
+ .metrics_collector()
+ .gather_metrics()
+ .map_err(|_| warp::reject())?
+ .map(|(data, content_type)| {
+ warp::reply::with_header(data, CONTENT_TYPE, content_type)
+ })
+ .unwrap_or_else(|| warp::reply::with_header(vec![], CONTENT_TYPE,
"text/html")))
+}
diff --git a/ballista/scheduler/src/api/mod.rs
b/ballista/scheduler/src/api/mod.rs
index 7710e1a2..c8defd67 100644
--- a/ballista/scheduler/src/api/mod.rs
+++ b/ballista/scheduler/src/api/mod.rs
@@ -118,9 +118,13 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static +
AsExecutionPlan>(
});
let route_job_dot_svg = warp::path!("api" / "job" / String / "dot_svg")
- .and(with_data_server(scheduler_server))
+ .and(with_data_server(scheduler_server.clone()))
.and_then(|job_id, data_server|
handlers::get_job_svg_graph(data_server, job_id));
+ let route_scheduler_metrics = warp::path!("api" / "metrics")
+ .and(with_data_server(scheduler_server))
+ .and_then(|data_server| handlers::get_scheduler_metrics(data_server));
+
let routes = route_scheduler_state
.or(route_executors)
.or(route_jobs)
@@ -128,6 +132,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static +
AsExecutionPlan>(
.or(route_query_stages)
.or(route_job_dot)
.or(route_query_stage_dot)
- .or(route_job_dot_svg);
+ .or(route_job_dot_svg)
+ .or(route_scheduler_metrics);
routes.boxed()
}
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index 98fac309..483f3287 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -20,6 +20,7 @@
pub mod api;
pub mod config;
pub mod display;
+pub mod metrics;
pub mod planner;
pub mod scheduler_server;
#[cfg(feature = "sled")]
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 47a19e42..1141a519 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -66,6 +66,7 @@ use ballista_core::config::LogRotationPolicy;
use ballista_scheduler::config::SchedulerConfig;
#[cfg(feature = "flight-sql")]
use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
+use ballista_scheduler::metrics::default_metrics_collector;
use config::prelude::*;
use tracing_subscriber::EnvFilter;
@@ -85,12 +86,15 @@ async fn start_server(
config.scheduling_policy
);
+ let metrics_collector = default_metrics_collector()?;
+
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
config,
+ metrics_collector,
);
scheduler_server.init().await?;
@@ -123,14 +127,14 @@ async fn start_server(
parts.extensions.insert(connect_info.clone());
let req = http::Request::from_parts(parts, body);
- let header = req.headers().get(hyper::header::ACCEPT);
- if header.is_some() &&
header.unwrap().eq("application/json") {
+ if req.uri().path().starts_with("/api") {
return Either::Left(
warp.call(req)
.map_ok(|res| res.map(EitherBody::Left))
.map_err(Error::from),
);
}
+
Either::Right(
tonic
.call(req)
diff --git a/ballista/scheduler/src/metrics/mod.rs
b/ballista/scheduler/src/metrics/mod.rs
new file mode 100644
index 00000000..0b54ebbd
--- /dev/null
+++ b/ballista/scheduler/src/metrics/mod.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(feature = "prometheus")]
+pub mod prometheus;
+
+use crate::metrics::prometheus::PrometheusMetricsCollector;
+use ballista_core::error::Result;
+use std::sync::Arc;
+
+/// Interface for recording metrics events in the scheduler. An instance of
`Arc<dyn SchedulerMetricsCollector>`
+/// will be passed when constructing the `QueryStageScheduler` which is the
core event loop of the scheduler.
+/// The event loop will then record metric events through this trait.
+pub trait SchedulerMetricsCollector: Send + Sync {
+ /// Record that job with `job_id` was submitted. This will be invoked
+ /// after the job's `ExecutionGraph` is created and it is ready to be
scheduled
+ /// on executors.
+ /// When invoked should specify the timestamp in milliseconds when the job
was originally
+ /// queued and the timestamp in milliseconds when it was submitted
+ fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at:
u64);
+
+ /// Record that job with `job_id` has completed successfully. This should
only
+ /// be invoked on successful job completion.
+ /// When invoked should specify the timestamp in milliseconds when the job
was originally
+ /// queued and the timestamp in milliseconds when it was completed
+ fn record_completed(&self, job_id: &str, queued_at: u64, completed_at:
u64);
+
+ /// Record that job with `job_id` has failed.
+ /// When invoked should specify the timestamp in milliseconds when the job
was originally
+ /// queued and the timestamp in milliseconds when it failed.
+ fn record_failed(&self, job_id: &str, queued_at: u64, failed_at: u64);
+
+ /// Record that job with `job_id` was cancelled.
+ fn record_cancelled(&self, job_id: &str);
+
+ /// Set the current number of pending tasks in scheduler. A pending task
is a task that is available
+ /// to schedule on an executor but cannot be scheduled because no
resources are available.
+ fn set_pending_tasks_queue_size(&self, value: u64);
+
+ /// Gather current metric set that should be returned when calling the
scheduler's metrics API
+ /// Should return a tuple containing the content of the metric set and the
content type (e.g. `application/json`, `text/plain`, etc)
+ fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>>;
+}
+
+/// Implementation of `SchedulerMetricsCollector` that ignores all events.
This can be used as
+/// a default implementation when tracking scheduler metrics is not required
(or performed through other means)
+#[derive(Default)]
+pub struct NoopMetricsCollector {}
+
+impl SchedulerMetricsCollector for NoopMetricsCollector {
+ fn record_submitted(&self, _job_id: &str, _queued_at: u64, _submitted_at:
u64) {}
+ fn record_completed(&self, _job_id: &str, _queued_at: u64, _completed_att:
u64) {}
+ fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {}
+ fn record_cancelled(&self, _job_id: &str) {}
+ fn set_pending_tasks_queue_size(&self, _value: u64) {}
+
+ fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>> {
+ Ok(None)
+ }
+}
+
+/// Return a reference to the system's default metrics collector.
+#[cfg(feature = "prometheus")]
+pub fn default_metrics_collector() -> Result<Arc<dyn
SchedulerMetricsCollector>> {
+ PrometheusMetricsCollector::current()
+}
+
+#[cfg(not(feature = "prometheus"))]
+pub fn default_metrics_collector() -> Result<Arc<dyn
SchedulerMetricsCollector>> {
+ Ok(Arc::new(NoopMetricsCollector::default()))
+}
diff --git a/ballista/scheduler/src/metrics/prometheus.rs
b/ballista/scheduler/src/metrics/prometheus.rs
new file mode 100644
index 00000000..81efb07d
--- /dev/null
+++ b/ballista/scheduler/src/metrics/prometheus.rs
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::SchedulerMetricsCollector;
+use ballista_core::error::{BallistaError, Result};
+
+use once_cell::sync::OnceCell;
+use prometheus::{
+ register_counter_with_registry, register_gauge_with_registry,
+ register_histogram_with_registry, Counter, Gauge, Histogram, Registry,
+};
+use prometheus::{Encoder, TextEncoder};
+use std::sync::Arc;
+
+static COLLECTOR: OnceCell<Arc<dyn SchedulerMetricsCollector>> =
OnceCell::new();
+
+/// SchedulerMetricsCollector implementation based on Prometheus. By default
this will track
+/// 7 metrics:
+/// *job_exec_time_seconds* - Histogram of successful job execution time in
seconds
+/// *planning_time_ms* - Histogram of job planning time in milliseconds
+/// *job_failed_total* - Counter of failed jobs
+/// *job_cancelled_total* - Counter of cancelled jobs
+/// *job_completed_total* - Counter of completed jobs
+/// *job_submitted_total* - Counter of submitted jobs
+/// *pending_task_queue_size* - Number of pending tasks
+pub struct PrometheusMetricsCollector {
+ execution_time: Histogram,
+ planning_time: Histogram,
+ failed: Counter,
+ cancelled: Counter,
+ completed: Counter,
+ submitted: Counter,
+ pending_queue_size: Gauge,
+}
+
+impl PrometheusMetricsCollector {
+ pub fn new(registry: &Registry) -> Result<Self> {
+ let execution_time = register_histogram_with_registry!(
+ "job_exec_time_seconds",
+ "Histogram of successful job execution time in seconds",
+ vec![0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64],
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ let planning_time = register_histogram_with_registry!(
+ "planning_time_ms",
+ "Histogram of job planning time in milliseconds",
+ vec![1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 500.0_f64],
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ let failed = register_counter_with_registry!(
+ "job_failed_total",
+ "Counter of failed jobs",
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ let cancelled = register_counter_with_registry!(
+ "job_cancelled_total",
+ "Counter of cancelled jobs",
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ let completed = register_counter_with_registry!(
+ "job_completed_total",
+ "Counter of completed jobs",
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ let submitted = register_counter_with_registry!(
+ "job_submitted_total",
+ "Counter of submitted jobs",
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ let pending_queue_size = register_gauge_with_registry!(
+ "pending_task_queue_size",
+ "Number of pending tasks",
+ registry
+ )
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error registering metric: {:?}",
e))
+ })?;
+
+ Ok(Self {
+ execution_time,
+ planning_time,
+ failed,
+ cancelled,
+ completed,
+ submitted,
+ pending_queue_size,
+ })
+ }
+
+ pub fn current() -> Result<Arc<dyn SchedulerMetricsCollector>> {
+ COLLECTOR
+ .get_or_try_init(|| {
+ let collector = Self::new(::prometheus::default_registry())?;
+
+ Ok(Arc::new(collector) as Arc<dyn SchedulerMetricsCollector>)
+ })
+ .map(|arc| arc.clone())
+ }
+}
+
+impl SchedulerMetricsCollector for PrometheusMetricsCollector {
+ fn record_submitted(&self, _job_id: &str, queued_at: u64, submitted_at:
u64) {
+ self.submitted.inc();
+ self.planning_time
+ .observe((submitted_at - queued_at) as f64);
+ }
+
+ fn record_completed(&self, _job_id: &str, queued_at: u64, completed_at:
u64) {
+ self.completed.inc();
+ self.execution_time
+ .observe((completed_at - queued_at) as f64 / 1000_f64)
+ }
+
+ fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {
+ self.failed.inc()
+ }
+
+ fn record_cancelled(&self, _job_id: &str) {
+ self.cancelled.inc();
+ }
+
+ fn set_pending_tasks_queue_size(&self, value: u64) {
+ self.pending_queue_size.set(value as f64);
+ }
+
+ fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>> {
+ let encoder = TextEncoder::new();
+
+ let metric_families = prometheus::gather();
+ let mut buffer = vec![];
+ encoder.encode(&metric_families, &mut buffer).map_err(|e| {
+ BallistaError::Internal(format!("Error encoding prometheus
metrics: {:?}", e))
+ })?;
+
+ Ok(Some((buffer, encoder.format_type().to_owned())))
+ }
+}
diff --git a/ballista/scheduler/src/scheduler_server/event.rs
b/ballista/scheduler/src/scheduler_server/event.rs
index cb1dd8e9..74643535 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -31,13 +31,32 @@ pub enum QueryStageSchedulerEvent {
job_name: String,
session_ctx: Arc<SessionContext>,
plan: Box<LogicalPlan>,
+ queued_at: u64,
+ },
+ JobSubmitted {
+ job_id: String,
+ queued_at: u64,
+ submitted_at: u64,
},
- JobSubmitted(String),
// For a job which failed during planning
- JobPlanningFailed(String, String),
- JobFinished(String),
+ JobPlanningFailed {
+ job_id: String,
+ fail_message: String,
+ queued_at: u64,
+ failed_at: u64,
+ },
+ JobFinished {
+ job_id: String,
+ queued_at: u64,
+ completed_at: u64,
+ },
// For a job fails with its execution graph setting failed
- JobRunningFailed(String, String),
+ JobRunningFailed {
+ job_id: String,
+ fail_message: String,
+ queued_at: u64,
+ failed_at: u64,
+ },
JobUpdated(String),
JobCancel(String),
JobDataClean(String),
diff --git a/ballista/scheduler/src/scheduler_server/external_scaler.rs
b/ballista/scheduler/src/scheduler_server/external_scaler.rs
index d40c7b9b..60237dde 100644
--- a/ballista/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/scheduler/src/scheduler_server/external_scaler.rs
@@ -57,7 +57,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExternalScaler
Ok(Response::new(GetMetricsResponse {
metric_values: vec![MetricValue {
metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
- metric_value: 10000000, // A very high number to saturate the
HPA
+ metric_value: self.pending_tasks() as i64,
}],
}))
}
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 42805672..062e45a7 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -47,7 +47,7 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
-use crate::scheduler_server::SchedulerServer;
+use crate::scheduler_server::{timestamp_secs, SchedulerServer};
use crate::state::executor_manager::ExecutorReservation;
#[tonic::async_trait]
@@ -99,10 +99,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
};
let executor_heartbeat = ExecutorHeartbeat {
executor_id: metadata.id.clone(),
- timestamp: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards")
- .as_secs(),
+ timestamp: timestamp_secs(),
metrics: vec![],
status: Some(ExecutorStatus {
status:
Some(executor_status::Status::Active("".to_string())),
@@ -578,6 +575,7 @@ mod test {
use tonic::Request;
use crate::config::SchedulerConfig;
+ use crate::metrics::default_metrics_collector;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, executor_status,
ExecutorRegistration,
@@ -602,6 +600,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
SchedulerConfig::default(),
+ default_metrics_collector().unwrap(),
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
@@ -688,6 +687,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
SchedulerConfig::default(),
+ default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -768,6 +768,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
SchedulerConfig::default(),
+ default_metrics_collector().unwrap(),
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index f81135ae..04410974 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -30,6 +30,7 @@ use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::logical_plan::AsLogicalPlan;
use crate::config::SchedulerConfig;
+use crate::metrics::SchedulerMetricsCollector;
use log::{error, warn};
use crate::scheduler_server::event::QueryStageSchedulerEvent;
@@ -38,6 +39,8 @@ use crate::state::backend::StateBackendClient;
use crate::state::executor_manager::{
ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
};
+
+use crate::state::task_manager::TaskLauncher;
use crate::state::SchedulerState;
// include the generated protobuf source as a submodule
@@ -59,6 +62,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
pub start_time: u128,
pub(crate) state: Arc<SchedulerState<T, U>>,
pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
+ query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerServer<T, U> {
@@ -67,6 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
config_backend: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
config: SchedulerConfig,
+ metrics_collector: Arc<dyn SchedulerMetricsCollector>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config_backend,
@@ -75,21 +80,54 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
scheduler_name.clone(),
config.clone(),
));
- let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone()));
+ let query_stage_scheduler =
+ Arc::new(QueryStageScheduler::new(state.clone(),
metrics_collector));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
config.event_loop_buffer_size as usize,
+ query_stage_scheduler.clone(),
+ );
+
+ Self {
+ scheduler_name,
+ start_time: timestamp_millis() as u128,
+ state,
+ query_stage_event_loop,
query_stage_scheduler,
+ }
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn with_task_launcher(
+ scheduler_name: String,
+ config_backend: Arc<dyn StateBackendClient>,
+ codec: BallistaCodec<T, U>,
+ config: SchedulerConfig,
+ metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+ task_launcher: Arc<dyn TaskLauncher>,
+ ) -> Self {
+ let state = Arc::new(SchedulerState::with_task_launcher(
+ config_backend,
+ default_session_builder,
+ codec,
+ scheduler_name.clone(),
+ config.clone(),
+ task_launcher,
+ ));
+ let query_stage_scheduler =
+ Arc::new(QueryStageScheduler::new(state.clone(),
metrics_collector));
+ let query_stage_event_loop = EventLoop::new(
+ "query_stage".to_owned(),
+ config.event_loop_buffer_size as usize,
+ query_stage_scheduler.clone(),
);
Self {
scheduler_name,
- start_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
+ start_time: timestamp_millis() as u128,
state,
query_stage_event_loop,
+ query_stage_scheduler,
}
}
@@ -101,6 +139,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
Ok(())
}
+ pub(crate) fn pending_tasks(&self) -> usize {
+ self.query_stage_scheduler.pending_tasks()
+ }
+
+ pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
+ self.query_stage_scheduler.metrics_collector()
+ }
+
pub(crate) async fn submit_job(
&self,
job_id: &str,
@@ -115,6 +161,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
job_name: job_name.to_owned(),
session_ctx: ctx,
plan: Box::new(plan.clone()),
+ queued_at: timestamp_millis(),
})
.await
}
@@ -241,10 +288,23 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
}
}
+pub fn timestamp_secs() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards")
+ .as_secs()
+}
+
+pub fn timestamp_millis() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards")
+ .as_millis() as u64
+}
+
#[cfg(all(test, feature = "sled"))]
mod test {
use std::sync::Arc;
- use std::time::Duration;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
@@ -258,20 +318,25 @@ mod test {
use ballista_core::error::Result;
use crate::config::SchedulerConfig;
+
use ballista_core::serde::protobuf::{
failed_task, job_status, task_status, ExecutionError, FailedTask,
JobStatus,
- PhysicalPlanNode, ShuffleWritePartition, SuccessfulTask, TaskStatus,
+ MultiTaskDefinition, PhysicalPlanNode, ShuffleWritePartition,
SuccessfulJob,
+ SuccessfulTask, TaskId, TaskStatus,
};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
use ballista_core::serde::BallistaCodec;
- use crate::scheduler_server::SchedulerServer;
+ use crate::scheduler_server::{timestamp_millis, SchedulerServer};
use crate::state::backend::standalone::StandaloneClient;
- use crate::state::executor_manager::ExecutorReservation;
- use crate::test_utils::{await_condition, ExplodingTableProvider};
+ use crate::test_utils::{
+ assert_completed_event, assert_failed_event, assert_no_submitted_event,
+ assert_submitted_event, ExplodingTableProvider, SchedulerTest,
TaskRunnerFn,
+ TestMetricsCollector,
+ };
#[tokio::test]
async fn test_pull_scheduling() -> Result<()> {
@@ -302,7 +367,7 @@ mod test {
// Submit job
scheduler
.state
- .submit_job(job_id, "", ctx, &plan)
+ .submit_job(job_id, "", ctx, &plan, 0)
.await
.expect("submitting plan");
@@ -380,150 +445,37 @@ mod test {
Ok(())
}
- /// This test will exercise the push-based scheduling.
#[tokio::test]
async fn test_push_scheduling() -> Result<()> {
let plan = test_plan();
- let task_slots = 4;
- let scheduler = test_push_staged_scheduler().await?;
-
- let executors = test_executors(task_slots);
- for (executor_metadata, executor_data) in executors {
- scheduler
- .state
- .executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
- }
-
- let config = test_session(task_slots);
-
- let ctx = scheduler
- .state
- .session_manager
- .create_session(&config)
- .await?;
-
- let job_id = "job";
-
- scheduler.state.submit_job(job_id, "", ctx, &plan).await?;
-
- // Complete tasks that are offered through scheduler events
- loop {
- // Check condition
- let available_tasks = {
- let graph = scheduler
- .state
- .task_manager
- .get_job_execution_graph(job_id)
- .await?
- .unwrap();
- if graph.is_successful() {
- break;
- }
- graph.available_tasks()
- };
-
- if available_tasks == 0 {
- tokio::time::sleep(Duration::from_millis(5)).await;
- continue;
+ let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+ let mut test = SchedulerTest::new(
+ SchedulerConfig::default()
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+ metrics_collector.clone(),
+ 4,
+ 1,
+ None,
+ )
+ .await?;
+
+ let status = test.run("job", "", &plan).await.expect("running plan");
+
+ match status.status {
+ Some(job_status::Status::Successful(SuccessfulJob {
+ partition_location,
+ })) => {
+ assert_eq!(partition_location.len(), 4);
}
-
- let reservations: Vec<ExecutorReservation> = scheduler
- .state
- .executor_manager
- .reserve_slots(available_tasks as u32)
- .await?
- .into_iter()
- .map(|res| res.assign(job_id.to_owned()))
- .collect();
-
- let free_list = match scheduler
- .state
- .task_manager
- .fill_reservations(&reservations)
- .await
- {
- Ok((assignments, mut unassigned_reservations, _)) => {
- for (executor_id, task) in assignments.into_iter() {
- match scheduler
- .state
- .executor_manager
- .get_executor_metadata(&executor_id)
- .await
- {
- Ok(executor) => {
- let mut partitions: Vec<ShuffleWritePartition>
= vec![];
-
- let num_partitions = task
- .output_partitioning
- .map(|p| p.partition_count())
- .unwrap_or(1);
-
- for partition_id in 0..num_partitions {
- partitions.push(ShuffleWritePartition {
- partition_id: partition_id as u64,
- path: "some/path".to_string(),
- num_batches: 1,
- num_rows: 1,
- num_bytes: 1,
- })
- }
-
- // Complete the task
- let task_status = TaskStatus {
- task_id: task.task_id as u32,
- job_id: task.partition.job_id.clone(),
- stage_id: task.partition.stage_id as u32,
- stage_attempt_num: task.stage_attempt_num
as u32,
- partition_id: task.partition.partition_id
as u32,
- launch_time: 0,
- start_exec_time: 0,
- end_exec_time: 0,
- metrics: vec![],
- status:
Some(task_status::Status::Successful(
- SuccessfulTask {
- executor_id: executor.id.clone(),
- partitions,
- },
- )),
- };
-
- scheduler
- .update_task_status(&executor.id,
vec![task_status])
- .await?;
- }
- Err(_e) => {
- unassigned_reservations.push(
-
ExecutorReservation::new_free(executor_id.clone()),
- );
- }
- }
- }
- unassigned_reservations
- }
- Err(_e) => reservations,
- };
-
- // If any reserved slots remain, return them to the pool
- if !free_list.is_empty() {
- scheduler
- .state
- .executor_manager
- .cancel_reservations(free_list)
- .await?;
+ other => {
+ panic!("Expected success status but found {:?}", other);
}
}
- let final_graph = scheduler
- .state
- .task_manager
- .get_execution_graph(job_id)
- .await?;
-
- assert!(final_graph.is_successful());
- assert_eq!(final_graph.output_locations().len(), 4);
+ assert_submitted_event("job", &metrics_collector);
+ assert_completed_event("job", &metrics_collector);
Ok(())
}
@@ -532,139 +484,73 @@ mod test {
#[tokio::test]
async fn test_job_failure() -> Result<()> {
let plan = test_plan();
- let task_slots = 4;
-
- let scheduler = test_push_staged_scheduler().await?;
-
- let executors = test_executors(task_slots);
- for (executor_metadata, executor_data) in executors {
- scheduler
- .state
- .executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
- }
-
- let config = test_session(task_slots);
-
- let ctx = scheduler
- .state
- .session_manager
- .create_session(&config)
- .await?;
- let job_id = "job";
-
- scheduler.state.submit_job(job_id, "", ctx, &plan).await?;
+ let runner = Arc::new(TaskRunnerFn::new(
+ |_executor_id: String, task: MultiTaskDefinition| {
+ let mut statuses = vec![];
+
+ for TaskId {
+ task_id,
+ partition_id,
+ ..
+ } in task.task_ids
+ {
+ let timestamp = timestamp_millis();
+ statuses.push(TaskStatus {
+ task_id,
+ job_id: task.job_id.clone(),
+ stage_id: task.stage_id,
+ stage_attempt_num: task.stage_attempt_num,
+ partition_id,
+ launch_time: timestamp,
+ start_exec_time: timestamp,
+ end_exec_time: timestamp,
+ metrics: vec![],
+ status: Some(task_status::Status::Failed(FailedTask {
+ error: "ERROR".to_string(),
+ retryable: false,
+ count_to_failures: false,
+ failed_reason: Some(
+ failed_task::FailedReason::ExecutionError(
+ ExecutionError {},
+ ),
+ ),
+ })),
+ });
+ }
- let available_tasks = scheduler
- .state
- .task_manager
- .get_available_task_count(job_id)
- .await?;
+ statuses
+ },
+ ));
- let reservations: Vec<ExecutorReservation> = scheduler
- .state
- .executor_manager
- .reserve_slots(available_tasks as u32)
- .await?
- .into_iter()
- .map(|res| res.assign(job_id.to_owned()))
- .collect();
-
- // Complete tasks that are offered through scheduler events
- let free_list = match scheduler
- .state
- .task_manager
- .fill_reservations(&reservations)
- .await
- {
- Ok((assignments, mut unassigned_reservations, _)) => {
- for (executor_id, task) in assignments.into_iter() {
- match scheduler
- .state
- .executor_manager
- .get_executor_metadata(&executor_id)
- .await
- {
- Ok(executor) => {
- let mut partitions: Vec<ShuffleWritePartition> =
vec![];
-
- let num_partitions = task
- .output_partitioning
- .map(|p| p.partition_count())
- .unwrap_or(1);
-
- for partition_id in 0..num_partitions {
- partitions.push(ShuffleWritePartition {
- partition_id: partition_id as u64,
- path: "some/path".to_string(),
- num_batches: 1,
- num_rows: 1,
- num_bytes: 1,
- })
- }
-
- // Complete the task
- let task_status = TaskStatus {
- task_id: task.task_id as u32,
- job_id: task.partition.job_id.clone(),
- stage_id: task.partition.stage_id as u32,
- stage_attempt_num: task.stage_attempt_num as
u32,
- partition_id: task.partition.partition_id as
u32,
- launch_time: 0,
- start_exec_time: 0,
- end_exec_time: 0,
- metrics: vec![],
- status:
Some(task_status::Status::Failed(FailedTask {
- error: "".to_string(),
- retryable: false,
- count_to_failures: false,
- failed_reason: Some(
-
failed_task::FailedReason::ExecutionError(
- ExecutionError {},
- ),
- ),
- })),
- };
-
- scheduler
- .state
- .update_task_statuses(&executor.id,
vec![task_status])
- .await?;
- }
- Err(_e) => {
- unassigned_reservations
-
.push(ExecutorReservation::new_free(executor_id.clone()));
- }
- }
- }
- unassigned_reservations
- }
- Err(_e) => reservations,
- };
+ let metrics_collector = Arc::new(TestMetricsCollector::default());
- // If any reserved slots remain, return them to the pool
- if !free_list.is_empty() {
- scheduler
- .state
- .executor_manager
- .cancel_reservations(free_list)
- .await?;
- }
+ let mut test = SchedulerTest::new(
+ SchedulerConfig::default()
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+ metrics_collector.clone(),
+ 4,
+ 1,
+ Some(runner),
+ )
+ .await?;
- let status =
scheduler.state.task_manager.get_job_status(job_id).await?;
+ let status = test.run("job", "", &plan).await.expect("running plan");
assert!(
matches!(
status,
- Some(JobStatus {
+ JobStatus {
status: Some(job_status::Status::Failed(_))
- })
+ }
),
- "Expected job status to be failed"
+ "Expected job status to be failed but it was {:?}",
+ status
);
+ assert_submitted_event("job", &metrics_collector);
+ assert_failed_event("job", &metrics_collector);
+
Ok(())
}
@@ -672,44 +558,39 @@ mod test {
// Here we simulate a planning failure using ExplodingTableProvider to
test this.
#[tokio::test]
async fn test_planning_failure() -> Result<()> {
- let task_slots = 4;
-
- let scheduler = test_push_staged_scheduler().await?;
-
- let config = test_session(task_slots);
-
- let ctx = scheduler
- .state
- .session_manager
- .create_session(&config)
- .await?;
+ let metrics_collector = Arc::new(TestMetricsCollector::default());
+ let mut test = SchedulerTest::new(
+ SchedulerConfig::default()
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+ metrics_collector.clone(),
+ 4,
+ 1,
+ None,
+ )
+ .await?;
+
+ let ctx = test.ctx().await?;
ctx.register_table("explode", Arc::new(ExplodingTableProvider))?;
let plan = ctx.sql("SELECT * FROM explode").await?.to_logical_plan()?;
- let job_id = "job";
-
// This should fail when we try and create the physical plan
- scheduler.submit_job(job_id, "", ctx, &plan).await?;
-
- let scheduler = scheduler.clone();
+ let status = test.run("job", "", &plan).await?;
- let check = || async {
- let status =
scheduler.state.task_manager.get_job_status(job_id).await?;
-
- Ok(matches!(
+ assert!(
+ matches!(
status,
- Some(JobStatus {
+ JobStatus {
status: Some(job_status::Status::Failed(_))
- })
- ))
- };
-
- // Sine this happens in an event loop, we need to check a few times.
- let job_failed = await_condition(Duration::from_millis(100), 10,
check).await?;
+ }
+ ),
+ "Expected job status to be failed but it was {:?}",
+ status
+ );
- assert!(job_failed, "Job status not failed after 1 second");
+ assert_no_submitted_event("job", &metrics_collector);
+ assert_failed_event("job", &metrics_collector);
Ok(())
}
@@ -724,23 +605,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
- );
- scheduler.init().await?;
-
- Ok(scheduler)
- }
-
- async fn test_push_staged_scheduler(
- ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let config = SchedulerConfig::default()
- .with_scheduler_policy(TaskSchedulingPolicy::PushStaged);
- let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
- SchedulerServer::new(
- "localhost:50050".to_owned(),
- state_storage,
- BallistaCodec::default(),
- config,
+ Arc::new(TestMetricsCollector::default()),
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index f49cae46..29f3d2d7 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -23,6 +24,8 @@ use log::{debug, error, info};
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventSender};
+use crate::metrics::SchedulerMetricsCollector;
+use crate::scheduler_server::timestamp_millis;
use ballista_core::serde::AsExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use tokio::sync::mpsc;
@@ -37,11 +40,34 @@ pub(crate) struct QueryStageScheduler<
U: 'static + AsExecutionPlan,
> {
state: Arc<SchedulerState<T, U>>,
+ metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+ pending_tasks: AtomicUsize,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
QueryStageScheduler<T, U> {
- pub(crate) fn new(state: Arc<SchedulerState<T, U>>) -> Self {
- Self { state }
+ pub(crate) fn new(
+ state: Arc<SchedulerState<T, U>>,
+ metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+ ) -> Self {
+ Self {
+ state,
+ metrics_collector,
+ pending_tasks: AtomicUsize::default(),
+ }
+ }
+
+ pub(crate) fn set_pending_tasks(&self, tasks: usize) {
+ self.pending_tasks.store(tasks, Ordering::SeqCst);
+ self.metrics_collector
+ .set_pending_tasks_queue_size(tasks as u64);
+ }
+
+ pub(crate) fn pending_tasks(&self) -> usize {
+ self.pending_tasks.load(Ordering::SeqCst)
+ }
+
+ pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
+ self.metrics_collector.as_ref()
}
}
@@ -70,19 +96,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
job_name,
session_ctx,
plan,
+ queued_at,
} => {
info!("Job {} queued with name {:?}", job_id, job_name);
+
let state = self.state.clone();
tokio::spawn(async move {
let event = if let Err(e) = state
- .submit_job(&job_id, &job_name, session_ctx, &plan)
+ .submit_job(&job_id, &job_name, session_ctx, &plan,
queued_at)
.await
{
- let msg = format!("Error planning job {}: {:?}",
job_id, e);
- error!("{}", &msg);
- QueryStageSchedulerEvent::JobPlanningFailed(job_id,
msg)
+ let fail_message =
+ format!("Error planning job {}: {:?}", job_id, e);
+ error!("{}", &fail_message);
+ QueryStageSchedulerEvent::JobPlanningFailed {
+ job_id,
+ fail_message,
+ queued_at,
+ failed_at: timestamp_millis(),
+ }
} else {
- QueryStageSchedulerEvent::JobSubmitted(job_id)
+ QueryStageSchedulerEvent::JobSubmitted {
+ job_id,
+ queued_at,
+ submitted_at: timestamp_millis(),
+ }
};
tx_event
.post_event(event)
@@ -91,7 +129,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
.unwrap();
});
}
- QueryStageSchedulerEvent::JobSubmitted(job_id) => {
+ QueryStageSchedulerEvent::JobSubmitted {
+ job_id,
+ queued_at,
+ submitted_at,
+ } => {
+ self.metrics_collector
+ .record_submitted(&job_id, queued_at, submitted_at);
+
info!("Job {} submitted", job_id);
if self.state.config.is_push_staged_scheduling() {
let available_tasks = self
@@ -122,28 +167,52 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
.await?;
}
}
- QueryStageSchedulerEvent::JobPlanningFailed(job_id,
failure_reason) => {
- error!("Job {} failed: {}", job_id, failure_reason);
+ QueryStageSchedulerEvent::JobPlanningFailed {
+ job_id,
+ fail_message,
+ queued_at,
+ failed_at,
+ } => {
+ self.metrics_collector
+ .record_failed(&job_id, queued_at, failed_at);
+
+ error!("Job {} failed: {}", job_id, fail_message);
self.state
.task_manager
- .fail_unscheduled_job(&job_id, failure_reason)
+ .fail_unscheduled_job(&job_id, fail_message)
.await?;
}
- QueryStageSchedulerEvent::JobFinished(job_id) => {
+ QueryStageSchedulerEvent::JobFinished {
+ job_id,
+ queued_at,
+ completed_at,
+ } => {
+ self.metrics_collector
+ .record_completed(&job_id, queued_at, completed_at);
+
info!("Job {} success", job_id);
self.state.task_manager.succeed_job(&job_id).await?;
self.state.clean_up_successful_job(job_id);
}
- QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason)
=> {
+ QueryStageSchedulerEvent::JobRunningFailed {
+ job_id,
+ fail_message,
+ queued_at,
+ failed_at,
+ } => {
+ self.metrics_collector
+ .record_failed(&job_id, queued_at, failed_at);
+
error!("Job {} running failed", job_id);
- let tasks = self
+ let (running_tasks, _pending_tasks) = self
.state
.task_manager
- .abort_job(&job_id, failure_reason)
+ .abort_job(&job_id, fail_message)
.await?;
- if !tasks.is_empty() {
+
+ if !running_tasks.is_empty() {
tx_event
-
.post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
+
.post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
.await?;
}
self.state.clean_up_failed_job(job_id);
@@ -153,8 +222,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
self.state.task_manager.update_job(&job_id).await?;
}
QueryStageSchedulerEvent::JobCancel(job_id) => {
- self.state.task_manager.cancel_job(&job_id).await?;
+ self.metrics_collector.record_cancelled(&job_id);
+
+ info!("Job {} Cancelled", job_id);
+ let (running_tasks, _pending_tasks) =
+ self.state.task_manager.cancel_job(&job_id).await?;
self.state.clean_up_failed_job(job_id);
+
+ tx_event
+
.post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
+ .await?;
}
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status)
=> {
let num_status = tasks_status.len();
@@ -186,7 +263,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
}
}
QueryStageSchedulerEvent::ReservationOffering(reservations) => {
- let reservations =
self.state.offer_reservation(reservations).await?;
+ let (reservations, pending) =
+ self.state.offer_reservation(reservations).await?;
+
+ self.set_pending_tasks(pending);
+
if !reservations.is_empty() {
tx_event
.post_event(QueryStageSchedulerEvent::ReservationOffering(
@@ -217,7 +298,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
self.state
.executor_manager
.cancel_running_tasks(tasks)
- .await?
+ .await?;
}
QueryStageSchedulerEvent::JobDataClean(job_id) => {
self.state.executor_manager.clean_up_job_data(job_id);
@@ -231,3 +312,130 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
error!("Error received by QueryStageScheduler: {:?}", error);
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::config::SchedulerConfig;
+ use crate::test_utils::{await_condition, SchedulerTest,
TestMetricsCollector};
+ use ballista_core::config::TaskSchedulingPolicy;
+ use ballista_core::error::Result;
+ use datafusion::arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::logical_expr::{col, sum, LogicalPlan};
+ use datafusion::test_util::scan_empty_with_partitions;
+ use std::sync::Arc;
+ use std::time::Duration;
+
+ #[tokio::test]
+ async fn test_pending_task_metric() -> Result<()> {
+ let plan = test_plan(10);
+
+ let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+ let mut test = SchedulerTest::new(
+ SchedulerConfig::default()
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+ metrics_collector.clone(),
+ 1,
+ 1,
+ None,
+ )
+ .await?;
+
+ test.submit("job-1", "", &plan).await?;
+
+ // First stage has 10 tasks, one of which should be scheduled
immediately
+ expect_pending_tasks(&test, 9).await;
+
+ test.tick().await?;
+
+        // First task completes and another should be scheduled, so we should
have 8
+ expect_pending_tasks(&test, 8).await;
+
+ // Complete the 8 remaining tasks in the first stage
+ for _ in 0..8 {
+ test.tick().await?;
+ }
+
+ // The second stage should be resolved so we should have a new pending
task
+ expect_pending_tasks(&test, 1).await;
+
+ // complete the final task
+ test.tick().await?;
+
+ expect_pending_tasks(&test, 0).await;
+
+        // Tick once more so the final task's status update is processed
+ test.tick().await?;
+
+ // Job should be finished now
+ let _ = test.await_completion("job-1").await?;
+
+ Ok(())
+ }
+
+ #[ignore]
+ #[tokio::test]
+ async fn test_pending_task_metric_on_cancellation() -> Result<()> {
+ let plan = test_plan(10);
+
+ let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+ let mut test = SchedulerTest::new(
+ SchedulerConfig::default()
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+ metrics_collector.clone(),
+ 1,
+ 1,
+ None,
+ )
+ .await?;
+
+ test.submit("job-1", "", &plan).await?;
+
+ // First stage has 10 tasks, one of which should be scheduled
immediately
+ expect_pending_tasks(&test, 9).await;
+
+ test.tick().await?;
+
+        // First task completes and another should be scheduled, so we should
have 8
+ expect_pending_tasks(&test, 8).await;
+
+ test.cancel("job-1").await?;
+
+        // After the job is cancelled, the pending task count should drop to
0
+ expect_pending_tasks(&test, 0).await;
+
+ Ok(())
+ }
+
+ async fn expect_pending_tasks(test: &SchedulerTest, expected: usize) {
+ let success = await_condition(Duration::from_millis(500), 20, || {
+ let pending_tasks = test.pending_tasks();
+
+ futures::future::ready(Ok(pending_tasks == expected))
+ })
+ .await
+ .unwrap();
+
+ assert!(
+ success,
+ "Expected {} pending tasks but found {}",
+ expected,
+ test.pending_tasks()
+ );
+ }
+
+ fn test_plan(partitions: usize) -> LogicalPlan {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Utf8, false),
+ Field::new("gmv", DataType::UInt64, false),
+ ]);
+
+ scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
+ .unwrap()
+ .aggregate(vec![col("id")], vec![sum(col("gmv"))])
+ .unwrap()
+ .build()
+ .unwrap()
+ }
+}
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index eb64cfb7..975ad293 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::config::SchedulerConfig;
+use crate::metrics::default_metrics_collector;
use crate::{
scheduler_server::SchedulerServer,
state::backend::standalone::StandaloneClient,
};
@@ -34,12 +35,15 @@ use tokio::net::TcpListener;
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
let client = StandaloneClient::try_new_temporary()?;
+ let metrics_collector = default_metrics_collector()?;
+
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
Arc::new(client),
BallistaCodec::default(),
SchedulerConfig::default(),
+ metrics_collector,
);
scheduler_server.init().await?;
let server = SchedulerGrpcServer::new(scheduler_server.clone());
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index dcb919b1..51a6232f 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -47,6 +47,7 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use crate::display::print_stage_metrics;
use crate::planner::DistributedPlanner;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::timestamp_millis;
pub(crate) use crate::state::execution_graph::execution_stage::{
ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage,
TaskInfo,
UnresolvedStage,
@@ -110,6 +111,8 @@ pub struct ExecutionGraph {
session_id: String,
/// Status of this job
status: JobStatus,
+ /// Timestamp of when this job was submitted
+ queued_at: u64,
/// Job start time
start_time: u64,
/// Job end time
@@ -143,6 +146,7 @@ impl ExecutionGraph {
job_name: &str,
session_id: &str,
plan: Arc<dyn ExecutionPlan>,
+ queued_at: u64,
) -> Result<Self> {
let mut planner = DistributedPlanner::new();
@@ -161,10 +165,8 @@ impl ExecutionGraph {
status: JobStatus {
status: Some(job_status::Status::Queued(QueuedJob {})),
},
- start_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64,
+ queued_at,
+ start_time: timestamp_millis(),
end_time: 0,
stages,
output_partitions,
@@ -700,15 +702,21 @@ impl ExecutionGraph {
if !updated_stages.failed_stages.is_empty() {
info!("Job {} is failed", job_id);
self.fail_job(job_err_msg.clone());
- events.push(QueryStageSchedulerEvent::JobRunningFailed(
+ events.push(QueryStageSchedulerEvent::JobRunningFailed {
job_id,
- job_err_msg,
- ));
+ fail_message: job_err_msg,
+ queued_at: self.queued_at,
+ failed_at: timestamp_millis(),
+ });
} else if self.is_successful() {
// If this ExecutionGraph is successful, finish it
info!("Job {} is success, finalizing output partitions", job_id);
self.succeed_job()?;
- events.push(QueryStageSchedulerEvent::JobFinished(job_id));
+ events.push(QueryStageSchedulerEvent::JobFinished {
+ job_id,
+ queued_at: self.queued_at,
+ completed_at: timestamp_millis(),
+ });
} else if has_resolved {
events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
}
@@ -1312,6 +1320,7 @@ impl ExecutionGraph {
"Invalid Execution Graph: missing job status".to_owned(),
)
})?,
+ queued_at: proto.queued_at,
start_time: proto.start_time,
end_time: proto.end_time,
stages,
@@ -1386,6 +1395,7 @@ impl ExecutionGraph {
job_name: graph.job_name,
session_id: graph.session_id,
status: Some(graph.status),
+ queued_at: graph.queued_at,
start_time: graph.start_time,
end_time: graph.end_time,
stages,
@@ -2229,7 +2239,7 @@ mod test {
assert_eq!(stage_events.len(), 1);
assert!(matches!(
stage_events[0],
- QueryStageSchedulerEvent::JobRunningFailed(_, _)
+ QueryStageSchedulerEvent::JobRunningFailed { .. }
));
// Stage 2 is still running
let running_stage = agg_graph.running_stages();
@@ -2719,7 +2729,7 @@ mod test {
assert_eq!(stage_events.len(), 1);
assert!(matches!(
stage_events[0],
- QueryStageSchedulerEvent::JobRunningFailed(_, _)
+ QueryStageSchedulerEvent::JobRunningFailed { .. }
));
drain_tasks(&mut agg_graph)?;
@@ -2769,7 +2779,7 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- ExecutionGraph::new("localhost:50050", "job", "", "session",
plan).unwrap()
+ ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap()
}
async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {
@@ -2797,7 +2807,7 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- ExecutionGraph::new("localhost:50050", "job", "", "session",
plan).unwrap()
+ ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap()
}
async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
@@ -2820,7 +2830,7 @@ mod test {
let plan = ctx.create_physical_plan(&optimized_plan).await.unwrap();
- ExecutionGraph::new("localhost:50050", "job", "", "session",
plan).unwrap()
+ ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap()
}
async fn test_join_plan(partition: usize) -> ExecutionGraph {
@@ -2861,8 +2871,8 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- let graph =
- ExecutionGraph::new("localhost:50050", "job", "", "session",
plan).unwrap();
+ let graph = ExecutionGraph::new("localhost:50050", "job", "",
"session", plan, 0)
+ .unwrap();
println!("{:?}", graph);
@@ -2886,8 +2896,8 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- let graph =
- ExecutionGraph::new("localhost:50050", "job", "", "session",
plan).unwrap();
+ let graph = ExecutionGraph::new("localhost:50050", "job", "",
"session", plan, 0)
+ .unwrap();
println!("{:?}", graph);
@@ -2911,8 +2921,8 @@ mod test {
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
- let graph =
- ExecutionGraph::new("localhost:50050", "job", "", "session",
plan).unwrap();
+ let graph = ExecutionGraph::new("localhost:50050", "job", "",
"session", plan, 0)
+ .unwrap();
println!("{:?}", graph);
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 0cdbb39c..708b5077 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -542,6 +542,6 @@ filter_expr="]
.await?;
let plan = df.to_logical_plan()?;
let plan = ctx.create_physical_plan(&plan).await?;
- ExecutionGraph::new("scheduler_id", "job_id", "job_name",
"session_id", plan)
+ ExecutionGraph::new("scheduler_id", "job_id", "job_name",
"session_id", plan, 0)
}
}
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index d770f984..3c12d2c8 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -29,7 +29,7 @@ use crate::scheduler_server::SessionBuilder;
use crate::state::backend::{Lock, StateBackendClient};
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::session_manager::SessionManager;
-use crate::state::task_manager::TaskManager;
+use crate::state::task_manager::{TaskLauncher, TaskManager};
use crate::config::SchedulerConfig;
use crate::state::execution_graph::TaskDescription;
@@ -49,7 +49,7 @@ pub mod execution_graph_dot;
pub mod executor_manager;
pub mod session_manager;
pub mod session_registry;
-mod task_manager;
+pub(crate) mod task_manager;
pub fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
T::decode(bytes).map_err(|e| {
@@ -135,11 +135,37 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
}
}
+ #[allow(dead_code)]
+ pub(crate) fn with_task_launcher(
+ config_client: Arc<dyn StateBackendClient>,
+ session_builder: SessionBuilder,
+ codec: BallistaCodec<T, U>,
+ scheduler_name: String,
+ config: SchedulerConfig,
+ dispatcher: Arc<dyn TaskLauncher>,
+ ) -> Self {
+ Self {
+ executor_manager: ExecutorManager::new(
+ config_client.clone(),
+ config.executor_slots_policy,
+ ),
+ task_manager: TaskManager::with_launcher(
+ config_client.clone(),
+ session_builder,
+ codec.clone(),
+ scheduler_name,
+ dispatcher,
+ ),
+ session_manager: SessionManager::new(config_client,
session_builder),
+ codec,
+ config,
+ }
+ }
+
pub async fn init(&self) -> Result<()> {
self.executor_manager.init().await
}
- #[cfg(not(test))]
pub(crate) async fn update_task_statuses(
&self,
executor_id: &str,
@@ -164,33 +190,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
Ok((events, reservations))
}
- #[cfg(test)]
- pub(crate) async fn update_task_statuses(
- &self,
- executor_id: &str,
- tasks_status: Vec<TaskStatus>,
- ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
- let executor = self
- .executor_manager
- .get_executor_metadata(executor_id)
- .await?;
-
- let total_num_tasks = tasks_status.len();
- let free_list = (0..total_num_tasks)
- .into_iter()
- .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
- .collect();
-
- let events = self
- .task_manager
- .update_task_statuses(&executor, tasks_status)
- .await?;
-
- self.executor_manager.cancel_reservations(free_list).await?;
-
- Ok((events, vec![]))
- }
-
/// Process reservations which are offered. The basic process is
/// 1. Attempt to fill the offered reservations with available tasks
/// 2. For any reservation that filled, launch the assigned task on the
executor.
@@ -203,7 +202,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
pub(crate) async fn offer_reservation(
&self,
reservations: Vec<ExecutorReservation>,
- ) -> Result<Vec<ExecutorReservation>> {
+ ) -> Result<(Vec<ExecutorReservation>, usize)> {
let (free_list, pending_tasks) = match self
.task_manager
.fill_reservations(&reservations)
@@ -286,9 +285,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
}
};
- dbg!(free_list.clone());
- dbg!(pending_tasks);
-
let mut new_reservations = vec![];
if !free_list.is_empty() {
// If any reserved slots remain, return them to the pool
@@ -302,7 +298,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
new_reservations.extend(pending_reservations);
}
- Ok(new_reservations)
+ Ok((new_reservations, pending_tasks))
}
pub(crate) async fn submit_job(
@@ -311,6 +307,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
job_name: &str,
session_ctx: Arc<SessionContext>,
plan: &LogicalPlan,
+ queued_at: u64,
) -> Result<()> {
let start = Instant::now();
@@ -377,7 +374,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
);
self.task_manager
- .submit_job(job_id, job_name, &session_ctx.session_id(), plan)
+ .submit_job(job_id, job_name, &session_ctx.session_id(), plan,
queued_at)
.await?;
let elapsed = start.elapsed();
@@ -387,26 +384,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
Ok(())
}
- pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
- info!("Received cancellation request for job {}", job_id);
-
- match self.task_manager.cancel_job(job_id).await {
- Ok(tasks) => {
-
self.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
- let msg = format!("Error to cancel running tasks when
cancelling job {} due to {:?}", job_id, e);
- error!("{}", msg);
- BallistaError::Internal(msg)
- })?;
- Ok(true)
- }
- Err(e) => {
- let msg = format!("Error cancelling job {}: {:?}", job_id, e);
- error!("{}", msg);
- Ok(false)
- }
- }
- }
-
/// Spawn a delayed future to clean up job data on both Scheduler and
Executors
pub(crate) fn clean_up_successful_job(&self, job_id: String) {
self.executor_manager.clean_up_job_data_delayed(
@@ -464,6 +441,8 @@ mod test {
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;
+ use crate::config::SchedulerConfig;
+ use crate::test_utils::BlackholeTaskLauncher;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum};
use datafusion::physical_plan::ExecutionPlan;
@@ -492,8 +471,9 @@ mod test {
.register_executor(executor_metadata, executor_data, true)
.await?;
- let result = state.offer_reservation(reservations).await?;
+ let (result, assigned) = state.offer_reservation(reservations).await?;
+ assert_eq!(assigned, 0);
assert!(result.is_empty());
// All reservations should have been cancelled so we should be able to
reserve them now
@@ -512,10 +492,13 @@ mod test {
.build()?;
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_default_scheduler_name(
+ Arc::new(SchedulerState::with_task_launcher(
state_storage,
default_session_builder,
BallistaCodec::default(),
+ String::default(),
+ SchedulerConfig::default(),
+ Arc::new(BlackholeTaskLauncher::default()),
));
let session_ctx = state.session_manager.create_session(&config).await?;
@@ -525,19 +508,43 @@ mod test {
// Create 4 jobs so we have four pending tasks
state
.task_manager
- .submit_job("job-1", "", session_ctx.session_id().as_str(),
plan.clone())
+ .submit_job(
+ "job-1",
+ "",
+ session_ctx.session_id().as_str(),
+ plan.clone(),
+ 0,
+ )
.await?;
state
.task_manager
- .submit_job("job-2", "", session_ctx.session_id().as_str(),
plan.clone())
+ .submit_job(
+ "job-2",
+ "",
+ session_ctx.session_id().as_str(),
+ plan.clone(),
+ 0,
+ )
.await?;
state
.task_manager
- .submit_job("job-3", "", session_ctx.session_id().as_str(),
plan.clone())
+ .submit_job(
+ "job-3",
+ "",
+ session_ctx.session_id().as_str(),
+ plan.clone(),
+ 0,
+ )
.await?;
state
.task_manager
- .submit_job("job-4", "", session_ctx.session_id().as_str(),
plan.clone())
+ .submit_job(
+ "job-4",
+ "",
+ session_ctx.session_id().as_str(),
+ plan.clone(),
+ 0,
+ )
.await?;
let executors = test_executors(1, 4);
@@ -549,8 +556,9 @@ mod test {
.register_executor(executor_metadata, executor_data, true)
.await?;
- let result = state.offer_reservation(reservations).await?;
+ let (result, pending) = state.offer_reservation(reservations).await?;
+ assert_eq!(pending, 0);
assert!(result.is_empty());
// All task slots should be assigned so we should not be able to
reserve more tasks
@@ -569,10 +577,13 @@ mod test {
.build()?;
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_default_scheduler_name(
+ Arc::new(SchedulerState::with_task_launcher(
state_storage,
default_session_builder,
BallistaCodec::default(),
+ String::default(),
+ SchedulerConfig::default(),
+ Arc::new(BlackholeTaskLauncher::default()),
));
let session_ctx = state.session_manager.create_session(&config).await?;
@@ -582,7 +593,13 @@ mod test {
// Create a job
state
.task_manager
- .submit_job("job-1", "", session_ctx.session_id().as_str(),
plan.clone())
+ .submit_job(
+ "job-1",
+ "",
+ session_ctx.session_id().as_str(),
+ plan.clone(),
+ 0,
+ )
.await?;
let executors = test_executors(1, 4);
@@ -645,8 +662,9 @@ mod test {
// Offer the reservation. It should be filled with one of the 4
pending tasks. The other 3 should
// be reserved for the other 3 tasks, emitting another offer event
- let reservations = state.offer_reservation(reservations).await?;
+ let (reservations, pending) =
state.offer_reservation(reservations).await?;
+ assert_eq!(pending, 3);
assert_eq!(reservations.len(), 3);
// Remaining 3 task slots should be reserved for pending tasks
diff --git a/ballista/scheduler/src/state/task_manager.rs
b/ballista/scheduler/src/state/task_manager.rs
index 31b28100..53b59219 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,6 +29,7 @@ use ballista_core::error::Result;
use crate::state::backend::Keyspace::{CompletedJobs, FailedJobs};
use crate::state::session_manager::create_datafusion_context;
+
use ballista_core::serde::protobuf::{
self, job_status, FailedJob, JobStatus, MultiTaskDefinition,
TaskDefinition, TaskId,
TaskStatus,
@@ -58,6 +59,52 @@ pub const TASK_MAX_FAILURES: usize = 4;
/// Default max failure attempts for stage level retry
pub const STAGE_MAX_FAILURES: usize = 4;
+#[async_trait::async_trait]
+pub(crate) trait TaskLauncher: Send + Sync + 'static {
+ async fn launch_tasks(
+ &self,
+ executor: &ExecutorMetadata,
+ tasks: Vec<MultiTaskDefinition>,
+ executor_manager: &ExecutorManager,
+ ) -> Result<()>;
+}
+
+struct DefaultTaskLauncher {
+ scheduler_id: String,
+}
+
+impl DefaultTaskLauncher {
+ pub fn new(scheduler_id: String) -> Self {
+ Self { scheduler_id }
+ }
+}
+
+#[async_trait::async_trait]
+impl TaskLauncher for DefaultTaskLauncher {
+ async fn launch_tasks(
+ &self,
+ executor: &ExecutorMetadata,
+ tasks: Vec<MultiTaskDefinition>,
+ executor_manager: &ExecutorManager,
+ ) -> Result<()> {
+ info!("Launching multi task on executor {:?}", executor.id);
+ let mut client = executor_manager.get_client(&executor.id).await?;
+ client
+ .launch_multi_task(protobuf::LaunchMultiTaskParams {
+ multi_tasks: tasks,
+ scheduler_id: self.scheduler_id.clone(),
+ })
+ .await
+ .map_err(|e| {
+ BallistaError::Internal(format!(
+ "Failed to connect to executor {}: {:?}",
+ executor.id, e
+ ))
+ })?;
+ Ok(())
+ }
+}
+
#[derive(Clone)]
pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> {
state: Arc<dyn StateBackendClient>,
@@ -66,6 +113,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U:
'static + AsExecutionPlan>
scheduler_id: String,
// Cache for active jobs curated by this scheduler
active_job_cache: ActiveJobCache,
+ launcher: Arc<dyn TaskLauncher>,
}
#[derive(Clone)]
@@ -100,6 +148,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
scheduler_id: String,
+ ) -> Self {
+ Self {
+ state,
+ session_builder,
+ codec,
+ scheduler_id: scheduler_id.clone(),
+ active_job_cache: Arc::new(DashMap::new()),
+ launcher: Arc::new(DefaultTaskLauncher::new(scheduler_id)),
+ }
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn with_launcher(
+ state: Arc<dyn StateBackendClient>,
+ session_builder: SessionBuilder,
+ codec: BallistaCodec<T, U>,
+ scheduler_id: String,
+ launcher: Arc<dyn TaskLauncher>,
) -> Self {
Self {
state,
@@ -107,6 +173,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
codec,
scheduler_id,
active_job_cache: Arc::new(DashMap::new()),
+ launcher,
}
}
@@ -119,9 +186,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
job_name: &str,
session_id: &str,
plan: Arc<dyn ExecutionPlan>,
+ queued_at: u64,
) -> Result<()> {
- let mut graph =
- ExecutionGraph::new(&self.scheduler_id, job_id, job_name,
session_id, plan)?;
+ let mut graph = ExecutionGraph::new(
+ &self.scheduler_id,
+ job_id,
+ job_name,
+ session_id,
+ plan,
+ queued_at,
+ )?;
info!("Submitting execution graph: {:?}", graph);
self.state
.put(
@@ -297,7 +371,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
}
if assign_tasks >= free_reservations.len() {
- pending_tasks = graph.available_tasks();
+ pending_tasks += graph.available_tasks();
break;
}
}
@@ -335,7 +409,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
/// Cancel the job and return a Vec of running tasks need to cancel
- pub(crate) async fn cancel_job(&self, job_id: &str) ->
Result<Vec<RunningTaskInfo>> {
+ pub(crate) async fn cancel_job(
+ &self,
+ job_id: &str,
+ ) -> Result<(Vec<RunningTaskInfo>, usize)> {
self.abort_job(job_id, "Cancelled".to_owned()).await
}
@@ -344,7 +421,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
&self,
job_id: &str,
failure_reason: String,
- ) -> Result<Vec<RunningTaskInfo>> {
+ ) -> Result<(Vec<RunningTaskInfo>, usize)> {
let locks = self
.state
.acquire_locks(vec![
@@ -352,25 +429,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
(Keyspace::FailedJobs, job_id),
])
.await?;
- let tasks_to_cancel = if let Some(graph) =
+ let (tasks_to_cancel, pending_tasks) = if let Some(graph) =
self.get_active_execution_graph(job_id).await
{
- let running_tasks = graph.read().await.running_tasks();
+ let (pending_tasks, running_tasks) = {
+ let guard = graph.read().await;
+ (guard.available_tasks(), guard.running_tasks())
+ };
+
info!(
"Cancelling {} running tasks for job {}",
running_tasks.len(),
job_id
);
- with_locks(locks, self.fail_job_state(job_id,
failure_reason)).await?;
- running_tasks
+ with_locks(locks, self.fail_job_state(job_id, failure_reason))
+ .await
+ .unwrap();
+
+ (running_tasks, pending_tasks)
} else {
// TODO listen the job state update event and fix task cancelling
warn!("Fail to find job {} in the cache, unable to cancel tasks
for job, fail the job state only.", job_id);
with_locks(locks, self.fail_job_state(job_id,
failure_reason)).await?;
- vec![]
+ (vec![], 0)
};
- Ok(tasks_to_cancel)
+ Ok((tasks_to_cancel, pending_tasks))
}
/// Mark a unscheduled job as failed. This will create a key under the
FailedJobs keyspace
@@ -405,11 +489,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
]
};
- let _res = if let Some(graph) =
self.remove_active_execution_graph(job_id).await {
+ if let Some(graph) = self.remove_active_execution_graph(job_id).await {
let mut graph = graph.write().await;
let previous_status = graph.status();
graph.fail_job(failure_reason);
- let value = self.encode_execution_graph(graph.clone())?;
+
+ let value = encode_protobuf(&graph.status())?;
let txn_ops = txn_operations(value);
let result = self.state.apply_txn(txn_ops).await;
if result.is_err() {
@@ -417,7 +502,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
graph.update_status(previous_status);
warn!("Rollback Execution Graph state change since it did not
persisted due to a possible connection error.")
};
- result
} else {
info!("Fail to find job {} in the cache", job_id);
let status = JobStatus {
@@ -427,27 +511,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
};
let value = encode_protobuf(&status)?;
let txn_ops = txn_operations(value);
- self.state.apply_txn(txn_ops).await
+ self.state.apply_txn(txn_ops).await?;
};
Ok(())
}
- pub async fn update_job(&self, job_id: &str) -> Result<()> {
+ pub async fn update_job(&self, job_id: &str) -> Result<usize> {
debug!("Update job {} in Active", job_id);
if let Some(graph) = self.get_active_execution_graph(job_id).await {
let mut graph = graph.write().await;
+
+ let curr_available_tasks = graph.available_tasks();
+
graph.revive();
let graph = graph.clone();
+
+ let new_tasks = graph.available_tasks() - curr_available_tasks;
+
let value = self.encode_execution_graph(graph)?;
self.state
.put(Keyspace::ActiveJobs, job_id.to_owned(), value)
.await?;
+
+ Ok(new_tasks)
} else {
warn!("Fail to find job {} in the cache", job_id);
- }
- Ok(())
+ Ok(0)
+ }
}
/// return a Vec of running tasks need to cancel
@@ -484,45 +576,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
.await
}
- #[allow(dead_code)]
- #[cfg(not(test))]
- /// Launch the given task on the specified executor
- pub(crate) async fn launch_task(
- &self,
- executor: &ExecutorMetadata,
- task: TaskDescription,
- executor_manager: &ExecutorManager,
- ) -> Result<()> {
- info!("Launching task {:?} on executor {:?}", task, executor.id);
- let task_definition = self.prepare_task_definition(task)?;
- let mut client = executor_manager.get_client(&executor.id).await?;
- client
- .launch_task(protobuf::LaunchTaskParams {
- tasks: vec![task_definition],
- scheduler_id: self.scheduler_id.clone(),
- })
- .await
- .map_err(|e| {
- BallistaError::Internal(format!(
- "Failed to connect to executor {}: {:?}",
- executor.id, e
- ))
- })?;
- Ok(())
- }
-
- #[allow(dead_code)]
- #[cfg(test)]
- /// In unit tests, we do not have actual executors running, so it
simplifies things to just noop.
- pub(crate) async fn launch_task(
- &self,
- _executor: &ExecutorMetadata,
- _task: TaskDescription,
- _executor_manager: &ExecutorManager,
- ) -> Result<()> {
- Ok(())
- }
-
/// Retrieve the number of available tasks for the given job. The value
returned
/// is strictly a point-in-time snapshot
pub async fn get_available_task_count(&self, job_id: &str) ->
Result<usize> {
@@ -591,7 +644,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
}
- #[cfg(not(test))]
/// Launch the given tasks on the specified executor
pub(crate) async fn launch_multi_task(
&self,
@@ -599,37 +651,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
tasks: Vec<Vec<TaskDescription>>,
executor_manager: &ExecutorManager,
) -> Result<()> {
- info!("Launching multi task on executor {:?}", executor.id);
let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
.into_iter()
.map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
.collect();
- let multi_tasks = multi_tasks?;
- let mut client = executor_manager.get_client(&executor.id).await?;
- client
- .launch_multi_task(protobuf::LaunchMultiTaskParams {
- multi_tasks,
- scheduler_id: self.scheduler_id.clone(),
- })
- .await
- .map_err(|e| {
- BallistaError::Internal(format!(
- "Failed to connect to executor {}: {:?}",
- executor.id, e
- ))
- })?;
- Ok(())
- }
- #[cfg(test)]
- /// Launch the given tasks on the specified executor
- pub(crate) async fn launch_multi_task(
- &self,
- _executor: &ExecutorMetadata,
- _tasks: Vec<Vec<TaskDescription>>,
- _executor_manager: &ExecutorManager,
- ) -> Result<()> {
- Ok(())
+ self.launcher
+ .launch_tasks(executor, multi_tasks?, executor_manager)
+ .await
}
#[allow(dead_code)]
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index c115ce72..2566dcb5 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -15,22 +15,46 @@
// specific language governing permissions and limitations
// under the License.
-use ballista_core::error::Result;
+use ballista_core::error::{BallistaError, Result};
use std::any::Any;
+use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
+use crate::config::SchedulerConfig;
+use crate::metrics::SchedulerMetricsCollector;
+use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+use crate::state::backend::standalone::StandaloneClient;
+
+use crate::state::executor_manager::ExecutorManager;
+use crate::state::task_manager::TaskLauncher;
+
+use ballista_core::config::{BallistaConfig,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
+use ballista_core::serde::protobuf::job_status::Status;
+use ballista_core::serde::protobuf::{
+ task_status, JobStatus, MultiTaskDefinition, PhysicalPlanNode,
ShuffleWritePartition,
+ SuccessfulTask, TaskId, TaskStatus,
+};
+use ballista_core::serde::scheduler::{
+ ExecutorData, ExecutorMetadata, ExecutorSpecification,
+};
+use ballista_core::serde::BallistaCodec;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext,
SessionState};
-use datafusion::logical_expr::Expr;
+use datafusion::logical_expr::{Expr, LogicalPlan};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::CsvReadOptions;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use datafusion_proto::protobuf::LogicalPlanNode;
+use parking_lot::Mutex;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+
pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem",
"nation", "region",
];
@@ -201,3 +225,494 @@ pub fn get_tpch_schema(table: &str) -> Schema {
_ => unimplemented!(),
}
}
+
+pub trait TaskRunner: Send + Sync + 'static {
+ fn run(&self, executor_id: String, tasks: MultiTaskDefinition) ->
Vec<TaskStatus>;
+}
+
+#[derive(Clone)]
+pub struct TaskRunnerFn<F> {
+ f: F,
+}
+
+impl<F> TaskRunnerFn<F>
+where
+ F: Fn(String, MultiTaskDefinition) -> Vec<TaskStatus> + Send + Sync +
'static,
+{
+ pub fn new(f: F) -> Self {
+ Self { f }
+ }
+}
+
+impl<F> TaskRunner for TaskRunnerFn<F>
+where
+ F: Fn(String, MultiTaskDefinition) -> Vec<TaskStatus> + Send + Sync +
'static,
+{
+ fn run(&self, executor_id: String, tasks: MultiTaskDefinition) ->
Vec<TaskStatus> {
+ (self.f)(executor_id, tasks)
+ }
+}
+
+pub fn default_task_runner() -> impl TaskRunner {
+ TaskRunnerFn::new(|executor_id: String, task: MultiTaskDefinition| {
+ let mut statuses = vec![];
+
+ let partitions =
+ if let Some(output_partitioning) =
task.output_partitioning.as_ref() {
+ output_partitioning.partition_count as usize
+ } else {
+ 1
+ };
+
+ let partitions: Vec<ShuffleWritePartition> = (0..partitions)
+ .into_iter()
+ .map(|i| ShuffleWritePartition {
+ partition_id: i as u64,
+ path: String::default(),
+ num_batches: 1,
+ num_rows: 1,
+ num_bytes: 1,
+ })
+ .collect();
+
+ for TaskId {
+ task_id,
+ partition_id,
+ ..
+ } in task.task_ids
+ {
+ let timestamp = timestamp_millis();
+ statuses.push(TaskStatus {
+ task_id,
+ job_id: task.job_id.clone(),
+ stage_id: task.stage_id,
+ stage_attempt_num: task.stage_attempt_num,
+ partition_id,
+ launch_time: timestamp,
+ start_exec_time: timestamp,
+ end_exec_time: timestamp,
+ metrics: vec![],
+ status: Some(task_status::Status::Successful(SuccessfulTask {
+ executor_id: executor_id.clone(),
+ partitions: partitions.clone(),
+ })),
+ });
+ }
+
+ statuses
+ })
+}
+
+#[derive(Clone)]
+struct VirtualExecutor {
+ executor_id: String,
+ task_slots: usize,
+ runner: Arc<dyn TaskRunner>,
+}
+
+impl VirtualExecutor {
+ pub fn run_tasks(&self, tasks: MultiTaskDefinition) -> Vec<TaskStatus> {
+ self.runner.run(self.executor_id.clone(), tasks)
+ }
+}
+
+/// Launcher which consumes tasks and never sends a status update
+#[derive(Default)]
+pub struct BlackholeTaskLauncher {}
+
+#[async_trait]
+impl TaskLauncher for BlackholeTaskLauncher {
+ async fn launch_tasks(
+ &self,
+ _executor: &ExecutorMetadata,
+ _tasks: Vec<MultiTaskDefinition>,
+ _executor_manager: &ExecutorManager,
+ ) -> Result<()> {
+ Ok(())
+ }
+}
+
+pub struct VirtualTaskLauncher {
+ sender: Sender<(String, Vec<TaskStatus>)>,
+ executors: HashMap<String, VirtualExecutor>,
+}
+
+#[async_trait::async_trait]
+impl TaskLauncher for VirtualTaskLauncher {
+ async fn launch_tasks(
+ &self,
+ executor: &ExecutorMetadata,
+ tasks: Vec<MultiTaskDefinition>,
+ _executor_manager: &ExecutorManager,
+ ) -> Result<()> {
+ let virtual_executor = self.executors.get(&executor.id).ok_or_else(|| {
+ BallistaError::Internal(format!(
+ "No virtual executor with ID {} found",
+ executor.id
+ ))
+ })?;
+
+ let status = tasks
+ .into_iter()
+ .flat_map(|t| virtual_executor.run_tasks(t))
+ .collect();
+
+ self.sender
+ .send((executor.id.clone(), status))
+ .await
+ .map_err(|e| {
+ BallistaError::Internal(format!("Error sending task status:
{:?}", e))
+ })
+ }
+}
+
+pub struct SchedulerTest {
+ scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
+ ballista_config: BallistaConfig,
+ status_receiver: Option<Receiver<(String, Vec<TaskStatus>)>>,
+}
+
+impl SchedulerTest {
+ pub async fn new(
+ config: SchedulerConfig,
+ metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+ num_executors: usize,
+ task_slots_per_executor: usize,
+ runner: Option<Arc<dyn TaskRunner>>,
+ ) -> Result<Self> {
+ let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+
+ let ballista_config = BallistaConfig::builder()
+ .set(
+ BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+ format!("{}", num_executors *
task_slots_per_executor).as_str(),
+ )
+ .build()
+ .expect("creating BallistaConfig");
+
+ let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));
+
+ let executors: HashMap<String, VirtualExecutor> = (0..num_executors)
+ .into_iter()
+ .map(|i| {
+ let id = format!("virtual-executor-{}", i);
+ let executor = VirtualExecutor {
+ executor_id: id.clone(),
+ task_slots: task_slots_per_executor,
+ runner: runner.clone(),
+ };
+ (id, executor)
+ })
+ .collect();
+
+ let (status_sender, status_receiver) = channel(1000);
+
+ let launcher = VirtualTaskLauncher {
+ sender: status_sender,
+ executors: executors.clone(),
+ };
+
+ let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+ SchedulerServer::with_task_launcher(
+ "localhost:50050".to_owned(),
+ state_storage.clone(),
+ BallistaCodec::default(),
+ config,
+ metrics_collector,
+ Arc::new(launcher),
+ );
+ scheduler.init().await?;
+
+ for (executor_id, VirtualExecutor { task_slots, .. }) in executors {
+ let metadata = ExecutorMetadata {
+ id: executor_id.clone(),
+ host: String::default(),
+ port: 0,
+ grpc_port: 0,
+ specification: ExecutorSpecification {
+ task_slots: task_slots as u32,
+ },
+ };
+
+ let executor_data = ExecutorData {
+ executor_id,
+ total_task_slots: task_slots as u32,
+ available_task_slots: task_slots as u32,
+ };
+
+ scheduler
+ .state
+ .executor_manager
+ .register_executor(metadata, executor_data, false)
+ .await?;
+ }
+
+ Ok(Self {
+ scheduler,
+ ballista_config,
+ status_receiver: Some(status_receiver),
+ })
+ }
+
+ pub fn pending_tasks(&self) -> usize {
+ self.scheduler.pending_tasks()
+ }
+
+ pub async fn ctx(&self) -> Result<Arc<SessionContext>> {
+ self.scheduler
+ .state
+ .session_manager
+ .create_session(&self.ballista_config)
+ .await
+ }
+
+ pub async fn submit(
+ &mut self,
+ job_id: &str,
+ job_name: &str,
+ plan: &LogicalPlan,
+ ) -> Result<()> {
+ let ctx = self
+ .scheduler
+ .state
+ .session_manager
+ .create_session(&self.ballista_config)
+ .await?;
+
+ self.scheduler
+ .submit_job(job_id, job_name, ctx, plan)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn tick(&mut self) -> Result<()> {
+ if let Some(receiver) = self.status_receiver.as_mut() {
+ if let Some((executor_id, status)) = receiver.recv().await {
+ self.scheduler
+ .update_task_status(&executor_id, status)
+ .await?;
+ } else {
+ return Err(BallistaError::Internal("Task sender
dropped".to_owned()));
+ }
+ } else {
+ return Err(BallistaError::Internal(
+ "Status receiver was None".to_owned(),
+ ));
+ }
+
+ Ok(())
+ }
+
+ pub async fn cancel(&self, job_id: &str) -> Result<()> {
+ self.scheduler
+ .query_stage_event_loop
+ .get_sender()?
+ .post_event(QueryStageSchedulerEvent::JobCancel(job_id.to_owned()))
+ .await
+ }
+
+ pub async fn await_completion(&self, job_id: &str) -> Result<JobStatus> {
+ let final_status: Result<JobStatus> = loop {
+ let status = self
+ .scheduler
+ .state
+ .task_manager
+ .get_job_status(job_id)
+ .await?;
+
+ if let Some(JobStatus {
+ status: Some(inner),
+ }) = status.as_ref()
+ {
+ match inner {
+ Status::Failed(_) | Status::Successful(_) => {
+ break Ok(status.unwrap())
+ }
+ _ => continue,
+ }
+ }
+
+ tokio::time::sleep(Duration::from_millis(100)).await
+ };
+
+ final_status
+ }
+
+ pub async fn run(
+ &mut self,
+ job_id: &str,
+ job_name: &str,
+ plan: &LogicalPlan,
+ ) -> Result<JobStatus> {
+ let ctx = self
+ .scheduler
+ .state
+ .session_manager
+ .create_session(&self.ballista_config)
+ .await?;
+
+ self.scheduler
+ .submit_job(job_id, job_name, ctx, plan)
+ .await?;
+
+ let mut receiver = self.status_receiver.take().unwrap();
+
+ let scheduler_clone = self.scheduler.clone();
+ tokio::spawn(async move {
+ while let Some((executor_id, status)) = receiver.recv().await {
+ scheduler_clone
+ .update_task_status(&executor_id, status)
+ .await
+ .unwrap();
+ }
+ });
+
+ let final_status: Result<JobStatus> = loop {
+ let status = self
+ .scheduler
+ .state
+ .task_manager
+ .get_job_status(job_id)
+ .await?;
+
+ if let Some(JobStatus {
+ status: Some(inner),
+ }) = status.as_ref()
+ {
+ match inner {
+ Status::Failed(_) | Status::Successful(_) => {
+ break Ok(status.unwrap())
+ }
+ _ => continue,
+ }
+ }
+
+ tokio::time::sleep(Duration::from_millis(100)).await
+ };
+
+ final_status
+ }
+}
+
+#[derive(Clone)]
+pub enum MetricEvent {
+ Submitted(String, u64, u64),
+ Completed(String, u64, u64),
+ Cancelled(String),
+ Failed(String, u64, u64),
+}
+
+impl MetricEvent {
+ pub fn job_id(&self) -> &str {
+ match self {
+ MetricEvent::Submitted(job, _, _) => job.as_str(),
+ MetricEvent::Completed(job, _, _) => job.as_str(),
+ MetricEvent::Cancelled(job) => job.as_str(),
+ MetricEvent::Failed(job, _, _) => job.as_str(),
+ }
+ }
+}
+
+#[derive(Default, Clone)]
+pub struct TestMetricsCollector {
+ pub events: Arc<Mutex<Vec<MetricEvent>>>,
+}
+
+impl TestMetricsCollector {
+ pub fn job_events(&self, job_id: &str) -> Vec<MetricEvent> {
+ let guard = self.events.lock();
+
+ guard
+ .iter()
+ .filter_map(|event| {
+ if event.job_id() == job_id {
+ Some(event.clone())
+ } else {
+ None
+ }
+ })
+ .collect()
+ }
+}
+
+impl SchedulerMetricsCollector for TestMetricsCollector {
+ fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at:
u64) {
+ let mut guard = self.events.lock();
+ guard.push(MetricEvent::Submitted(
+ job_id.to_owned(),
+ queued_at,
+ submitted_at,
+ ));
+ }
+
+ fn record_completed(&self, job_id: &str, queued_at: u64, completed_at:
u64) {
+ let mut guard = self.events.lock();
+ guard.push(MetricEvent::Completed(
+ job_id.to_owned(),
+ queued_at,
+ completed_at,
+ ));
+ }
+
+ fn record_failed(&self, job_id: &str, queued_at: u64, failed_at: u64) {
+ let mut guard = self.events.lock();
+ guard.push(MetricEvent::Failed(job_id.to_owned(), queued_at,
failed_at));
+ }
+
+ fn record_cancelled(&self, job_id: &str) {
+ let mut guard = self.events.lock();
+ guard.push(MetricEvent::Cancelled(job_id.to_owned()));
+ }
+
+ fn set_pending_tasks_queue_size(&self, _value: u64) {}
+
+ fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>> {
+ Ok(None)
+ }
+}
+
+pub fn assert_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
+ let found = collector
+ .job_events(job_id)
+ .iter()
+ .any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
+
+ assert!(found, "Expected submitted event for job {}", job_id);
+}
+
+pub fn assert_no_submitted_event(job_id: &str, collector:
&TestMetricsCollector) {
+ let found = collector
+ .job_events(job_id)
+ .iter()
+ .any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
+
+ assert!(!found, "Expected no submitted event for job {}", job_id);
+}
+
+pub fn assert_completed_event(job_id: &str, collector: &TestMetricsCollector) {
+ let found = collector
+ .job_events(job_id)
+ .iter()
+ .any(|ev| matches!(ev, MetricEvent::Completed(_, _, _)));
+
+ assert!(found, "Expected completed event for job {}", job_id);
+}
+
+pub fn assert_cancelled_event(job_id: &str, collector: &TestMetricsCollector) {
+ let found = collector
+ .job_events(job_id)
+ .iter()
+ .any(|ev| matches!(ev, MetricEvent::Cancelled(_)));
+
+ assert!(found, "Expected cancelled event for job {}", job_id);
+}
+
+pub fn assert_failed_event(job_id: &str, collector: &TestMetricsCollector) {
+ let found = collector
+ .job_events(job_id)
+ .iter()
+ .any(|ev| matches!(ev, MetricEvent::Failed(_, _, _)));
+
+ assert!(found, "Expected failed event for job {}", job_id);
+}
diff --git a/docs/source/user-guide/metrics.md
b/docs/source/user-guide/metrics.md
new file mode 100644
index 00000000..7e831dca
--- /dev/null
+++ b/docs/source/user-guide/metrics.md
@@ -0,0 +1,41 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Ballista Scheduler Metrics
+
+## Prometheus
+
+Built with default features, the ballista scheduler will automatically collect
and expose a standard set of prometheus metrics.
+The metrics currently collected automatically include:
+
+- _job_exec_time_seconds_ - Histogram of successful job execution time in
seconds
+- _planning_time_ms_ - Histogram of job planning time in milliseconds
+- _failed_ - Counter of failed jobs
+- _job_failed_total_ - Counter of failed jobs
+- _job_cancelled_total_ - Counter of cancelled jobs
+- _job_completed_total_ - Counter of completed jobs
+- _job_submitted_total_ - Counter of submitted jobs
+- _pending_task_queue_size_ - Number of pending tasks
+
+**NOTE** Currently the histogram buckets for the above metrics are set to
reasonable defaults. If the defaults are not
+appropriate for a given use case, the only workaround is to implement a
custom `SchedulerMetricsCollector`. In the future
+the buckets should be made configurable.
+
+The metrics are then exported through the scheduler REST API at `GET
/api/metrics`. It should be sufficient to ingest metrics
+into an existing metrics system by pointing your chosen prometheus exporter at
that endpoint.
diff --git a/docs/source/user-guide/scheduler.md
b/docs/source/user-guide/scheduler.md
index 2f94ebc3..da6166c2 100644
--- a/docs/source/user-guide/scheduler.md
+++ b/docs/source/user-guide/scheduler.md
@@ -35,3 +35,4 @@ The scheduler also provides a REST API that allows jobs to be
monitored.
| /api/job/{job_id} | GET | Get a summary of a submitted job.
|
| /api/job/{job_id}/dot | GET | Produce a query plan in DOT (graphviz)
format. |
| /api/job/{job_id} | PATCH | Cancel a currently running job
|
+| /api/metrics | GET | Return current scheduler metric set
|