This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c22e52a Add Prometheus metrics endpoint (#511)
0c22e52a is described below

commit 0c22e52aecae525af662687ceafff549bfa6a12d
Author: Dan Harris <[email protected]>
AuthorDate: Wed Nov 16 12:30:16 2022 -0500

    Add Prometheus metrics endpoint (#511)
    
    * Add prometheus metrics for scheduler
    
    * Add SchedulerMetricsCollector, implementation for Prometheus and some 
refactoring for testability
    
    * Linting
    
    * apache header
    
    * Linting again
    
    * Make sure collector is only initialized once
    
    * Do not rely on accept header for API routing
    
    * Add tracking of pending tasks in scheduler
    
    * Empty commit
    
    * Make the metrics API pluggable through the collector trait
    
    * Add rudimentary user guide for prometheus metrics
    
    * Prettier
    
    * Fix bug in pending task count
    
    * Remove println
    
    * Do not try and keep track in the event loop
---
 ballista/core/proto/ballista.proto                 |   1 +
 ballista/scheduler/Cargo.toml                      |   5 +-
 ballista/scheduler/src/api/handlers.rs             |  26 +-
 ballista/scheduler/src/api/mod.rs                  |   9 +-
 ballista/scheduler/src/lib.rs                      |   1 +
 ballista/scheduler/src/main.rs                     |   8 +-
 ballista/scheduler/src/metrics/mod.rs              |  85 ++++
 ballista/scheduler/src/metrics/prometheus.rs       | 176 +++++++
 ballista/scheduler/src/scheduler_server/event.rs   |  27 +-
 .../src/scheduler_server/external_scaler.rs        |   2 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    |  11 +-
 ballista/scheduler/src/scheduler_server/mod.rs     | 489 +++++++------------
 .../src/scheduler_server/query_stage_scheduler.rs  | 248 +++++++++-
 ballista/scheduler/src/standalone.rs               |   4 +
 ballista/scheduler/src/state/execution_graph.rs    |  48 +-
 .../scheduler/src/state/execution_graph_dot.rs     |   2 +-
 ballista/scheduler/src/state/mod.rs                | 150 +++---
 ballista/scheduler/src/state/task_manager.rs       | 197 ++++----
 ballista/scheduler/src/test_utils.rs               | 519 ++++++++++++++++++++-
 docs/source/user-guide/metrics.md                  |  41 ++
 docs/source/user-guide/scheduler.md                |   1 +
 21 files changed, 1527 insertions(+), 523 deletions(-)

diff --git a/ballista/core/proto/ballista.proto 
b/ballista/core/proto/ballista.proto
index a38bf0ad..5113a6a7 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -440,6 +440,7 @@ message ExecutionGraph {
   string job_name = 10;
   uint64 start_time = 11;
   uint64 end_time = 12;
+  uint64 queued_at = 13;
 }
 
 message StageAttempts {
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index e95c9ee0..7a12fa16 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -30,9 +30,10 @@ edition = "2018"
 scheduler = "scheduler_config_spec.toml"
 
 [features]
-default = ["etcd", "sled"]
+default = ["etcd", "sled", "prometheus-metrics"]
 etcd = ["etcd-client"]
 flight-sql = []
+prometheus-metrics = ["prometheus", "once_cell"]
 sled = ["sled_package", "tokio-stream"]
 
 
@@ -58,8 +59,10 @@ hyper = "0.14.4"
 itertools = "0.10.3"
 log = "0.4"
 object_store = "0.5.0"
+once_cell = { version = "1.16.0", optional = true }
 parking_lot = "0.12"
 parse_arg = "0.1.3"
+prometheus = { version = "0.13", features = ["process"], optional = true }
 prost = "0.11"
 prost-types = { version = "0.11.0" }
 rand = "0.8"
diff --git a/ballista/scheduler/src/api/handlers.rs 
b/ballista/scheduler/src/api/handlers.rs
index 71cb9ea9..7e77aa34 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -10,6 +10,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::SchedulerServer;
 use crate::state::execution_graph::ExecutionStage;
 use crate::state::execution_graph_dot::ExecutionGraphDot;
@@ -21,6 +22,8 @@ use datafusion_proto::logical_plan::AsLogicalPlan;
 use graphviz_rust::cmd::{CommandArg, Format};
 use graphviz_rust::exec;
 use graphviz_rust::printer::PrinterContext;
+use http::header::CONTENT_TYPE;
+
 use std::time::Duration;
 use warp::Rejection;
 
@@ -176,13 +179,15 @@ pub(crate) async fn cancel_job<T: AsLogicalPlan, U: 
AsExecutionPlan>(
         .map_err(|_| warp::reject())?
         .ok_or_else(warp::reject)?;
 
-    let cancelled = data_server
-        .state
-        .cancel_job(&job_id)
+    data_server
+        .query_stage_event_loop
+        .get_sender()
+        .map_err(|_| warp::reject())?
+        .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
         .await
         .map_err(|_| warp::reject())?;
 
-    Ok(warp::reply::json(&CancelJobResponse { cancelled }))
+    Ok(warp::reply::json(&CancelJobResponse { cancelled: true }))
 }
 
 #[derive(Debug, serde::Serialize)]
@@ -347,3 +352,16 @@ pub(crate) async fn get_job_svg_graph<T: AsLogicalPlan, U: 
AsExecutionPlan>(
         _ => Ok("Not Found".to_string()),
     }
 }
+
+pub(crate) async fn get_scheduler_metrics<T: AsLogicalPlan, U: 
AsExecutionPlan>(
+    data_server: SchedulerServer<T, U>,
+) -> Result<impl warp::Reply, Rejection> {
+    Ok(data_server
+        .metrics_collector()
+        .gather_metrics()
+        .map_err(|_| warp::reject())?
+        .map(|(data, content_type)| {
+            warp::reply::with_header(data, CONTENT_TYPE, content_type)
+        })
+        .unwrap_or_else(|| warp::reply::with_header(vec![], CONTENT_TYPE, 
"text/html")))
+}
diff --git a/ballista/scheduler/src/api/mod.rs 
b/ballista/scheduler/src/api/mod.rs
index 7710e1a2..c8defd67 100644
--- a/ballista/scheduler/src/api/mod.rs
+++ b/ballista/scheduler/src/api/mod.rs
@@ -118,9 +118,13 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + 
AsExecutionPlan>(
             });
 
     let route_job_dot_svg = warp::path!("api" / "job" / String / "dot_svg")
-        .and(with_data_server(scheduler_server))
+        .and(with_data_server(scheduler_server.clone()))
         .and_then(|job_id, data_server| 
handlers::get_job_svg_graph(data_server, job_id));
 
+    let route_scheduler_metrics = warp::path!("api" / "metrics")
+        .and(with_data_server(scheduler_server))
+        .and_then(|data_server| handlers::get_scheduler_metrics(data_server));
+
     let routes = route_scheduler_state
         .or(route_executors)
         .or(route_jobs)
@@ -128,6 +132,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + 
AsExecutionPlan>(
         .or(route_query_stages)
         .or(route_job_dot)
         .or(route_query_stage_dot)
-        .or(route_job_dot_svg);
+        .or(route_job_dot_svg)
+        .or(route_scheduler_metrics);
     routes.boxed()
 }
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index 98fac309..483f3287 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -20,6 +20,7 @@
 pub mod api;
 pub mod config;
 pub mod display;
+pub mod metrics;
 pub mod planner;
 pub mod scheduler_server;
 #[cfg(feature = "sled")]
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 47a19e42..1141a519 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -66,6 +66,7 @@ use ballista_core::config::LogRotationPolicy;
 use ballista_scheduler::config::SchedulerConfig;
 #[cfg(feature = "flight-sql")]
 use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
+use ballista_scheduler::metrics::default_metrics_collector;
 use config::prelude::*;
 use tracing_subscriber::EnvFilter;
 
@@ -85,12 +86,15 @@ async fn start_server(
         config.scheduling_policy
     );
 
+    let metrics_collector = default_metrics_collector()?;
+
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
         SchedulerServer::new(
             scheduler_name,
             config_backend.clone(),
             BallistaCodec::default(),
             config,
+            metrics_collector,
         );
 
     scheduler_server.init().await?;
@@ -123,14 +127,14 @@ async fn start_server(
                     parts.extensions.insert(connect_info.clone());
                     let req = http::Request::from_parts(parts, body);
 
-                    let header = req.headers().get(hyper::header::ACCEPT);
-                    if header.is_some() && 
header.unwrap().eq("application/json") {
+                    if req.uri().path().starts_with("/api") {
                         return Either::Left(
                             warp.call(req)
                                 .map_ok(|res| res.map(EitherBody::Left))
                                 .map_err(Error::from),
                         );
                     }
+
                     Either::Right(
                         tonic
                             .call(req)
diff --git a/ballista/scheduler/src/metrics/mod.rs 
b/ballista/scheduler/src/metrics/mod.rs
new file mode 100644
index 00000000..0b54ebbd
--- /dev/null
+++ b/ballista/scheduler/src/metrics/mod.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(feature = "prometheus")]
+pub mod prometheus;
+
+use crate::metrics::prometheus::PrometheusMetricsCollector;
+use ballista_core::error::Result;
+use std::sync::Arc;
+
+/// Interface for recording metrics events in the scheduler. An instance of 
`Arc<dyn SchedulerMetricsCollector>`
+/// will be passed when constructing the `QueryStageScheduler` which is the 
core event loop of the scheduler.
+/// The event loop will then record metric events through this trait.
+pub trait SchedulerMetricsCollector: Send + Sync {
+    /// Record that job with `job_id` was submitted. This will be invoked
+    /// after the job's `ExecutionGraph` is created and it is ready to be 
scheduled
+    /// on executors.
+    /// When invoked should specify the timestamp in milliseconds when the job 
was originally
+    /// queued and the timestamp in milliseconds when it was submitted
+    fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at: 
u64);
+
+    /// Record that job with `job_id` has completed successfully. This should 
only
+    /// be invoked on successful job completion.
+    /// When invoked should specify the timestamp in milliseconds when the job 
was originally
+    /// queued and the timestamp in milliseconds when it was completed
+    fn record_completed(&self, job_id: &str, queued_at: u64, completed_at: 
u64);
+
+    /// Record that job with `job_id` has failed.
+    /// When invoked should specify the timestamp in milliseconds when the job 
was originally
+    /// queued and the timestamp in milliseconds when it failed.
+    fn record_failed(&self, job_id: &str, queued_at: u64, failed_at: u64);
+
+    /// Record that job with `job_id` was cancelled.
+    fn record_cancelled(&self, job_id: &str);
+
+    /// Set the current number of pending tasks in scheduler. A pending task 
is a task that is available
+    /// to schedule on an executor but cannot be scheduled because no 
resources are available.
+    fn set_pending_tasks_queue_size(&self, value: u64);
+
+    /// Gather current metric set that should be returned when calling the 
scheduler's metrics API
+    /// Should return a tuple containing the content of the metric set and the 
content type (e.g. `application/json`, `text/plain`, etc)
+    fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>>;
+}
+
+/// Implementation of `SchedulerMetricsCollector` that ignores all events. 
This can be used as
+/// a default implementation when tracking scheduler metrics is not required 
(or performed through other means)
+#[derive(Default)]
+pub struct NoopMetricsCollector {}
+
+impl SchedulerMetricsCollector for NoopMetricsCollector {
+    fn record_submitted(&self, _job_id: &str, _queued_at: u64, _submitted_at: 
u64) {}
+    fn record_completed(&self, _job_id: &str, _queued_at: u64, _completed_att: 
u64) {}
+    fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {}
+    fn record_cancelled(&self, _job_id: &str) {}
+    fn set_pending_tasks_queue_size(&self, _value: u64) {}
+
+    fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>> {
+        Ok(None)
+    }
+}
+
+/// Return a reference to the systems default metrics collector.
+#[cfg(feature = "prometheus")]
+pub fn default_metrics_collector() -> Result<Arc<dyn 
SchedulerMetricsCollector>> {
+    PrometheusMetricsCollector::current()
+}
+
+#[cfg(not(feature = "prometheus"))]
+pub fn default_metrics_collector() -> Result<Arc<dyn 
SchedulerMetricsCollector>> {
+    Ok(Arc::new(NoopMetricsCollector::default()))
+}
diff --git a/ballista/scheduler/src/metrics/prometheus.rs 
b/ballista/scheduler/src/metrics/prometheus.rs
new file mode 100644
index 00000000..81efb07d
--- /dev/null
+++ b/ballista/scheduler/src/metrics/prometheus.rs
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::SchedulerMetricsCollector;
+use ballista_core::error::{BallistaError, Result};
+
+use once_cell::sync::OnceCell;
+use prometheus::{
+    register_counter_with_registry, register_gauge_with_registry,
+    register_histogram_with_registry, Counter, Gauge, Histogram, Registry,
+};
+use prometheus::{Encoder, TextEncoder};
+use std::sync::Arc;
+
+static COLLECTOR: OnceCell<Arc<dyn SchedulerMetricsCollector>> = 
OnceCell::new();
+
+/// SchedulerMetricsCollector implementation based on Prometheus. By default 
this will track
+/// 7 metrics:
+/// *job_exec_time_seconds* - Histogram of successful job execution time in 
seconds
+/// *planning_time_ms* - Histogram of job planning time in milliseconds
+/// *job_failed_total* - Counter of failed jobs
+/// *job_cancelled_total* - Counter of cancelled jobs
+/// *job_completed_total* - Counter of completed jobs
+/// *job_submitted_total* - Counter of submitted jobs
+/// *pending_task_queue_size* - Number of pending tasks
+pub struct PrometheusMetricsCollector {
+    execution_time: Histogram,
+    planning_time: Histogram,
+    failed: Counter,
+    cancelled: Counter,
+    completed: Counter,
+    submitted: Counter,
+    pending_queue_size: Gauge,
+}
+
+impl PrometheusMetricsCollector {
+    pub fn new(registry: &Registry) -> Result<Self> {
+        let execution_time = register_histogram_with_registry!(
+            "job_exec_time_seconds",
+            "Histogram of successful job execution time in seconds",
+            vec![0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64],
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        let planning_time = register_histogram_with_registry!(
+            "planning_time_ms",
+            "Histogram of job planning time in milliseconds",
+            vec![1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 500.0_f64],
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        let failed = register_counter_with_registry!(
+            "job_failed_total",
+            "Counter of failed jobs",
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        let cancelled = register_counter_with_registry!(
+            "job_cancelled_total",
+            "Counter of cancelled jobs",
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        let completed = register_counter_with_registry!(
+            "job_completed_total",
+            "Counter of completed jobs",
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        let submitted = register_counter_with_registry!(
+            "job_submitted_total",
+            "Counter of submitted jobs",
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        let pending_queue_size = register_gauge_with_registry!(
+            "pending_task_queue_size",
+            "Number of pending tasks",
+            registry
+        )
+        .map_err(|e| {
+            BallistaError::Internal(format!("Error registering metric: {:?}", 
e))
+        })?;
+
+        Ok(Self {
+            execution_time,
+            planning_time,
+            failed,
+            cancelled,
+            completed,
+            submitted,
+            pending_queue_size,
+        })
+    }
+
+    pub fn current() -> Result<Arc<dyn SchedulerMetricsCollector>> {
+        COLLECTOR
+            .get_or_try_init(|| {
+                let collector = Self::new(::prometheus::default_registry())?;
+
+                Ok(Arc::new(collector) as Arc<dyn SchedulerMetricsCollector>)
+            })
+            .map(|arc| arc.clone())
+    }
+}
+
+impl SchedulerMetricsCollector for PrometheusMetricsCollector {
+    fn record_submitted(&self, _job_id: &str, queued_at: u64, submitted_at: 
u64) {
+        self.submitted.inc();
+        self.planning_time
+            .observe((submitted_at - queued_at) as f64);
+    }
+
+    fn record_completed(&self, _job_id: &str, queued_at: u64, completed_at: 
u64) {
+        self.completed.inc();
+        self.execution_time
+            .observe((completed_at - queued_at) as f64 / 1000_f64)
+    }
+
+    fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {
+        self.failed.inc()
+    }
+
+    fn record_cancelled(&self, _job_id: &str) {
+        self.cancelled.inc();
+    }
+
+    fn set_pending_tasks_queue_size(&self, value: u64) {
+        self.pending_queue_size.set(value as f64);
+    }
+
+    fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>> {
+        let encoder = TextEncoder::new();
+
+        let metric_families = prometheus::gather();
+        let mut buffer = vec![];
+        encoder.encode(&metric_families, &mut buffer).map_err(|e| {
+            BallistaError::Internal(format!("Error encoding prometheus 
metrics: {:?}", e))
+        })?;
+
+        Ok(Some((buffer, encoder.format_type().to_owned())))
+    }
+}
diff --git a/ballista/scheduler/src/scheduler_server/event.rs 
b/ballista/scheduler/src/scheduler_server/event.rs
index cb1dd8e9..74643535 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -31,13 +31,32 @@ pub enum QueryStageSchedulerEvent {
         job_name: String,
         session_ctx: Arc<SessionContext>,
         plan: Box<LogicalPlan>,
+        queued_at: u64,
+    },
+    JobSubmitted {
+        job_id: String,
+        queued_at: u64,
+        submitted_at: u64,
     },
-    JobSubmitted(String),
     // For a job which failed during planning
-    JobPlanningFailed(String, String),
-    JobFinished(String),
+    JobPlanningFailed {
+        job_id: String,
+        fail_message: String,
+        queued_at: u64,
+        failed_at: u64,
+    },
+    JobFinished {
+        job_id: String,
+        queued_at: u64,
+        completed_at: u64,
+    },
     // For a job fails with its execution graph setting failed
-    JobRunningFailed(String, String),
+    JobRunningFailed {
+        job_id: String,
+        fail_message: String,
+        queued_at: u64,
+        failed_at: u64,
+    },
     JobUpdated(String),
     JobCancel(String),
     JobDataClean(String),
diff --git a/ballista/scheduler/src/scheduler_server/external_scaler.rs 
b/ballista/scheduler/src/scheduler_server/external_scaler.rs
index d40c7b9b..60237dde 100644
--- a/ballista/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/scheduler/src/scheduler_server/external_scaler.rs
@@ -57,7 +57,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExternalScaler
         Ok(Response::new(GetMetricsResponse {
             metric_values: vec![MetricValue {
                 metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
-                metric_value: 10000000, // A very high number to saturate the 
HPA
+                metric_value: self.pending_tasks() as i64,
             }],
         }))
     }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 42805672..062e45a7 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -47,7 +47,7 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
-use crate::scheduler_server::SchedulerServer;
+use crate::scheduler_server::{timestamp_secs, SchedulerServer};
 use crate::state::executor_manager::ExecutorReservation;
 
 #[tonic::async_trait]
@@ -99,10 +99,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
             };
             let executor_heartbeat = ExecutorHeartbeat {
                 executor_id: metadata.id.clone(),
-                timestamp: SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .expect("Time went backwards")
-                    .as_secs(),
+                timestamp: timestamp_secs(),
                 metrics: vec![],
                 status: Some(ExecutorStatus {
                     status: 
Some(executor_status::Status::Active("".to_string())),
@@ -578,6 +575,7 @@ mod test {
     use tonic::Request;
 
     use crate::config::SchedulerConfig;
+    use crate::metrics::default_metrics_collector;
     use ballista_core::error::BallistaError;
     use ballista_core::serde::protobuf::{
         executor_registration::OptionalHost, executor_status, 
ExecutorRegistration,
@@ -602,6 +600,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 SchedulerConfig::default(),
+                default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
         let exec_meta = ExecutorRegistration {
@@ -688,6 +687,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 SchedulerConfig::default(),
+                default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
 
@@ -768,6 +768,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 SchedulerConfig::default(),
+                default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
 
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index f81135ae..04410974 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -30,6 +30,7 @@ use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 
 use crate::config::SchedulerConfig;
+use crate::metrics::SchedulerMetricsCollector;
 use log::{error, warn};
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
@@ -38,6 +39,8 @@ use crate::state::backend::StateBackendClient;
 use crate::state::executor_manager::{
     ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
 };
+
+use crate::state::task_manager::TaskLauncher;
 use crate::state::SchedulerState;
 
 // include the generated protobuf source as a submodule
@@ -59,6 +62,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
     pub start_time: u128,
     pub(crate) state: Arc<SchedulerState<T, U>>,
     pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
+    query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
SchedulerServer<T, U> {
@@ -67,6 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         config_backend: Arc<dyn StateBackendClient>,
         codec: BallistaCodec<T, U>,
         config: SchedulerConfig,
+        metrics_collector: Arc<dyn SchedulerMetricsCollector>,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(
             config_backend,
@@ -75,21 +80,54 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
             scheduler_name.clone(),
             config.clone(),
         ));
-        let query_stage_scheduler = 
Arc::new(QueryStageScheduler::new(state.clone()));
+        let query_stage_scheduler =
+            Arc::new(QueryStageScheduler::new(state.clone(), 
metrics_collector));
         let query_stage_event_loop = EventLoop::new(
             "query_stage".to_owned(),
             config.event_loop_buffer_size as usize,
+            query_stage_scheduler.clone(),
+        );
+
+        Self {
+            scheduler_name,
+            start_time: timestamp_millis() as u128,
+            state,
+            query_stage_event_loop,
             query_stage_scheduler,
+        }
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn with_task_launcher(
+        scheduler_name: String,
+        config_backend: Arc<dyn StateBackendClient>,
+        codec: BallistaCodec<T, U>,
+        config: SchedulerConfig,
+        metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+        task_launcher: Arc<dyn TaskLauncher>,
+    ) -> Self {
+        let state = Arc::new(SchedulerState::with_task_launcher(
+            config_backend,
+            default_session_builder,
+            codec,
+            scheduler_name.clone(),
+            config.clone(),
+            task_launcher,
+        ));
+        let query_stage_scheduler =
+            Arc::new(QueryStageScheduler::new(state.clone(), 
metrics_collector));
+        let query_stage_event_loop = EventLoop::new(
+            "query_stage".to_owned(),
+            config.event_loop_buffer_size as usize,
+            query_stage_scheduler.clone(),
         );
 
         Self {
             scheduler_name,
-            start_time: SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_millis(),
+            start_time: timestamp_millis() as u128,
             state,
             query_stage_event_loop,
+            query_stage_scheduler,
         }
     }
 
@@ -101,6 +139,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         Ok(())
     }
 
+    pub(crate) fn pending_tasks(&self) -> usize {
+        self.query_stage_scheduler.pending_tasks()
+    }
+
+    pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
+        self.query_stage_scheduler.metrics_collector()
+    }
+
     pub(crate) async fn submit_job(
         &self,
         job_id: &str,
@@ -115,6 +161,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
                 job_name: job_name.to_owned(),
                 session_ctx: ctx,
                 plan: Box::new(plan.clone()),
+                queued_at: timestamp_millis(),
             })
             .await
     }
@@ -241,10 +288,23 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     }
 }
 
+pub fn timestamp_secs() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
+        .as_secs()
+}
+
+pub fn timestamp_millis() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
+        .as_millis() as u64
+}
+
 #[cfg(all(test, feature = "sled"))]
 mod test {
     use std::sync::Arc;
-    use std::time::Duration;
 
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum, LogicalPlan};
@@ -258,20 +318,25 @@ mod test {
     use ballista_core::error::Result;
 
     use crate::config::SchedulerConfig;
+
     use ballista_core::serde::protobuf::{
         failed_task, job_status, task_status, ExecutionError, FailedTask, 
JobStatus,
-        PhysicalPlanNode, ShuffleWritePartition, SuccessfulTask, TaskStatus,
+        MultiTaskDefinition, PhysicalPlanNode, ShuffleWritePartition, 
SuccessfulJob,
+        SuccessfulTask, TaskId, TaskStatus,
     };
     use ballista_core::serde::scheduler::{
         ExecutorData, ExecutorMetadata, ExecutorSpecification,
     };
     use ballista_core::serde::BallistaCodec;
 
-    use crate::scheduler_server::SchedulerServer;
+    use crate::scheduler_server::{timestamp_millis, SchedulerServer};
     use crate::state::backend::standalone::StandaloneClient;
 
-    use crate::state::executor_manager::ExecutorReservation;
-    use crate::test_utils::{await_condition, ExplodingTableProvider};
+    use crate::test_utils::{
+        assert_completed_event, assert_failed_event, assert_no_submitted_event,
+        assert_submitted_event, ExplodingTableProvider, SchedulerTest, 
TaskRunnerFn,
+        TestMetricsCollector,
+    };
 
     #[tokio::test]
     async fn test_pull_scheduling() -> Result<()> {
@@ -302,7 +367,7 @@ mod test {
         // Submit job
         scheduler
             .state
-            .submit_job(job_id, "", ctx, &plan)
+            .submit_job(job_id, "", ctx, &plan, 0)
             .await
             .expect("submitting plan");
 
@@ -380,150 +445,37 @@ mod test {
         Ok(())
     }
 
-    /// This test will exercise the push-based scheduling.
     #[tokio::test]
     async fn test_push_scheduling() -> Result<()> {
         let plan = test_plan();
-        let task_slots = 4;
 
-        let scheduler = test_push_staged_scheduler().await?;
-
-        let executors = test_executors(task_slots);
-        for (executor_metadata, executor_data) in executors {
-            scheduler
-                .state
-                .executor_manager
-                .register_executor(executor_metadata, executor_data, false)
-                .await?;
-        }
-
-        let config = test_session(task_slots);
-
-        let ctx = scheduler
-            .state
-            .session_manager
-            .create_session(&config)
-            .await?;
-
-        let job_id = "job";
-
-        scheduler.state.submit_job(job_id, "", ctx, &plan).await?;
-
-        // Complete tasks that are offered through scheduler events
-        loop {
-            // Check condition
-            let available_tasks = {
-                let graph = scheduler
-                    .state
-                    .task_manager
-                    .get_job_execution_graph(job_id)
-                    .await?
-                    .unwrap();
-                if graph.is_successful() {
-                    break;
-                }
-                graph.available_tasks()
-            };
-
-            if available_tasks == 0 {
-                tokio::time::sleep(Duration::from_millis(5)).await;
-                continue;
+        let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+        let mut test = SchedulerTest::new(
+            SchedulerConfig::default()
+                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+            metrics_collector.clone(),
+            4,
+            1,
+            None,
+        )
+        .await?;
+
+        let status = test.run("job", "", &plan).await.expect("running plan");
+
+        match status.status {
+            Some(job_status::Status::Successful(SuccessfulJob {
+                partition_location,
+            })) => {
+                assert_eq!(partition_location.len(), 4);
             }
-
-            let reservations: Vec<ExecutorReservation> = scheduler
-                .state
-                .executor_manager
-                .reserve_slots(available_tasks as u32)
-                .await?
-                .into_iter()
-                .map(|res| res.assign(job_id.to_owned()))
-                .collect();
-
-            let free_list = match scheduler
-                .state
-                .task_manager
-                .fill_reservations(&reservations)
-                .await
-            {
-                Ok((assignments, mut unassigned_reservations, _)) => {
-                    for (executor_id, task) in assignments.into_iter() {
-                        match scheduler
-                            .state
-                            .executor_manager
-                            .get_executor_metadata(&executor_id)
-                            .await
-                        {
-                            Ok(executor) => {
-                                let mut partitions: Vec<ShuffleWritePartition> 
= vec![];
-
-                                let num_partitions = task
-                                    .output_partitioning
-                                    .map(|p| p.partition_count())
-                                    .unwrap_or(1);
-
-                                for partition_id in 0..num_partitions {
-                                    partitions.push(ShuffleWritePartition {
-                                        partition_id: partition_id as u64,
-                                        path: "some/path".to_string(),
-                                        num_batches: 1,
-                                        num_rows: 1,
-                                        num_bytes: 1,
-                                    })
-                                }
-
-                                // Complete the task
-                                let task_status = TaskStatus {
-                                    task_id: task.task_id as u32,
-                                    job_id: task.partition.job_id.clone(),
-                                    stage_id: task.partition.stage_id as u32,
-                                    stage_attempt_num: task.stage_attempt_num 
as u32,
-                                    partition_id: task.partition.partition_id 
as u32,
-                                    launch_time: 0,
-                                    start_exec_time: 0,
-                                    end_exec_time: 0,
-                                    metrics: vec![],
-                                    status: 
Some(task_status::Status::Successful(
-                                        SuccessfulTask {
-                                            executor_id: executor.id.clone(),
-                                            partitions,
-                                        },
-                                    )),
-                                };
-
-                                scheduler
-                                    .update_task_status(&executor.id, 
vec![task_status])
-                                    .await?;
-                            }
-                            Err(_e) => {
-                                unassigned_reservations.push(
-                                    
ExecutorReservation::new_free(executor_id.clone()),
-                                );
-                            }
-                        }
-                    }
-                    unassigned_reservations
-                }
-                Err(_e) => reservations,
-            };
-
-            // If any reserved slots remain, return them to the pool
-            if !free_list.is_empty() {
-                scheduler
-                    .state
-                    .executor_manager
-                    .cancel_reservations(free_list)
-                    .await?;
+            other => {
+                panic!("Expected success status but found {:?}", other);
             }
         }
 
-        let final_graph = scheduler
-            .state
-            .task_manager
-            .get_execution_graph(job_id)
-            .await?;
-
-        assert!(final_graph.is_successful());
-        assert_eq!(final_graph.output_locations().len(), 4);
+        assert_submitted_event("job", &metrics_collector);
+        assert_completed_event("job", &metrics_collector);
 
         Ok(())
     }
@@ -532,139 +484,73 @@ mod test {
     #[tokio::test]
     async fn test_job_failure() -> Result<()> {
         let plan = test_plan();
-        let task_slots = 4;
-
-        let scheduler = test_push_staged_scheduler().await?;
-
-        let executors = test_executors(task_slots);
-        for (executor_metadata, executor_data) in executors {
-            scheduler
-                .state
-                .executor_manager
-                .register_executor(executor_metadata, executor_data, false)
-                .await?;
-        }
-
-        let config = test_session(task_slots);
-
-        let ctx = scheduler
-            .state
-            .session_manager
-            .create_session(&config)
-            .await?;
 
-        let job_id = "job";
-
-        scheduler.state.submit_job(job_id, "", ctx, &plan).await?;
+        let runner = Arc::new(TaskRunnerFn::new(
+            |_executor_id: String, task: MultiTaskDefinition| {
+                let mut statuses = vec![];
+
+                for TaskId {
+                    task_id,
+                    partition_id,
+                    ..
+                } in task.task_ids
+                {
+                    let timestamp = timestamp_millis();
+                    statuses.push(TaskStatus {
+                        task_id,
+                        job_id: task.job_id.clone(),
+                        stage_id: task.stage_id,
+                        stage_attempt_num: task.stage_attempt_num,
+                        partition_id,
+                        launch_time: timestamp,
+                        start_exec_time: timestamp,
+                        end_exec_time: timestamp,
+                        metrics: vec![],
+                        status: Some(task_status::Status::Failed(FailedTask {
+                            error: "ERROR".to_string(),
+                            retryable: false,
+                            count_to_failures: false,
+                            failed_reason: Some(
+                                failed_task::FailedReason::ExecutionError(
+                                    ExecutionError {},
+                                ),
+                            ),
+                        })),
+                    });
+                }
 
-        let available_tasks = scheduler
-            .state
-            .task_manager
-            .get_available_task_count(job_id)
-            .await?;
+                statuses
+            },
+        ));
 
-        let reservations: Vec<ExecutorReservation> = scheduler
-            .state
-            .executor_manager
-            .reserve_slots(available_tasks as u32)
-            .await?
-            .into_iter()
-            .map(|res| res.assign(job_id.to_owned()))
-            .collect();
-
-        // Complete tasks that are offered through scheduler events
-        let free_list = match scheduler
-            .state
-            .task_manager
-            .fill_reservations(&reservations)
-            .await
-        {
-            Ok((assignments, mut unassigned_reservations, _)) => {
-                for (executor_id, task) in assignments.into_iter() {
-                    match scheduler
-                        .state
-                        .executor_manager
-                        .get_executor_metadata(&executor_id)
-                        .await
-                    {
-                        Ok(executor) => {
-                            let mut partitions: Vec<ShuffleWritePartition> = 
vec![];
-
-                            let num_partitions = task
-                                .output_partitioning
-                                .map(|p| p.partition_count())
-                                .unwrap_or(1);
-
-                            for partition_id in 0..num_partitions {
-                                partitions.push(ShuffleWritePartition {
-                                    partition_id: partition_id as u64,
-                                    path: "some/path".to_string(),
-                                    num_batches: 1,
-                                    num_rows: 1,
-                                    num_bytes: 1,
-                                })
-                            }
-
-                            // Complete the task
-                            let task_status = TaskStatus {
-                                task_id: task.task_id as u32,
-                                job_id: task.partition.job_id.clone(),
-                                stage_id: task.partition.stage_id as u32,
-                                stage_attempt_num: task.stage_attempt_num as 
u32,
-                                partition_id: task.partition.partition_id as 
u32,
-                                launch_time: 0,
-                                start_exec_time: 0,
-                                end_exec_time: 0,
-                                metrics: vec![],
-                                status: 
Some(task_status::Status::Failed(FailedTask {
-                                    error: "".to_string(),
-                                    retryable: false,
-                                    count_to_failures: false,
-                                    failed_reason: Some(
-                                        
failed_task::FailedReason::ExecutionError(
-                                            ExecutionError {},
-                                        ),
-                                    ),
-                                })),
-                            };
-
-                            scheduler
-                                .state
-                                .update_task_statuses(&executor.id, 
vec![task_status])
-                                .await?;
-                        }
-                        Err(_e) => {
-                            unassigned_reservations
-                                
.push(ExecutorReservation::new_free(executor_id.clone()));
-                        }
-                    }
-                }
-                unassigned_reservations
-            }
-            Err(_e) => reservations,
-        };
+        let metrics_collector = Arc::new(TestMetricsCollector::default());
 
-        // If any reserved slots remain, return them to the pool
-        if !free_list.is_empty() {
-            scheduler
-                .state
-                .executor_manager
-                .cancel_reservations(free_list)
-                .await?;
-        }
+        let mut test = SchedulerTest::new(
+            SchedulerConfig::default()
+                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+            metrics_collector.clone(),
+            4,
+            1,
+            Some(runner),
+        )
+        .await?;
 
-        let status = 
scheduler.state.task_manager.get_job_status(job_id).await?;
+        let status = test.run("job", "", &plan).await.expect("running plan");
 
         assert!(
             matches!(
                 status,
-                Some(JobStatus {
+                JobStatus {
                     status: Some(job_status::Status::Failed(_))
-                })
+                }
             ),
-            "Expected job status to be failed"
+            "Expected job status to be failed but it was {:?}",
+            status
         );
 
+        assert_submitted_event("job", &metrics_collector);
+        assert_failed_event("job", &metrics_collector);
+
         Ok(())
     }
 
@@ -672,44 +558,39 @@ mod test {
     // Here we simulate a planning failure using ExplodingTableProvider to 
test this.
     #[tokio::test]
     async fn test_planning_failure() -> Result<()> {
-        let task_slots = 4;
-
-        let scheduler = test_push_staged_scheduler().await?;
-
-        let config = test_session(task_slots);
-
-        let ctx = scheduler
-            .state
-            .session_manager
-            .create_session(&config)
-            .await?;
+        let metrics_collector = Arc::new(TestMetricsCollector::default());
+        let mut test = SchedulerTest::new(
+            SchedulerConfig::default()
+                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+            metrics_collector.clone(),
+            4,
+            1,
+            None,
+        )
+        .await?;
+
+        let ctx = test.ctx().await?;
 
         ctx.register_table("explode", Arc::new(ExplodingTableProvider))?;
 
         let plan = ctx.sql("SELECT * FROM explode").await?.to_logical_plan()?;
 
-        let job_id = "job";
-
         // This should fail when we try and create the physical plan
-        scheduler.submit_job(job_id, "", ctx, &plan).await?;
-
-        let scheduler = scheduler.clone();
+        let status = test.run("job", "", &plan).await?;
 
-        let check = || async {
-            let status = 
scheduler.state.task_manager.get_job_status(job_id).await?;
-
-            Ok(matches!(
+        assert!(
+            matches!(
                 status,
-                Some(JobStatus {
+                JobStatus {
                     status: Some(job_status::Status::Failed(_))
-                })
-            ))
-        };
-
-        // Sine this happens in an event loop, we need to check a few times.
-        let job_failed = await_condition(Duration::from_millis(100), 10, 
check).await?;
+                }
+            ),
+            "Expected job status to be failed but it was {:?}",
+            status
+        );
 
-        assert!(job_failed, "Job status not failed after 1 second");
+        assert_no_submitted_event("job", &metrics_collector);
+        assert_failed_event("job", &metrics_collector);
 
         Ok(())
     }
@@ -724,23 +605,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
-            );
-        scheduler.init().await?;
-
-        Ok(scheduler)
-    }
-
-    async fn test_push_staged_scheduler(
-    ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let config = SchedulerConfig::default()
-            .with_scheduler_policy(TaskSchedulingPolicy::PushStaged);
-        let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
-            SchedulerServer::new(
-                "localhost:50050".to_owned(),
-                state_storage,
-                BallistaCodec::default(),
-                config,
+                Arc::new(TestMetricsCollector::default()),
             );
         scheduler.init().await?;
 
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index f49cae46..29f3d2d7 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -23,6 +24,8 @@ use log::{debug, error, info};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::{EventAction, EventSender};
 
+use crate::metrics::SchedulerMetricsCollector;
+use crate::scheduler_server::timestamp_millis;
 use ballista_core::serde::AsExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use tokio::sync::mpsc;
@@ -37,11 +40,34 @@ pub(crate) struct QueryStageScheduler<
     U: 'static + AsExecutionPlan,
 > {
     state: Arc<SchedulerState<T, U>>,
+    metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+    pending_tasks: AtomicUsize,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
QueryStageScheduler<T, U> {
-    pub(crate) fn new(state: Arc<SchedulerState<T, U>>) -> Self {
-        Self { state }
+    pub(crate) fn new(
+        state: Arc<SchedulerState<T, U>>,
+        metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+    ) -> Self {
+        Self {
+            state,
+            metrics_collector,
+            pending_tasks: AtomicUsize::default(),
+        }
+    }
+
+    pub(crate) fn set_pending_tasks(&self, tasks: usize) {
+        self.pending_tasks.store(tasks, Ordering::SeqCst);
+        self.metrics_collector
+            .set_pending_tasks_queue_size(tasks as u64);
+    }
+
+    pub(crate) fn pending_tasks(&self) -> usize {
+        self.pending_tasks.load(Ordering::SeqCst)
+    }
+
+    pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
+        self.metrics_collector.as_ref()
     }
 }
 
@@ -70,19 +96,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 job_name,
                 session_ctx,
                 plan,
+                queued_at,
             } => {
                 info!("Job {} queued with name {:?}", job_id, job_name);
+
                 let state = self.state.clone();
                 tokio::spawn(async move {
                     let event = if let Err(e) = state
-                        .submit_job(&job_id, &job_name, session_ctx, &plan)
+                        .submit_job(&job_id, &job_name, session_ctx, &plan, 
queued_at)
                         .await
                     {
-                        let msg = format!("Error planning job {}: {:?}", 
job_id, e);
-                        error!("{}", &msg);
-                        QueryStageSchedulerEvent::JobPlanningFailed(job_id, 
msg)
+                        let fail_message =
+                            format!("Error planning job {}: {:?}", job_id, e);
+                        error!("{}", &fail_message);
+                        QueryStageSchedulerEvent::JobPlanningFailed {
+                            job_id,
+                            fail_message,
+                            queued_at,
+                            failed_at: timestamp_millis(),
+                        }
                     } else {
-                        QueryStageSchedulerEvent::JobSubmitted(job_id)
+                        QueryStageSchedulerEvent::JobSubmitted {
+                            job_id,
+                            queued_at,
+                            submitted_at: timestamp_millis(),
+                        }
                     };
                     tx_event
                         .post_event(event)
@@ -91,7 +129,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         .unwrap();
                 });
             }
-            QueryStageSchedulerEvent::JobSubmitted(job_id) => {
+            QueryStageSchedulerEvent::JobSubmitted {
+                job_id,
+                queued_at,
+                submitted_at,
+            } => {
+                self.metrics_collector
+                    .record_submitted(&job_id, queued_at, submitted_at);
+
                 info!("Job {} submitted", job_id);
                 if self.state.config.is_push_staged_scheduling() {
                     let available_tasks = self
@@ -122,28 +167,52 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         .await?;
                 }
             }
-            QueryStageSchedulerEvent::JobPlanningFailed(job_id, 
failure_reason) => {
-                error!("Job {} failed: {}", job_id, failure_reason);
+            QueryStageSchedulerEvent::JobPlanningFailed {
+                job_id,
+                fail_message,
+                queued_at,
+                failed_at,
+            } => {
+                self.metrics_collector
+                    .record_failed(&job_id, queued_at, failed_at);
+
+                error!("Job {} failed: {}", job_id, fail_message);
                 self.state
                     .task_manager
-                    .fail_unscheduled_job(&job_id, failure_reason)
+                    .fail_unscheduled_job(&job_id, fail_message)
                     .await?;
             }
-            QueryStageSchedulerEvent::JobFinished(job_id) => {
+            QueryStageSchedulerEvent::JobFinished {
+                job_id,
+                queued_at,
+                completed_at,
+            } => {
+                self.metrics_collector
+                    .record_completed(&job_id, queued_at, completed_at);
+
                 info!("Job {} success", job_id);
                 self.state.task_manager.succeed_job(&job_id).await?;
                 self.state.clean_up_successful_job(job_id);
             }
-            QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason) 
=> {
+            QueryStageSchedulerEvent::JobRunningFailed {
+                job_id,
+                fail_message,
+                queued_at,
+                failed_at,
+            } => {
+                self.metrics_collector
+                    .record_failed(&job_id, queued_at, failed_at);
+
                 error!("Job {} running failed", job_id);
-                let tasks = self
+                let (running_tasks, _pending_tasks) = self
                     .state
                     .task_manager
-                    .abort_job(&job_id, failure_reason)
+                    .abort_job(&job_id, fail_message)
                     .await?;
-                if !tasks.is_empty() {
+
+                if !running_tasks.is_empty() {
                     tx_event
-                        
.post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
+                        
.post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
                         .await?;
                 }
                 self.state.clean_up_failed_job(job_id);
@@ -153,8 +222,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 self.state.task_manager.update_job(&job_id).await?;
             }
             QueryStageSchedulerEvent::JobCancel(job_id) => {
-                self.state.task_manager.cancel_job(&job_id).await?;
+                self.metrics_collector.record_cancelled(&job_id);
+
+                info!("Job {} Cancelled", job_id);
+                let (running_tasks, _pending_tasks) =
+                    self.state.task_manager.cancel_job(&job_id).await?;
                 self.state.clean_up_failed_job(job_id);
+
+                tx_event
+                    
.post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
+                    .await?;
             }
             QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) 
=> {
                 let num_status = tasks_status.len();
@@ -186,7 +263,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 }
             }
             QueryStageSchedulerEvent::ReservationOffering(reservations) => {
-                let reservations = 
self.state.offer_reservation(reservations).await?;
+                let (reservations, pending) =
+                    self.state.offer_reservation(reservations).await?;
+
+                self.set_pending_tasks(pending);
+
                 if !reservations.is_empty() {
                     tx_event
                         
.post_event(QueryStageSchedulerEvent::ReservationOffering(
@@ -217,7 +298,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 self.state
                     .executor_manager
                     .cancel_running_tasks(tasks)
-                    .await?
+                    .await?;
             }
             QueryStageSchedulerEvent::JobDataClean(job_id) => {
                 self.state.executor_manager.clean_up_job_data(job_id);
@@ -231,3 +312,130 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
         error!("Error received by QueryStageScheduler: {:?}", error);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::config::SchedulerConfig;
+    use crate::test_utils::{await_condition, SchedulerTest, 
TestMetricsCollector};
+    use ballista_core::config::TaskSchedulingPolicy;
+    use ballista_core::error::Result;
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::logical_expr::{col, sum, LogicalPlan};
+    use datafusion::test_util::scan_empty_with_partitions;
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    #[tokio::test]
+    async fn test_pending_task_metric() -> Result<()> {
+        let plan = test_plan(10);
+
+        let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+        let mut test = SchedulerTest::new(
+            SchedulerConfig::default()
+                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+            metrics_collector.clone(),
+            1,
+            1,
+            None,
+        )
+        .await?;
+
+        test.submit("job-1", "", &plan).await?;
+
+        // First stage has 10 tasks, one of which should be scheduled 
immediately
+        expect_pending_tasks(&test, 9).await;
+
+        test.tick().await?;
+
+        // First task completes and another should be scheduled, so we should have 8
+        expect_pending_tasks(&test, 8).await;
+
+        // Complete the 8 remaining tasks in the first stage
+        for _ in 0..8 {
+            test.tick().await?;
+        }
+
+        // The second stage should be resolved so we should have a new pending 
task
+        expect_pending_tasks(&test, 1).await;
+
+        // complete the final task
+        test.tick().await?;
+
+        expect_pending_tasks(&test, 0).await;
+
+        // Tick once more so the final task's status update is processed (presumably this drives the job to completion — confirm against SchedulerTest::tick)
+        test.tick().await?;
+
+        // Job should be finished now
+        let _ = test.await_completion("job-1").await?;
+
+        Ok(())
+    }
+
+    #[ignore]
+    #[tokio::test]
+    async fn test_pending_task_metric_on_cancellation() -> Result<()> {
+        let plan = test_plan(10);
+
+        let metrics_collector = Arc::new(TestMetricsCollector::default());
+
+        let mut test = SchedulerTest::new(
+            SchedulerConfig::default()
+                .with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
+            metrics_collector.clone(),
+            1,
+            1,
+            None,
+        )
+        .await?;
+
+        test.submit("job-1", "", &plan).await?;
+
+        // First stage has 10 tasks, one of which should be scheduled 
immediately
+        expect_pending_tasks(&test, 9).await;
+
+        test.tick().await?;
+
+        // First task completes and another should be scheduled, so we should have 8
+        expect_pending_tasks(&test, 8).await;
+
+        test.cancel("job-1").await?;
+
+        // After the job is cancelled there should be no pending tasks left
+        expect_pending_tasks(&test, 0).await;
+
+        Ok(())
+    }
+
+    async fn expect_pending_tasks(test: &SchedulerTest, expected: usize) {
+        let success = await_condition(Duration::from_millis(500), 20, || {
+            let pending_tasks = test.pending_tasks();
+
+            futures::future::ready(Ok(pending_tasks == expected))
+        })
+        .await
+        .unwrap();
+
+        assert!(
+            success,
+            "Expected {} pending tasks but found {}",
+            expected,
+            test.pending_tasks()
+        );
+    }
+
+    fn test_plan(partitions: usize) -> LogicalPlan {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Utf8, false),
+            Field::new("gmv", DataType::UInt64, false),
+        ]);
+
+        scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
+            .unwrap()
+            .aggregate(vec![col("id")], vec![sum(col("gmv"))])
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+}
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index eb64cfb7..975ad293 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::config::SchedulerConfig;
+use crate::metrics::default_metrics_collector;
 use crate::{
     scheduler_server::SchedulerServer, 
state::backend::standalone::StandaloneClient,
 };
@@ -34,12 +35,15 @@ use tokio::net::TcpListener;
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
     let client = StandaloneClient::try_new_temporary()?;
 
+    let metrics_collector = default_metrics_collector()?;
+
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
         SchedulerServer::new(
             "localhost:50050".to_owned(),
             Arc::new(client),
             BallistaCodec::default(),
             SchedulerConfig::default(),
+            metrics_collector,
         );
     scheduler_server.init().await?;
     let server = SchedulerGrpcServer::new(scheduler_server.clone());
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index dcb919b1..51a6232f 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -47,6 +47,7 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use crate::display::print_stage_metrics;
 use crate::planner::DistributedPlanner;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::timestamp_millis;
 pub(crate) use crate::state::execution_graph::execution_stage::{
     ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage, 
TaskInfo,
     UnresolvedStage,
@@ -110,6 +111,8 @@ pub struct ExecutionGraph {
     session_id: String,
     /// Status of this job
     status: JobStatus,
+    /// Timestamp of when this job was submitted
+    queued_at: u64,
     /// Job start time
     start_time: u64,
     /// Job end time
@@ -143,6 +146,7 @@ impl ExecutionGraph {
         job_name: &str,
         session_id: &str,
         plan: Arc<dyn ExecutionPlan>,
+        queued_at: u64,
     ) -> Result<Self> {
         let mut planner = DistributedPlanner::new();
 
@@ -161,10 +165,8 @@ impl ExecutionGraph {
             status: JobStatus {
                 status: Some(job_status::Status::Queued(QueuedJob {})),
             },
-            start_time: SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_millis() as u64,
+            queued_at,
+            start_time: timestamp_millis(),
             end_time: 0,
             stages,
             output_partitions,
@@ -700,15 +702,21 @@ impl ExecutionGraph {
         if !updated_stages.failed_stages.is_empty() {
             info!("Job {} is failed", job_id);
             self.fail_job(job_err_msg.clone());
-            events.push(QueryStageSchedulerEvent::JobRunningFailed(
+            events.push(QueryStageSchedulerEvent::JobRunningFailed {
                 job_id,
-                job_err_msg,
-            ));
+                fail_message: job_err_msg,
+                queued_at: self.queued_at,
+                failed_at: timestamp_millis(),
+            });
         } else if self.is_successful() {
             // If this ExecutionGraph is successful, finish it
             info!("Job {} is success, finalizing output partitions", job_id);
             self.succeed_job()?;
-            events.push(QueryStageSchedulerEvent::JobFinished(job_id));
+            events.push(QueryStageSchedulerEvent::JobFinished {
+                job_id,
+                queued_at: self.queued_at,
+                completed_at: timestamp_millis(),
+            });
         } else if has_resolved {
             events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
         }
@@ -1312,6 +1320,7 @@ impl ExecutionGraph {
                     "Invalid Execution Graph: missing job status".to_owned(),
                 )
             })?,
+            queued_at: proto.queued_at,
             start_time: proto.start_time,
             end_time: proto.end_time,
             stages,
@@ -1386,6 +1395,7 @@ impl ExecutionGraph {
             job_name: graph.job_name,
             session_id: graph.session_id,
             status: Some(graph.status),
+            queued_at: graph.queued_at,
             start_time: graph.start_time,
             end_time: graph.end_time,
             stages,
@@ -2229,7 +2239,7 @@ mod test {
                     assert_eq!(stage_events.len(), 1);
                     assert!(matches!(
                         stage_events[0],
-                        QueryStageSchedulerEvent::JobRunningFailed(_, _)
+                        QueryStageSchedulerEvent::JobRunningFailed { .. }
                     ));
                     // Stage 2 is still running
                     let running_stage = agg_graph.running_stages();
@@ -2719,7 +2729,7 @@ mod test {
         assert_eq!(stage_events.len(), 1);
         assert!(matches!(
             stage_events[0],
-            QueryStageSchedulerEvent::JobRunningFailed(_, _)
+            QueryStageSchedulerEvent::JobRunningFailed { .. }
         ));
 
         drain_tasks(&mut agg_graph)?;
@@ -2769,7 +2779,7 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        ExecutionGraph::new("localhost:50050", "job", "", "session", 
plan).unwrap()
+        ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 
0).unwrap()
     }
 
     async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {
@@ -2797,7 +2807,7 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        ExecutionGraph::new("localhost:50050", "job", "", "session", 
plan).unwrap()
+        ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 
0).unwrap()
     }
 
     async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
@@ -2820,7 +2830,7 @@ mod test {
 
         let plan = ctx.create_physical_plan(&optimized_plan).await.unwrap();
 
-        ExecutionGraph::new("localhost:50050", "job", "", "session", 
plan).unwrap()
+        ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 
0).unwrap()
     }
 
     async fn test_join_plan(partition: usize) -> ExecutionGraph {
@@ -2861,8 +2871,8 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        let graph =
-            ExecutionGraph::new("localhost:50050", "job", "", "session", 
plan).unwrap();
+        let graph = ExecutionGraph::new("localhost:50050", "job", "", 
"session", plan, 0)
+            .unwrap();
 
         println!("{:?}", graph);
 
@@ -2886,8 +2896,8 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        let graph =
-            ExecutionGraph::new("localhost:50050", "job", "", "session", 
plan).unwrap();
+        let graph = ExecutionGraph::new("localhost:50050", "job", "", 
"session", plan, 0)
+            .unwrap();
 
         println!("{:?}", graph);
 
@@ -2911,8 +2921,8 @@ mod test {
 
         println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
 
-        let graph =
-            ExecutionGraph::new("localhost:50050", "job", "", "session", 
plan).unwrap();
+        let graph = ExecutionGraph::new("localhost:50050", "job", "", 
"session", plan, 0)
+            .unwrap();
 
         println!("{:?}", graph);
 
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs 
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 0cdbb39c..708b5077 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -542,6 +542,6 @@ filter_expr="]
             .await?;
         let plan = df.to_logical_plan()?;
         let plan = ctx.create_physical_plan(&plan).await?;
-        ExecutionGraph::new("scheduler_id", "job_id", "job_name", 
"session_id", plan)
+        ExecutionGraph::new("scheduler_id", "job_id", "job_name", 
"session_id", plan, 0)
     }
 }
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index d770f984..3c12d2c8 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -29,7 +29,7 @@ use crate::scheduler_server::SessionBuilder;
 use crate::state::backend::{Lock, StateBackendClient};
 use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::session_manager::SessionManager;
-use crate::state::task_manager::TaskManager;
+use crate::state::task_manager::{TaskLauncher, TaskManager};
 
 use crate::config::SchedulerConfig;
 use crate::state::execution_graph::TaskDescription;
@@ -49,7 +49,7 @@ pub mod execution_graph_dot;
 pub mod executor_manager;
 pub mod session_manager;
 pub mod session_registry;
-mod task_manager;
+pub(crate) mod task_manager;
 
 pub fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
     T::decode(bytes).map_err(|e| {
@@ -135,11 +135,37 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         }
     }
 
+    #[allow(dead_code)]
+    pub(crate) fn with_task_launcher(
+        config_client: Arc<dyn StateBackendClient>,
+        session_builder: SessionBuilder,
+        codec: BallistaCodec<T, U>,
+        scheduler_name: String,
+        config: SchedulerConfig,
+        dispatcher: Arc<dyn TaskLauncher>,
+    ) -> Self {
+        Self {
+            executor_manager: ExecutorManager::new(
+                config_client.clone(),
+                config.executor_slots_policy,
+            ),
+            task_manager: TaskManager::with_launcher(
+                config_client.clone(),
+                session_builder,
+                codec.clone(),
+                scheduler_name,
+                dispatcher,
+            ),
+            session_manager: SessionManager::new(config_client, 
session_builder),
+            codec,
+            config,
+        }
+    }
+
     pub async fn init(&self) -> Result<()> {
         self.executor_manager.init().await
     }
 
-    #[cfg(not(test))]
     pub(crate) async fn update_task_statuses(
         &self,
         executor_id: &str,
@@ -164,33 +190,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         Ok((events, reservations))
     }
 
-    #[cfg(test)]
-    pub(crate) async fn update_task_statuses(
-        &self,
-        executor_id: &str,
-        tasks_status: Vec<TaskStatus>,
-    ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
-        let executor = self
-            .executor_manager
-            .get_executor_metadata(executor_id)
-            .await?;
-
-        let total_num_tasks = tasks_status.len();
-        let free_list = (0..total_num_tasks)
-            .into_iter()
-            .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
-            .collect();
-
-        let events = self
-            .task_manager
-            .update_task_statuses(&executor, tasks_status)
-            .await?;
-
-        self.executor_manager.cancel_reservations(free_list).await?;
-
-        Ok((events, vec![]))
-    }
-
     /// Process reservations which are offered. The basic process is
     /// 1. Attempt to fill the offered reservations with available tasks
     /// 2. For any reservation that filled, launch the assigned task on the 
executor.
@@ -203,7 +202,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
     pub(crate) async fn offer_reservation(
         &self,
         reservations: Vec<ExecutorReservation>,
-    ) -> Result<Vec<ExecutorReservation>> {
+    ) -> Result<(Vec<ExecutorReservation>, usize)> {
         let (free_list, pending_tasks) = match self
             .task_manager
             .fill_reservations(&reservations)
@@ -286,9 +285,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
             }
         };
 
-        dbg!(free_list.clone());
-        dbg!(pending_tasks);
-
         let mut new_reservations = vec![];
         if !free_list.is_empty() {
             // If any reserved slots remain, return them to the pool
@@ -302,7 +298,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
             new_reservations.extend(pending_reservations);
         }
 
-        Ok(new_reservations)
+        Ok((new_reservations, pending_tasks))
     }
 
     pub(crate) async fn submit_job(
@@ -311,6 +307,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         job_name: &str,
         session_ctx: Arc<SessionContext>,
         plan: &LogicalPlan,
+        queued_at: u64,
     ) -> Result<()> {
         let start = Instant::now();
 
@@ -377,7 +374,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         );
 
         self.task_manager
-            .submit_job(job_id, job_name, &session_ctx.session_id(), plan)
+            .submit_job(job_id, job_name, &session_ctx.session_id(), plan, 
queued_at)
             .await?;
 
         let elapsed = start.elapsed();
@@ -387,26 +384,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         Ok(())
     }
 
-    pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
-        info!("Received cancellation request for job {}", job_id);
-
-        match self.task_manager.cancel_job(job_id).await {
-            Ok(tasks) => {
-                
self.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
-                        let msg = format!("Error to cancel running tasks when 
cancelling job {} due to {:?}", job_id, e);
-                        error!("{}", msg);
-                        BallistaError::Internal(msg)
-                })?;
-                Ok(true)
-            }
-            Err(e) => {
-                let msg = format!("Error cancelling job {}: {:?}", job_id, e);
-                error!("{}", msg);
-                Ok(false)
-            }
-        }
-    }
-
     /// Spawn a delayed future to clean up job data on both Scheduler and 
Executors
     pub(crate) fn clean_up_successful_job(&self, job_id: String) {
         self.executor_manager.clean_up_job_data_delayed(
@@ -464,6 +441,8 @@ mod test {
     use ballista_core::serde::BallistaCodec;
     use ballista_core::utils::default_session_builder;
 
+    use crate::config::SchedulerConfig;
+    use crate::test_utils::BlackholeTaskLauncher;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum};
     use datafusion::physical_plan::ExecutionPlan;
@@ -492,8 +471,9 @@ mod test {
             .register_executor(executor_metadata, executor_data, true)
             .await?;
 
-        let result = state.offer_reservation(reservations).await?;
+        let (result, assigned) = state.offer_reservation(reservations).await?;
 
+        assert_eq!(assigned, 0);
         assert!(result.is_empty());
 
         // All reservations should have been cancelled so we should be able to 
reserve them now
@@ -512,10 +492,13 @@ mod test {
             .build()?;
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_default_scheduler_name(
+            Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
                 default_session_builder,
                 BallistaCodec::default(),
+                String::default(),
+                SchedulerConfig::default(),
+                Arc::new(BlackholeTaskLauncher::default()),
             ));
 
         let session_ctx = state.session_manager.create_session(&config).await?;
@@ -525,19 +508,43 @@ mod test {
         // Create 4 jobs so we have four pending tasks
         state
             .task_manager
-            .submit_job("job-1", "", session_ctx.session_id().as_str(), 
plan.clone())
+            .submit_job(
+                "job-1",
+                "",
+                session_ctx.session_id().as_str(),
+                plan.clone(),
+                0,
+            )
             .await?;
         state
             .task_manager
-            .submit_job("job-2", "", session_ctx.session_id().as_str(), 
plan.clone())
+            .submit_job(
+                "job-2",
+                "",
+                session_ctx.session_id().as_str(),
+                plan.clone(),
+                0,
+            )
             .await?;
         state
             .task_manager
-            .submit_job("job-3", "", session_ctx.session_id().as_str(), 
plan.clone())
+            .submit_job(
+                "job-3",
+                "",
+                session_ctx.session_id().as_str(),
+                plan.clone(),
+                0,
+            )
             .await?;
         state
             .task_manager
-            .submit_job("job-4", "", session_ctx.session_id().as_str(), 
plan.clone())
+            .submit_job(
+                "job-4",
+                "",
+                session_ctx.session_id().as_str(),
+                plan.clone(),
+                0,
+            )
             .await?;
 
         let executors = test_executors(1, 4);
@@ -549,8 +556,9 @@ mod test {
             .register_executor(executor_metadata, executor_data, true)
             .await?;
 
-        let result = state.offer_reservation(reservations).await?;
+        let (result, pending) = state.offer_reservation(reservations).await?;
 
+        assert_eq!(pending, 0);
         assert!(result.is_empty());
 
         // All task slots should be assigned so we should not be able to 
reserve more tasks
@@ -569,10 +577,13 @@ mod test {
             .build()?;
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_default_scheduler_name(
+            Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
                 default_session_builder,
                 BallistaCodec::default(),
+                String::default(),
+                SchedulerConfig::default(),
+                Arc::new(BlackholeTaskLauncher::default()),
             ));
 
         let session_ctx = state.session_manager.create_session(&config).await?;
@@ -582,7 +593,13 @@ mod test {
         // Create a job
         state
             .task_manager
-            .submit_job("job-1", "", session_ctx.session_id().as_str(), 
plan.clone())
+            .submit_job(
+                "job-1",
+                "",
+                session_ctx.session_id().as_str(),
+                plan.clone(),
+                0,
+            )
             .await?;
 
         let executors = test_executors(1, 4);
@@ -645,8 +662,9 @@ mod test {
 
         // Offer the reservation. It should be filled with one of the 4 
pending tasks. The other 3 should
         // be reserved for the other 3 tasks, emitting another offer event
-        let reservations = state.offer_reservation(reservations).await?;
+        let (reservations, pending) = 
state.offer_reservation(reservations).await?;
 
+        assert_eq!(pending, 3);
         assert_eq!(reservations.len(), 3);
 
         // Remaining 3 task slots should be reserved for pending tasks
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index 31b28100..53b59219 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,6 +29,7 @@ use ballista_core::error::Result;
 
 use crate::state::backend::Keyspace::{CompletedJobs, FailedJobs};
 use crate::state::session_manager::create_datafusion_context;
+
 use ballista_core::serde::protobuf::{
     self, job_status, FailedJob, JobStatus, MultiTaskDefinition, 
TaskDefinition, TaskId,
     TaskStatus,
@@ -58,6 +59,52 @@ pub const TASK_MAX_FAILURES: usize = 4;
 /// Default max failure attempts for stage level retry
 pub const STAGE_MAX_FAILURES: usize = 4;
 
+#[async_trait::async_trait]
+pub(crate) trait TaskLauncher: Send + Sync + 'static {
+    async fn launch_tasks(
+        &self,
+        executor: &ExecutorMetadata,
+        tasks: Vec<MultiTaskDefinition>,
+        executor_manager: &ExecutorManager,
+    ) -> Result<()>;
+}
+
+struct DefaultTaskLauncher {
+    scheduler_id: String,
+}
+
+impl DefaultTaskLauncher {
+    pub fn new(scheduler_id: String) -> Self {
+        Self { scheduler_id }
+    }
+}
+
+#[async_trait::async_trait]
+impl TaskLauncher for DefaultTaskLauncher {
+    async fn launch_tasks(
+        &self,
+        executor: &ExecutorMetadata,
+        tasks: Vec<MultiTaskDefinition>,
+        executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        info!("Launching multi task on executor {:?}", executor.id);
+        let mut client = executor_manager.get_client(&executor.id).await?;
+        client
+            .launch_multi_task(protobuf::LaunchMultiTaskParams {
+                multi_tasks: tasks,
+                scheduler_id: self.scheduler_id.clone(),
+            })
+            .await
+            .map_err(|e| {
+                BallistaError::Internal(format!(
+                    "Failed to connect to executor {}: {:?}",
+                    executor.id, e
+                ))
+            })?;
+        Ok(())
+    }
+}
+
 #[derive(Clone)]
 pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> {
     state: Arc<dyn StateBackendClient>,
@@ -66,6 +113,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionPlan>
     scheduler_id: String,
     // Cache for active jobs curated by this scheduler
     active_job_cache: ActiveJobCache,
+    launcher: Arc<dyn TaskLauncher>,
 }
 
 #[derive(Clone)]
@@ -100,6 +148,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_id: String,
+    ) -> Self {
+        Self {
+            state,
+            session_builder,
+            codec,
+            scheduler_id: scheduler_id.clone(),
+            active_job_cache: Arc::new(DashMap::new()),
+            launcher: Arc::new(DefaultTaskLauncher::new(scheduler_id)),
+        }
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn with_launcher(
+        state: Arc<dyn StateBackendClient>,
+        session_builder: SessionBuilder,
+        codec: BallistaCodec<T, U>,
+        scheduler_id: String,
+        launcher: Arc<dyn TaskLauncher>,
     ) -> Self {
         Self {
             state,
@@ -107,6 +173,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             codec,
             scheduler_id,
             active_job_cache: Arc::new(DashMap::new()),
+            launcher,
         }
     }
 
@@ -119,9 +186,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         job_name: &str,
         session_id: &str,
         plan: Arc<dyn ExecutionPlan>,
+        queued_at: u64,
     ) -> Result<()> {
-        let mut graph =
-            ExecutionGraph::new(&self.scheduler_id, job_id, job_name, 
session_id, plan)?;
+        let mut graph = ExecutionGraph::new(
+            &self.scheduler_id,
+            job_id,
+            job_name,
+            session_id,
+            plan,
+            queued_at,
+        )?;
         info!("Submitting execution graph: {:?}", graph);
         self.state
             .put(
@@ -297,7 +371,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 }
             }
             if assign_tasks >= free_reservations.len() {
-                pending_tasks = graph.available_tasks();
+                pending_tasks += graph.available_tasks();
                 break;
             }
         }
@@ -335,7 +409,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
     }
 
     /// Cancel the job and return a Vec of running tasks need to cancel
-    pub(crate) async fn cancel_job(&self, job_id: &str) -> 
Result<Vec<RunningTaskInfo>> {
+    pub(crate) async fn cancel_job(
+        &self,
+        job_id: &str,
+    ) -> Result<(Vec<RunningTaskInfo>, usize)> {
         self.abort_job(job_id, "Cancelled".to_owned()).await
     }
 
@@ -344,7 +421,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         &self,
         job_id: &str,
         failure_reason: String,
-    ) -> Result<Vec<RunningTaskInfo>> {
+    ) -> Result<(Vec<RunningTaskInfo>, usize)> {
         let locks = self
             .state
             .acquire_locks(vec![
@@ -352,25 +429,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 (Keyspace::FailedJobs, job_id),
             ])
             .await?;
-        let tasks_to_cancel = if let Some(graph) =
+        let (tasks_to_cancel, pending_tasks) = if let Some(graph) =
             self.get_active_execution_graph(job_id).await
         {
-            let running_tasks = graph.read().await.running_tasks();
+            let (pending_tasks, running_tasks) = {
+                let guard = graph.read().await;
+                (guard.available_tasks(), guard.running_tasks())
+            };
+
             info!(
                 "Cancelling {} running tasks for job {}",
                 running_tasks.len(),
                 job_id
             );
-            with_locks(locks, self.fail_job_state(job_id, 
failure_reason)).await?;
-            running_tasks
+            // Propagate failures instead of panicking mid-abort; keeps this
+            // branch consistent with the `else` branch below, which uses `?`.
+            with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
+
+            (running_tasks, pending_tasks)
         } else {
             // TODO listen the job state update event and fix task cancelling
             warn!("Fail to find job {} in the cache, unable to cancel tasks 
for job, fail the job state only.", job_id);
             with_locks(locks, self.fail_job_state(job_id, 
failure_reason)).await?;
-            vec![]
+            (vec![], 0)
         };
 
-        Ok(tasks_to_cancel)
+        Ok((tasks_to_cancel, pending_tasks))
     }
 
     /// Mark a unscheduled job as failed. This will create a key under the 
FailedJobs keyspace
@@ -405,11 +489,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             ]
         };
 
-        let _res = if let Some(graph) = 
self.remove_active_execution_graph(job_id).await {
+        if let Some(graph) = self.remove_active_execution_graph(job_id).await {
             let mut graph = graph.write().await;
             let previous_status = graph.status();
             graph.fail_job(failure_reason);
-            let value = self.encode_execution_graph(graph.clone())?;
+
+            let value = encode_protobuf(&graph.status())?;
             let txn_ops = txn_operations(value);
             let result = self.state.apply_txn(txn_ops).await;
             if result.is_err() {
@@ -417,7 +502,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 graph.update_status(previous_status);
                 warn!("Rollback Execution Graph state change since it did not 
persisted due to a possible connection error.")
             };
-            result
         } else {
             info!("Fail to find job {} in the cache", job_id);
             let status = JobStatus {
@@ -427,27 +511,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             };
             let value = encode_protobuf(&status)?;
             let txn_ops = txn_operations(value);
-            self.state.apply_txn(txn_ops).await
+            self.state.apply_txn(txn_ops).await?;
         };
 
         Ok(())
     }
 
-    pub async fn update_job(&self, job_id: &str) -> Result<()> {
+    pub async fn update_job(&self, job_id: &str) -> Result<usize> {
         debug!("Update job {} in Active", job_id);
         if let Some(graph) = self.get_active_execution_graph(job_id).await {
             let mut graph = graph.write().await;
+
+            let curr_available_tasks = graph.available_tasks();
+
             graph.revive();
             let graph = graph.clone();
+
+            let new_tasks = graph.available_tasks().saturating_sub(curr_available_tasks);
+
             let value = self.encode_execution_graph(graph)?;
             self.state
                 .put(Keyspace::ActiveJobs, job_id.to_owned(), value)
                 .await?;
+
+            Ok(new_tasks)
         } else {
             warn!("Fail to find job {} in the cache", job_id);
-        }
 
-        Ok(())
+            Ok(0)
+        }
     }
 
     /// return a Vec of running tasks need to cancel
@@ -484,45 +576,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         .await
     }
 
-    #[allow(dead_code)]
-    #[cfg(not(test))]
-    /// Launch the given task on the specified executor
-    pub(crate) async fn launch_task(
-        &self,
-        executor: &ExecutorMetadata,
-        task: TaskDescription,
-        executor_manager: &ExecutorManager,
-    ) -> Result<()> {
-        info!("Launching task {:?} on executor {:?}", task, executor.id);
-        let task_definition = self.prepare_task_definition(task)?;
-        let mut client = executor_manager.get_client(&executor.id).await?;
-        client
-            .launch_task(protobuf::LaunchTaskParams {
-                tasks: vec![task_definition],
-                scheduler_id: self.scheduler_id.clone(),
-            })
-            .await
-            .map_err(|e| {
-                BallistaError::Internal(format!(
-                    "Failed to connect to executor {}: {:?}",
-                    executor.id, e
-                ))
-            })?;
-        Ok(())
-    }
-
-    #[allow(dead_code)]
-    #[cfg(test)]
-    /// In unit tests, we do not have actual executors running, so it 
simplifies things to just noop.
-    pub(crate) async fn launch_task(
-        &self,
-        _executor: &ExecutorMetadata,
-        _task: TaskDescription,
-        _executor_manager: &ExecutorManager,
-    ) -> Result<()> {
-        Ok(())
-    }
-
     /// Retrieve the number of available tasks for the given job. The value 
returned
     /// is strictly a point-in-time snapshot
     pub async fn get_available_task_count(&self, job_id: &str) -> 
Result<usize> {
@@ -591,7 +644,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         }
     }
 
-    #[cfg(not(test))]
     /// Launch the given tasks on the specified executor
     pub(crate) async fn launch_multi_task(
         &self,
@@ -599,37 +651,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         tasks: Vec<Vec<TaskDescription>>,
         executor_manager: &ExecutorManager,
     ) -> Result<()> {
-        info!("Launching multi task on executor {:?}", executor.id);
         let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
             .into_iter()
             .map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
             .collect();
-        let multi_tasks = multi_tasks?;
-        let mut client = executor_manager.get_client(&executor.id).await?;
-        client
-            .launch_multi_task(protobuf::LaunchMultiTaskParams {
-                multi_tasks,
-                scheduler_id: self.scheduler_id.clone(),
-            })
-            .await
-            .map_err(|e| {
-                BallistaError::Internal(format!(
-                    "Failed to connect to executor {}: {:?}",
-                    executor.id, e
-                ))
-            })?;
-        Ok(())
-    }
 
-    #[cfg(test)]
-    /// Launch the given tasks on the specified executor
-    pub(crate) async fn launch_multi_task(
-        &self,
-        _executor: &ExecutorMetadata,
-        _tasks: Vec<Vec<TaskDescription>>,
-        _executor_manager: &ExecutorManager,
-    ) -> Result<()> {
-        Ok(())
+        self.launcher
+            .launch_tasks(executor, multi_tasks?, executor_manager)
+            .await
     }
 
     #[allow(dead_code)]
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index c115ce72..2566dcb5 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -15,22 +15,46 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista_core::error::Result;
+use ballista_core::error::{BallistaError, Result};
 use std::any::Any;
+use std::collections::HashMap;
 use std::future::Future;
 use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
 
+use crate::config::SchedulerConfig;
+use crate::metrics::SchedulerMetricsCollector;
+use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+use crate::state::backend::standalone::StandaloneClient;
+
+use crate::state::executor_manager::ExecutorManager;
+use crate::state::task_manager::TaskLauncher;
+
+use ballista_core::config::{BallistaConfig, 
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
+use ballista_core::serde::protobuf::job_status::Status;
+use ballista_core::serde::protobuf::{
+    task_status, JobStatus, MultiTaskDefinition, PhysicalPlanNode, 
ShuffleWritePartition,
+    SuccessfulTask, TaskId, TaskStatus,
+};
+use ballista_core::serde::scheduler::{
+    ExecutorData, ExecutorMetadata, ExecutorSpecification,
+};
+use ballista_core::serde::BallistaCodec;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::common::DataFusionError;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, 
SessionState};
-use datafusion::logical_expr::Expr;
+use datafusion::logical_expr::{Expr, LogicalPlan};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::CsvReadOptions;
 
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use datafusion_proto::protobuf::LogicalPlanNode;
+use parking_lot::Mutex;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", 
"nation", "region",
 ];
@@ -201,3 +225,494 @@ pub fn get_tpch_schema(table: &str) -> Schema {
         _ => unimplemented!(),
     }
 }
+
+pub trait TaskRunner: Send + Sync + 'static {
+    fn run(&self, executor_id: String, tasks: MultiTaskDefinition) -> 
Vec<TaskStatus>;
+}
+
+#[derive(Clone)]
+pub struct TaskRunnerFn<F> {
+    f: F,
+}
+
+impl<F> TaskRunnerFn<F>
+where
+    F: Fn(String, MultiTaskDefinition) -> Vec<TaskStatus> + Send + Sync + 
'static,
+{
+    pub fn new(f: F) -> Self {
+        Self { f }
+    }
+}
+
+impl<F> TaskRunner for TaskRunnerFn<F>
+where
+    F: Fn(String, MultiTaskDefinition) -> Vec<TaskStatus> + Send + Sync + 
'static,
+{
+    fn run(&self, executor_id: String, tasks: MultiTaskDefinition) -> 
Vec<TaskStatus> {
+        (self.f)(executor_id, tasks)
+    }
+}
+
+pub fn default_task_runner() -> impl TaskRunner {
+    TaskRunnerFn::new(|executor_id: String, task: MultiTaskDefinition| {
+        let mut statuses = vec![];
+
+        let partitions =
+            if let Some(output_partitioning) = 
task.output_partitioning.as_ref() {
+                output_partitioning.partition_count as usize
+            } else {
+                1
+            };
+
+        let partitions: Vec<ShuffleWritePartition> = (0..partitions)
+            .into_iter()
+            .map(|i| ShuffleWritePartition {
+                partition_id: i as u64,
+                path: String::default(),
+                num_batches: 1,
+                num_rows: 1,
+                num_bytes: 1,
+            })
+            .collect();
+
+        for TaskId {
+            task_id,
+            partition_id,
+            ..
+        } in task.task_ids
+        {
+            let timestamp = timestamp_millis();
+            statuses.push(TaskStatus {
+                task_id,
+                job_id: task.job_id.clone(),
+                stage_id: task.stage_id,
+                stage_attempt_num: task.stage_attempt_num,
+                partition_id,
+                launch_time: timestamp,
+                start_exec_time: timestamp,
+                end_exec_time: timestamp,
+                metrics: vec![],
+                status: Some(task_status::Status::Successful(SuccessfulTask {
+                    executor_id: executor_id.clone(),
+                    partitions: partitions.clone(),
+                })),
+            });
+        }
+
+        statuses
+    })
+}
+
+#[derive(Clone)]
+struct VirtualExecutor {
+    executor_id: String,
+    task_slots: usize,
+    runner: Arc<dyn TaskRunner>,
+}
+
+impl VirtualExecutor {
+    pub fn run_tasks(&self, tasks: MultiTaskDefinition) -> Vec<TaskStatus> {
+        self.runner.run(self.executor_id.clone(), tasks)
+    }
+}
+
+/// Launcher which consumes tasks and never sends a status update
+#[derive(Default)]
+pub struct BlackholeTaskLauncher {}
+
+#[async_trait]
+impl TaskLauncher for BlackholeTaskLauncher {
+    async fn launch_tasks(
+        &self,
+        _executor: &ExecutorMetadata,
+        _tasks: Vec<MultiTaskDefinition>,
+        _executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        Ok(())
+    }
+}
+
+pub struct VirtualTaskLauncher {
+    sender: Sender<(String, Vec<TaskStatus>)>,
+    executors: HashMap<String, VirtualExecutor>,
+}
+
+#[async_trait::async_trait]
+impl TaskLauncher for VirtualTaskLauncher {
+    async fn launch_tasks(
+        &self,
+        executor: &ExecutorMetadata,
+        tasks: Vec<MultiTaskDefinition>,
+        _executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        let virtual_executor = self.executors.get(&executor.id).ok_or_else(|| {
+            BallistaError::Internal(format!(
+                "No virtual executor with ID {} found",
+                executor.id
+            ))
+        })?;
+
+        let status = tasks
+            .into_iter()
+            .flat_map(|t| virtual_executor.run_tasks(t))
+            .collect();
+
+        self.sender
+            .send((executor.id.clone(), status))
+            .await
+            .map_err(|e| {
+                BallistaError::Internal(format!("Error sending task status: 
{:?}", e))
+            })
+    }
+}
+
+pub struct SchedulerTest {
+    scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
+    ballista_config: BallistaConfig,
+    status_receiver: Option<Receiver<(String, Vec<TaskStatus>)>>,
+}
+
+impl SchedulerTest {
+    pub async fn new(
+        config: SchedulerConfig,
+        metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+        num_executors: usize,
+        task_slots_per_executor: usize,
+        runner: Option<Arc<dyn TaskRunner>>,
+    ) -> Result<Self> {
+        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+
+        let ballista_config = BallistaConfig::builder()
+            .set(
+                BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+                format!("{}", num_executors * 
task_slots_per_executor).as_str(),
+            )
+            .build()
+            .expect("creating BallistaConfig");
+
+        let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));
+
+        let executors: HashMap<String, VirtualExecutor> = (0..num_executors)
+            .into_iter()
+            .map(|i| {
+                let id = format!("virtual-executor-{}", i);
+                let executor = VirtualExecutor {
+                    executor_id: id.clone(),
+                    task_slots: task_slots_per_executor,
+                    runner: runner.clone(),
+                };
+                (id, executor)
+            })
+            .collect();
+
+        let (status_sender, status_receiver) = channel(1000);
+
+        let launcher = VirtualTaskLauncher {
+            sender: status_sender,
+            executors: executors.clone(),
+        };
+
+        let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+            SchedulerServer::with_task_launcher(
+                "localhost:50050".to_owned(),
+                state_storage.clone(),
+                BallistaCodec::default(),
+                config,
+                metrics_collector,
+                Arc::new(launcher),
+            );
+        scheduler.init().await?;
+
+        for (executor_id, VirtualExecutor { task_slots, .. }) in executors {
+            let metadata = ExecutorMetadata {
+                id: executor_id.clone(),
+                host: String::default(),
+                port: 0,
+                grpc_port: 0,
+                specification: ExecutorSpecification {
+                    task_slots: task_slots as u32,
+                },
+            };
+
+            let executor_data = ExecutorData {
+                executor_id,
+                total_task_slots: task_slots as u32,
+                available_task_slots: task_slots as u32,
+            };
+
+            scheduler
+                .state
+                .executor_manager
+                .register_executor(metadata, executor_data, false)
+                .await?;
+        }
+
+        Ok(Self {
+            scheduler,
+            ballista_config,
+            status_receiver: Some(status_receiver),
+        })
+    }
+
+    pub fn pending_tasks(&self) -> usize {
+        self.scheduler.pending_tasks()
+    }
+
+    pub async fn ctx(&self) -> Result<Arc<SessionContext>> {
+        self.scheduler
+            .state
+            .session_manager
+            .create_session(&self.ballista_config)
+            .await
+    }
+
+    pub async fn submit(
+        &mut self,
+        job_id: &str,
+        job_name: &str,
+        plan: &LogicalPlan,
+    ) -> Result<()> {
+        let ctx = self
+            .scheduler
+            .state
+            .session_manager
+            .create_session(&self.ballista_config)
+            .await?;
+
+        self.scheduler
+            .submit_job(job_id, job_name, ctx, plan)
+            .await?;
+
+        Ok(())
+    }
+
+    pub async fn tick(&mut self) -> Result<()> {
+        if let Some(receiver) = self.status_receiver.as_mut() {
+            if let Some((executor_id, status)) = receiver.recv().await {
+                self.scheduler
+                    .update_task_status(&executor_id, status)
+                    .await?;
+            } else {
+                return Err(BallistaError::Internal("Task sender 
dropped".to_owned()));
+            }
+        } else {
+            return Err(BallistaError::Internal(
+                "Status receiver was None".to_owned(),
+            ));
+        }
+
+        Ok(())
+    }
+
+    pub async fn cancel(&self, job_id: &str) -> Result<()> {
+        self.scheduler
+            .query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::JobCancel(job_id.to_owned()))
+            .await
+    }
+
+    pub async fn await_completion(&self, job_id: &str) -> Result<JobStatus> {
+        let final_status: Result<JobStatus> = loop {
+            let status = self
+                .scheduler
+                .state
+                .task_manager
+                .get_job_status(job_id)
+                .await?;
+
+            if let Some(JobStatus {
+                status: Some(inner),
+            }) = status.as_ref()
+            {
+                match inner {
+                    Status::Failed(_) | Status::Successful(_) => {
+                        break Ok(status.unwrap())
+                    }
+                    _ => continue,
+                }
+            }
+
+            tokio::time::sleep(Duration::from_millis(100)).await
+        };
+
+        final_status
+    }
+
+    pub async fn run(
+        &mut self,
+        job_id: &str,
+        job_name: &str,
+        plan: &LogicalPlan,
+    ) -> Result<JobStatus> {
+        let ctx = self
+            .scheduler
+            .state
+            .session_manager
+            .create_session(&self.ballista_config)
+            .await?;
+
+        self.scheduler
+            .submit_job(job_id, job_name, ctx, plan)
+            .await?;
+
+        let mut receiver = self.status_receiver.take().unwrap();
+
+        let scheduler_clone = self.scheduler.clone();
+        tokio::spawn(async move {
+            while let Some((executor_id, status)) = receiver.recv().await {
+                scheduler_clone
+                    .update_task_status(&executor_id, status)
+                    .await
+                    .unwrap();
+            }
+        });
+
+        let final_status: Result<JobStatus> = loop {
+            let status = self
+                .scheduler
+                .state
+                .task_manager
+                .get_job_status(job_id)
+                .await?;
+
+            if let Some(JobStatus {
+                status: Some(inner),
+            }) = status.as_ref()
+            {
+                match inner {
+                    Status::Failed(_) | Status::Successful(_) => {
+                        break Ok(status.unwrap())
+                    }
+                    _ => continue,
+                }
+            }
+
+            tokio::time::sleep(Duration::from_millis(100)).await
+        };
+
+        final_status
+    }
+}
+
+#[derive(Clone)]
+pub enum MetricEvent {
+    Submitted(String, u64, u64),
+    Completed(String, u64, u64),
+    Cancelled(String),
+    Failed(String, u64, u64),
+}
+
+impl MetricEvent {
+    pub fn job_id(&self) -> &str {
+        match self {
+            MetricEvent::Submitted(job, _, _) => job.as_str(),
+            MetricEvent::Completed(job, _, _) => job.as_str(),
+            MetricEvent::Cancelled(job) => job.as_str(),
+            MetricEvent::Failed(job, _, _) => job.as_str(),
+        }
+    }
+}
+
+#[derive(Default, Clone)]
+pub struct TestMetricsCollector {
+    pub events: Arc<Mutex<Vec<MetricEvent>>>,
+}
+
+impl TestMetricsCollector {
+    pub fn job_events(&self, job_id: &str) -> Vec<MetricEvent> {
+        let guard = self.events.lock();
+
+        guard
+            .iter()
+            .filter_map(|event| {
+                if event.job_id() == job_id {
+                    Some(event.clone())
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+}
+
+impl SchedulerMetricsCollector for TestMetricsCollector {
+    fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at: 
u64) {
+        let mut guard = self.events.lock();
+        guard.push(MetricEvent::Submitted(
+            job_id.to_owned(),
+            queued_at,
+            submitted_at,
+        ));
+    }
+
+    fn record_completed(&self, job_id: &str, queued_at: u64, completed_at: 
u64) {
+        let mut guard = self.events.lock();
+        guard.push(MetricEvent::Completed(
+            job_id.to_owned(),
+            queued_at,
+            completed_at,
+        ));
+    }
+
+    fn record_failed(&self, job_id: &str, queued_at: u64, failed_at: u64) {
+        let mut guard = self.events.lock();
+        guard.push(MetricEvent::Failed(job_id.to_owned(), queued_at, 
failed_at));
+    }
+
+    fn record_cancelled(&self, job_id: &str) {
+        let mut guard = self.events.lock();
+        guard.push(MetricEvent::Cancelled(job_id.to_owned()));
+    }
+
+    fn set_pending_tasks_queue_size(&self, _value: u64) {}
+
+    fn gather_metrics(&self) -> Result<Option<(Vec<u8>, String)>> {
+        Ok(None)
+    }
+}
+
+pub fn assert_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
+    let found = collector
+        .job_events(job_id)
+        .iter()
+        .any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
+
+    assert!(found, "Expected submitted event for job {}", job_id);
+}
+
+pub fn assert_no_submitted_event(job_id: &str, collector: 
&TestMetricsCollector) {
+    let found = collector
+        .job_events(job_id)
+        .iter()
+        .any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
+
+    assert!(!found, "Expected no submitted event for job {}", job_id);
+}
+
+pub fn assert_completed_event(job_id: &str, collector: &TestMetricsCollector) {
+    let found = collector
+        .job_events(job_id)
+        .iter()
+        .any(|ev| matches!(ev, MetricEvent::Completed(_, _, _)));
+
+    assert!(found, "Expected completed event for job {}", job_id);
+}
+
+pub fn assert_cancelled_event(job_id: &str, collector: &TestMetricsCollector) {
+    let found = collector
+        .job_events(job_id)
+        .iter()
+        .any(|ev| matches!(ev, MetricEvent::Cancelled(_)));
+
+    assert!(found, "Expected cancelled event for job {}", job_id);
+}
+
+pub fn assert_failed_event(job_id: &str, collector: &TestMetricsCollector) {
+    let found = collector
+        .job_events(job_id)
+        .iter()
+        .any(|ev| matches!(ev, MetricEvent::Failed(_, _, _)));
+
+    assert!(found, "Expected failed event for job {}", job_id);
+}
diff --git a/docs/source/user-guide/metrics.md 
b/docs/source/user-guide/metrics.md
new file mode 100644
index 00000000..7e831dca
--- /dev/null
+++ b/docs/source/user-guide/metrics.md
@@ -0,0 +1,41 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Ballista Scheduler Metrics
+
+## Prometheus
+
+Built with default features, the Ballista scheduler will automatically collect 
and expose a standard set of Prometheus metrics.
+The metrics currently collected automatically include:
+
+- _job_exec_time_seconds_ - Histogram of successful job execution time in 
seconds
+- _planning_time_ms_ - Histogram of job planning time in milliseconds
+- _failed_ - Counter of failed jobs
+- _job_failed_total_ - Counter of failed jobs
+- _job_cancelled_total_ - Counter of cancelled jobs
+- _job_completed_total_ - Counter of completed jobs
+- _job_submitted_total_ - Counter of submitted jobs
+- _pending_task_queue_size_ - Number of pending tasks
+
+**NOTE** Currently the histogram buckets for the above metrics are set to 
reasonable defaults. If the defaults are not
+appropriate for a given use case, the only workaround is to implement a 
custom `SchedulerMetricsCollector`. In the future
+the buckets should be made configurable.
+
+The metrics are then exported through the scheduler REST API at `GET 
/api/metrics`. It should be sufficient to ingest metrics
+into an existing metrics system by pointing your chosen Prometheus exporter at 
that endpoint.
diff --git a/docs/source/user-guide/scheduler.md 
b/docs/source/user-guide/scheduler.md
index 2f94ebc3..da6166c2 100644
--- a/docs/source/user-guide/scheduler.md
+++ b/docs/source/user-guide/scheduler.md
@@ -35,3 +35,4 @@ The scheduler also provides a REST API that allows jobs to be 
monitored.
 | /api/job/{job_id}     | GET    | Get a summary of a submitted job.           
                |
 | /api/job/{job_id}/dot | GET    | Produce a query plan in DOT (graphviz) 
format.              |
 | /api/job/{job_id}     | PATCH  | Cancel a currently running job              
                |
+| /api/metrics          | GET    | Return current scheduler metric set         
                |

Reply via email to