This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 93cf14f7 Cluster state refactor part 1 (#560)
93cf14f7 is described below

commit 93cf14f707420eabfc6de751cfd7bf10afec164c
Author: Dan Harris <[email protected]>
AuthorDate: Sat Jan 21 08:44:49 2023 -0500

    Cluster state refactor part 1 (#560)
    
    * Customize session builder
    
    * Add setter for executor slots policy
    
    * Construct Executor with functions
    
    * Add queued and completed timestamps to successful job status
    
    * Add public methods to SchedulerServer
    
    * Public method for getting execution graph
    
    * Public method for stage metrics
    
    * Use node-level local limit (#20)
    
    * Use node-level local limit
    
    * serialize limit in shuffle writer
    
    * Revert "Merge pull request #19 from coralogix/sc-5792"
    
    This reverts commit 08140ef3f1ebe573ce9f0d2f3253422546c8a31a, reversing
    changes made to a7f13844e44328203f803e5073cfff69f3981f4e.
    
    * add log
    
    * make sure we don't forget limit for shuffle writer
    
    * update accum correctly and try to break early
    
    * Check local limit accumulator before polling for more data
    
    * fix build
    
    Co-authored-by: Martins Purins <[email protected]>
    
    * configure_me_codegen retroactively reserved on our `bind_host` parame… 
(#520)
    
    * configure_me_codegen retroactively reserved on our `bind_host` parameter 
name
    
    * Add label and pray
    
    * Add more labels why not
    
    * Add ClusterState trait
    
    * Refactor slightly for clarity
    
    * Revert "Use node-level local limit (#20)"
    
    This reverts commit ff96bcd5b4294e71babe98fb177da84a8423ee82.
    
    * Revert "Public method for stage metrics"
    
    This reverts commit a80231509c478c0c82972b2ff24782804862c04b.
    
    * Revert "Public method for getting execution graph"
    
    This reverts commit 490bda5aae4b79ab1bd7fbd564281929ecb693ca.
    
    * Revert "Add public methods to SchedulerServer"
    
    This reverts commit 5ad27c04e80a2db55a9bb29444f98da6b16c395c.
    
    * Revert "Add queued and completed timestamps to successful job status"
    
    This reverts commit c615fcecb1eb2ded6c14ecdd5511bd7a76ccfb85.
    
    * Revert "Construct Executor with functions"
    
    This reverts commit 24d4830417fff730f1e4998533aa5ff3aec8fefc.
    
    * Always forget the apache header
    
    Co-authored-by: Martins Purins <[email protected]>
    Co-authored-by: Brent Gardner <[email protected]>
---
 ballista/core/src/serde/generated/ballista.rs    |   4 +-
 ballista/scheduler/scheduler_config_spec.toml    |   7 +
 ballista/scheduler/src/bin/main.rs               |  50 +-
 ballista/scheduler/src/config.rs                 |   5 +
 ballista/scheduler/src/scheduler_process.rs      |   3 +
 ballista/scheduler/src/scheduler_server/grpc.rs  |  13 +-
 ballista/scheduler/src/scheduler_server/mod.rs   |  44 +-
 ballista/scheduler/src/standalone.rs             |   6 +-
 ballista/scheduler/src/state/backend/cluster.rs  | 710 +++++++++++++++++++++++
 ballista/scheduler/src/state/backend/mod.rs      |  14 +-
 ballista/scheduler/src/state/executor_manager.rs | 478 ++++-----------
 ballista/scheduler/src/state/mod.rs              |  16 +-
 ballista/scheduler/src/test_utils.rs             |   5 +-
 13 files changed, 969 insertions(+), 386 deletions(-)

diff --git a/ballista/core/src/serde/generated/ballista.rs 
b/ballista/core/src/serde/generated/ballista.rs
index d11c6ea5..8f0bc9bc 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1985,7 +1985,7 @@ pub mod executor_grpc_client {
 pub mod scheduler_grpc_server {
     #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
     use tonic::codegen::*;
-    /// Generated trait containing gRPC methods that should be implemented for 
use with SchedulerGrpcServer.
+    ///Generated trait containing gRPC methods that should be implemented for 
use with SchedulerGrpcServer.
     #[async_trait]
     pub trait SchedulerGrpc: Send + Sync + 'static {
         /// Executors must poll the scheduler for heartbeat and to receive 
tasks
@@ -2531,7 +2531,7 @@ pub mod scheduler_grpc_server {
 pub mod executor_grpc_server {
     #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
     use tonic::codegen::*;
-    /// Generated trait containing gRPC methods that should be implemented for 
use with ExecutorGrpcServer.
+    ///Generated trait containing gRPC methods that should be implemented for 
use with ExecutorGrpcServer.
     #[async_trait]
     pub trait ExecutorGrpc: Send + Sync + 'static {
         async fn launch_task(
diff --git a/ballista/scheduler/scheduler_config_spec.toml 
b/ballista/scheduler/scheduler_config_spec.toml
index e79abb85..0bb609cf 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -36,6 +36,13 @@ type = "ballista_scheduler::state::backend::StateBackend"
 doc = "The configuration backend for the scheduler, possible values: etcd, 
memory, sled. Default: sled"
 default = "ballista_scheduler::state::backend::StateBackend::Sled"
 
+[[param]]
+abbr = "c"
+name = "cluster_backend"
+type = "ballista_scheduler::state::backend::StateBackend"
+doc = "The configuration backend for the scheduler cluster state, possible 
values: etcd, memory, sled. Default: sled"
+default = "ballista_scheduler::state::backend::StateBackend::Sled"
+
 [[param]]
 abbr = "n"
 name = "namespace"
diff --git a/ballista/scheduler/src/bin/main.rs 
b/ballista/scheduler/src/bin/main.rs
index 3fd1e060..2ad29bd2 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -45,6 +45,7 @@ mod config {
 
 use ballista_core::config::LogRotationPolicy;
 use ballista_scheduler::config::SchedulerConfig;
+use ballista_scheduler::state::backend::cluster::DefaultClusterState;
 use config::prelude::*;
 use tracing_subscriber::EnvFilter;
 
@@ -60,6 +61,16 @@ async fn main() -> Result<()> {
         std::process::exit(0);
     }
 
+    let config_backend = init_kv_backend(&opt.config_backend, &opt).await?;
+
+    let cluster_state = if opt.cluster_backend == opt.config_backend {
+        Arc::new(DefaultClusterState::new(config_backend.clone()))
+    } else {
+        let cluster_kv_store = init_kv_backend(&opt.cluster_backend, 
&opt).await?;
+
+        Arc::new(DefaultClusterState::new(cluster_kv_store))
+    };
+
     let special_mod_log_level = opt.log_level_setting;
     let namespace = opt.namespace;
     let external_host = opt.external_host;
@@ -110,13 +121,31 @@ async fn main() -> Result<()> {
     let addr = format!("{}:{}", bind_host, port);
     let addr = addr.parse()?;
 
-    let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend 
{
+    let config = SchedulerConfig {
+        scheduling_policy: opt.scheduler_policy,
+        event_loop_buffer_size: opt.event_loop_buffer_size,
+        executor_slots_policy: opt.executor_slots_policy,
+        finished_job_data_clean_up_interval_seconds: opt
+            .finished_job_data_clean_up_interval_seconds,
+        finished_job_state_clean_up_interval_seconds: opt
+            .finished_job_state_clean_up_interval_seconds,
+        advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
+    };
+    start_server(scheduler_name, config_backend, cluster_state, addr, 
config).await?;
+    Ok(())
+}
+
+async fn init_kv_backend(
+    backend: &StateBackend,
+    opt: &Config,
+) -> Result<Arc<dyn StateBackendClient>> {
+    let cluster_backend: Arc<dyn StateBackendClient> = match backend {
         #[cfg(feature = "etcd")]
         StateBackend::Etcd => {
-            let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
+            let etcd = etcd_client::Client::connect(&[opt.etcd_urls.clone()], 
None)
                 .await
                 .context("Could not connect to etcd")?;
-            Arc::new(EtcdClient::new(namespace.clone(), etcd))
+            Arc::new(EtcdClient::new(opt.namespace.clone(), etcd))
         }
         #[cfg(not(feature = "etcd"))]
         StateBackend::Etcd => {
@@ -134,7 +163,7 @@ async fn main() -> Result<()> {
             } else {
                 println!("{}", opt.sled_dir);
                 Arc::new(
-                    SledClient::try_new(opt.sled_dir)
+                    SledClient::try_new(opt.sled_dir.clone())
                         .context("Could not create sled config backend")?,
                 )
             }
@@ -148,16 +177,5 @@ async fn main() -> Result<()> {
         StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
     };
 
-    let config = SchedulerConfig {
-        scheduling_policy: opt.scheduler_policy,
-        event_loop_buffer_size: opt.event_loop_buffer_size,
-        executor_slots_policy: opt.executor_slots_policy,
-        finished_job_data_clean_up_interval_seconds: opt
-            .finished_job_data_clean_up_interval_seconds,
-        finished_job_state_clean_up_interval_seconds: opt
-            .finished_job_state_clean_up_interval_seconds,
-        advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
-    };
-    start_server(scheduler_name, config_backend, addr, config).await?;
-    Ok(())
+    Ok(cluster_backend)
 }
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 8ef33219..97f79787 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -90,6 +90,11 @@ impl SchedulerConfig {
         self.advertise_flight_sql_endpoint = endpoint;
         self
     }
+
+    pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self {
+        self.executor_slots_policy = policy;
+        self
+    }
 }
 
 // an enum used to configure the executor slots policy
diff --git a/ballista/scheduler/src/scheduler_process.rs 
b/ballista/scheduler/src/scheduler_process.rs
index 06057871..44cf7bf4 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -41,11 +41,13 @@ use crate::flight_sql::FlightSqlServiceImpl;
 use crate::metrics::default_metrics_collector;
 use 
crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
 use crate::scheduler_server::SchedulerServer;
+use crate::state::backend::cluster::ClusterState;
 use crate::state::backend::StateBackendClient;
 
 pub async fn start_server(
     scheduler_name: String,
     config_backend: Arc<dyn StateBackendClient>,
+    cluster_state: Arc<dyn ClusterState>,
     addr: SocketAddr,
     config: SchedulerConfig,
 ) -> Result<()> {
@@ -65,6 +67,7 @@ pub async fn start_server(
         SchedulerServer::new(
             scheduler_name,
             config_backend.clone(),
+            cluster_state,
             BallistaCodec::default(),
             config,
             metrics_collector,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 979b65f7..ad04efb2 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -588,6 +588,7 @@ mod test {
     use ballista_core::serde::BallistaCodec;
     use ballista_core::utils::default_session_builder;
 
+    use crate::state::backend::cluster::DefaultClusterState;
     use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
     use crate::state::{backend::sled::SledClient, SchedulerState};
 
@@ -596,10 +597,12 @@ mod test {
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = 
Arc::new(DefaultClusterState::new(state_storage.clone()));
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 state_storage.clone(),
+                cluster_state.clone(),
                 BallistaCodec::default(),
                 SchedulerConfig::default(),
                 default_metrics_collector().unwrap(),
@@ -627,6 +630,7 @@ mod test {
         let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerState::new_with_default_scheduler_name(
                 state_storage.clone(),
+                cluster_state.clone(),
                 default_session_builder,
                 BallistaCodec::default(),
             );
@@ -660,6 +664,7 @@ mod test {
         let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerState::new_with_default_scheduler_name(
                 state_storage.clone(),
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
             );
@@ -683,10 +688,12 @@ mod test {
     #[tokio::test]
     async fn test_stop_executor() -> Result<(), BallistaError> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = 
Arc::new(DefaultClusterState::new(state_storage.clone()));
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
-                state_storage.clone(),
+                state_storage,
+                cluster_state,
                 BallistaCodec::default(),
                 SchedulerConfig::default(),
                 default_metrics_collector().unwrap(),
@@ -764,10 +771,12 @@ mod test {
     #[ignore]
     async fn test_expired_executor() -> Result<(), BallistaError> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = 
Arc::new(DefaultClusterState::new(state_storage.clone()));
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
-                state_storage.clone(),
+                state_storage,
+                cluster_state,
                 BallistaCodec::default(),
                 SchedulerConfig::default(),
                 default_metrics_collector().unwrap(),
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index f4727f40..f39e0d1b 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -35,6 +35,7 @@ use log::{error, warn};
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
+use crate::state::backend::cluster::ClusterState;
 use crate::state::backend::StateBackendClient;
 use crate::state::executor_manager::{
     ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
@@ -69,12 +70,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     pub fn new(
         scheduler_name: String,
         config_backend: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         codec: BallistaCodec<T, U>,
         config: SchedulerConfig,
         metrics_collector: Arc<dyn SchedulerMetricsCollector>,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(
             config_backend,
+            cluster_state,
             default_session_builder,
             codec,
             scheduler_name.clone(),
@@ -97,10 +100,45 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         }
     }
 
+    pub fn with_session_builder(
+        scheduler_name: String,
+        config_backend: Arc<dyn StateBackendClient>,
+        cluster_backend: Arc<dyn ClusterState>,
+        codec: BallistaCodec<T, U>,
+        config: SchedulerConfig,
+        session_builder: SessionBuilder,
+        metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+    ) -> Self {
+        let state = Arc::new(SchedulerState::new(
+            config_backend,
+            cluster_backend,
+            session_builder,
+            codec,
+            scheduler_name.clone(),
+            config.clone(),
+        ));
+        let query_stage_scheduler =
+            Arc::new(QueryStageScheduler::new(state.clone(), 
metrics_collector));
+        let query_stage_event_loop = EventLoop::new(
+            "query_stage".to_owned(),
+            config.event_loop_buffer_size as usize,
+            query_stage_scheduler.clone(),
+        );
+
+        Self {
+            scheduler_name,
+            start_time: timestamp_millis() as u128,
+            state,
+            query_stage_event_loop,
+            query_stage_scheduler,
+        }
+    }
+
     #[allow(dead_code)]
     pub(crate) fn with_task_launcher(
         scheduler_name: String,
         config_backend: Arc<dyn StateBackendClient>,
+        cluster_backend: Arc<dyn ClusterState>,
         codec: BallistaCodec<T, U>,
         config: SchedulerConfig,
         metrics_collector: Arc<dyn SchedulerMetricsCollector>,
@@ -108,6 +146,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     ) -> Self {
         let state = Arc::new(SchedulerState::with_task_launcher(
             config_backend,
+            cluster_backend,
             default_session_builder,
             codec,
             scheduler_name.clone(),
@@ -330,6 +369,7 @@ mod test {
     use ballista_core::serde::BallistaCodec;
 
     use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+    use crate::state::backend::cluster::DefaultClusterState;
     use crate::state::backend::sled::SledClient;
 
     use crate::test_utils::{
@@ -599,10 +639,12 @@ mod test {
         scheduling_policy: TaskSchedulingPolicy,
     ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = 
Arc::new(DefaultClusterState::new(state_storage.clone()));
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
-                state_storage.clone(),
+                state_storage,
+                cluster_state,
                 BallistaCodec::default(),
                 
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
                 Arc::new(TestMetricsCollector::default()),
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index 43f3e505..0fe608f8 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -17,6 +17,7 @@
 
 use crate::config::SchedulerConfig;
 use crate::metrics::default_metrics_collector;
+use crate::state::backend::cluster::DefaultClusterState;
 use crate::{scheduler_server::SchedulerServer, 
state::backend::sled::SledClient};
 use ballista_core::serde::protobuf::PhysicalPlanNode;
 use ballista_core::serde::BallistaCodec;
@@ -31,14 +32,15 @@ use std::{net::SocketAddr, sync::Arc};
 use tokio::net::TcpListener;
 
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
-    let client = SledClient::try_new_temporary()?;
+    let backend = Arc::new(SledClient::try_new_temporary()?);
 
     let metrics_collector = default_metrics_collector()?;
 
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
         SchedulerServer::new(
             "localhost:50050".to_owned(),
-            Arc::new(client),
+            backend.clone(),
+            Arc::new(DefaultClusterState::new(backend)),
             BallistaCodec::default(),
             SchedulerConfig::default(),
             metrics_collector,
diff --git a/ballista/scheduler/src/state/backend/cluster.rs 
b/ballista/scheduler/src/state/backend/cluster.rs
new file mode 100644
index 00000000..2370c492
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/cluster.rs
@@ -0,0 +1,710 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::state::backend::{
+    Keyspace, Operation, StateBackendClient, TaskDistribution, WatchEvent,
+};
+use crate::state::executor_manager::ExecutorReservation;
+use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
+use ballista_core::error;
+use ballista_core::error::BallistaError;
+use ballista_core::serde::protobuf;
+use ballista_core::serde::protobuf::{
+    executor_status, ExecutorHeartbeat, ExecutorStatus,
+};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use futures::{Stream, StreamExt};
+use log::{debug, info};
+use std::collections::{HashMap, HashSet};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item = 
ExecutorHeartbeat> + Send>>;
+
+/// A trait that contains the necessary methods to maintain a globally 
consistent view of cluster resources
+#[tonic::async_trait]
+pub trait ClusterState: Send + Sync {
+    /// Reserve up to `num_slots` executor task slots. If not enough task 
slots are available, reserve
+    /// as many as possible.
+    ///
+    /// If `executors` is provided, only reserve slots of the specified 
executor IDs
+    async fn reserve_slots(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<Vec<ExecutorReservation>>;
+
+    /// Reserve exactly `num_slots` executor task slots. If not enough task 
slots are available,
+    /// returns an empty vec
+    ///
+    /// If `executors` is provided, only reserve slots of the specified 
executor IDs
+    async fn reserve_slots_exact(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<Vec<ExecutorReservation>>;
+
+    /// Cancel the specified reservations. This will make reserved executor 
slots available to other
+    /// tasks.
+    /// This operation should be atomic. Either all reservations are 
cancelled or none are
+    async fn cancel_reservations(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> error::Result<()>;
+
+    /// Register a new executor in the cluster. If `reserve` is true, then the 
executor's task slots
+    /// will be reserved and returned in the response and none of the new 
executor's task slots will be
+    /// available to other tasks.
+    async fn register_executor(
+        &self,
+        metadata: ExecutorMetadata,
+        spec: ExecutorData,
+        reserve: bool,
+    ) -> error::Result<Vec<ExecutorReservation>>;
+
+    /// Save the executor metadata. This will overwrite existing metadata for 
the executor ID
+    async fn save_executor_metadata(
+        &self,
+        metadata: ExecutorMetadata,
+    ) -> error::Result<()>;
+
+    /// Get executor metadata for the provided executor ID. Returns an error 
if the executor does not exist
+    async fn get_executor_metadata(
+        &self,
+        executor_id: &str,
+    ) -> error::Result<ExecutorMetadata>;
+
+    /// Save the executor heartbeat
+    async fn save_executor_heartbeat(
+        &self,
+        heartbeat: ExecutorHeartbeat,
+    ) -> error::Result<()>;
+
+    /// Remove the executor from the cluster
+    async fn remove_executor(&self, executor_id: &str) -> error::Result<()>;
+
+    /// Return the stream of executor heartbeats observed by all schedulers in 
the cluster.
+    /// This can be aggregated to provide an eventually consistent view of all 
executors within the cluster
+    async fn executor_heartbeat_stream(&self) -> 
error::Result<ExecutorHeartbeatStream>;
+
+    /// Return a map of the last seen heartbeat for all active executors
+    async fn executor_heartbeats(
+        &self,
+    ) -> error::Result<HashMap<String, ExecutorHeartbeat>>;
+}
+
+/// Default implementation of `ClusterState` that can use the key-value 
interface defined in
+/// `StateBackendClient`
+pub struct DefaultClusterState {
+    kv_store: Arc<dyn StateBackendClient>,
+}
+
+impl DefaultClusterState {
+    pub fn new(kv_store: Arc<dyn StateBackendClient>) -> Self {
+        Self { kv_store }
+    }
+}
+
+#[tonic::async_trait]
+impl ClusterState for DefaultClusterState {
+    async fn reserve_slots(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<Vec<ExecutorReservation>> {
+        let lock = self.kv_store.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            debug!("Attempting to reserve {} executor slots", num_slots);
+            let start = Instant::now();
+
+            let executors = match executors {
+                Some(executors) => executors,
+                None => {
+                    let heartbeats = self.executor_heartbeats().await?;
+
+                    get_alive_executors(60, heartbeats)?
+                }
+            };
+
+            let (reservations, txn_ops) = match distribution {
+                TaskDistribution::Bias => {
+                    reserve_slots_bias(self.kv_store.as_ref(), num_slots, 
executors)
+                        .await?
+                }
+                TaskDistribution::RoundRobin => {
+                    reserve_slots_round_robin(
+                        self.kv_store.as_ref(),
+                        num_slots,
+                        executors,
+                    )
+                    .await?
+                }
+            };
+
+            self.kv_store.apply_txn(txn_ops).await?;
+
+            let elapsed = start.elapsed();
+            info!(
+                "Reserved {} executor slots in {:?}",
+                reservations.len(),
+                elapsed
+            );
+
+            Ok(reservations)
+        })
+        .await
+    }
+
+    async fn reserve_slots_exact(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<Vec<ExecutorReservation>> {
+        let lock = self.kv_store.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            debug!("Attempting to reserve {} executor slots", num_slots);
+            let start = Instant::now();
+
+            let executors = match executors {
+                Some(executors) => executors,
+                None => {
+                    let heartbeats = self.executor_heartbeats().await?;
+
+                    get_alive_executors(60, heartbeats)?
+                }
+            };
+
+            let (reservations, txn_ops) = match distribution {
+                TaskDistribution::Bias => {
+                    reserve_slots_bias(self.kv_store.as_ref(), num_slots, 
executors)
+                        .await?
+                }
+                TaskDistribution::RoundRobin => {
+                    reserve_slots_round_robin(
+                        self.kv_store.as_ref(),
+                        num_slots,
+                        executors,
+                    )
+                    .await?
+                }
+            };
+
+            let elapsed = start.elapsed();
+            if reservations.len() as u32 == num_slots {
+                self.kv_store.apply_txn(txn_ops).await?;
+
+                info!(
+                    "Reserved {} executor slots in {:?}",
+                    reservations.len(),
+                    elapsed
+                );
+
+                Ok(reservations)
+            } else {
+                info!(
+                    "Failed to reserve exactly {} executor slots in {:?}",
+                    reservations.len(),
+                    elapsed
+                );
+
+                Ok(vec![])
+            }
+        })
+        .await
+    }
+
+    async fn cancel_reservations(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> error::Result<()> {
+        let lock = self.kv_store.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            let num_reservations = reservations.len();
+            debug!("Cancelling {} reservations", num_reservations);
+            let start = Instant::now();
+
+            let mut executor_slots: HashMap<String, ExecutorData> = 
HashMap::new();
+
+            for reservation in reservations {
+                let executor_id = &reservation.executor_id;
+                if let Some(data) = executor_slots.get_mut(executor_id) {
+                    data.available_task_slots += 1;
+                } else {
+                    let value = self.kv_store.get(Keyspace::Slots, 
executor_id).await?;
+                    let mut data =
+                        decode_into::<protobuf::ExecutorData, 
ExecutorData>(&value)?;
+                    data.available_task_slots += 1;
+                    executor_slots.insert(executor_id.clone(), data);
+                }
+            }
+
+            let txn_ops: Vec<(Operation, Keyspace, String)> = executor_slots
+                .into_iter()
+                .map(|(executor_id, data)| {
+                    let proto: protobuf::ExecutorData = data.into();
+                    let new_data = encode_protobuf(&proto)?;
+                    Ok((Operation::Put(new_data), Keyspace::Slots, 
executor_id))
+                })
+                .collect::<error::Result<Vec<_>>>()?;
+
+            self.kv_store.apply_txn(txn_ops).await?;
+
+            let elapsed = start.elapsed();
+            info!(
+                "Cancelled {} reservations in {:?}",
+                num_reservations, elapsed
+            );
+
+            Ok(())
+        })
+        .await
+    }
+
+    /// Registers an executor with the cluster state.
+    ///
+    /// Persists the executor metadata, then an initial `Active` heartbeat stamped with the
+    /// current wall-clock time (seconds since the Unix epoch), and finally the slot data.
+    ///
+    /// When `reserve` is `true`, one free [`ExecutorReservation`] is returned for every
+    /// available task slot in `spec` and the stored slot count is set to zero. Otherwise
+    /// `spec` is stored unchanged and an empty vector is returned.
+    ///
+    /// # Errors
+    /// Fails if the current timestamp cannot be computed, or if encoding or any of the
+    /// key-value store writes fails.
+    async fn register_executor(
+        &self,
+        metadata: ExecutorMetadata,
+        spec: ExecutorData,
+        reserve: bool,
+    ) -> error::Result<Vec<ExecutorReservation>> {
+        let executor_id = metadata.id.clone();
+
+        let now = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| {
+            BallistaError::Internal(format!("Error getting current timestamp: {:?}", e))
+        })?;
+
+        //TODO this should be in a transaction
+        // Save the metadata first, then the initial heartbeat, then the slots
+        self.save_executor_metadata(metadata).await?;
+
+        let initial_heartbeat = ExecutorHeartbeat {
+            executor_id: executor_id.clone(),
+            timestamp: now.as_secs(),
+            metrics: vec![],
+            status: Some(ExecutorStatus {
+                status: Some(executor_status::Status::Active("".to_string())),
+            }),
+        };
+        self.save_executor_heartbeat(initial_heartbeat).await?;
+
+        // Decide what gets stored and what gets handed back to the caller
+        let (slot_data, reservations) = if reserve {
+            let mut drained = spec;
+            let num_slots = drained.available_task_slots as usize;
+            let reserved: Vec<ExecutorReservation> = (0..num_slots)
+                .map(|_| ExecutorReservation::new_free(executor_id.clone()))
+                .collect();
+
+            // Every slot is now held by a reservation, so none remain available
+            drained.available_task_slots = 0;
+            (drained, reserved)
+        } else {
+            (spec, vec![])
+        };
+
+        let proto: protobuf::ExecutorData = slot_data.into();
+        let encoded = encode_protobuf(&proto)?;
+        self.kv_store
+            .put(Keyspace::Slots, executor_id, encoded)
+            .await?;
+
+        Ok(reservations)
+    }
+
+    /// Encodes `metadata` as a protobuf `ExecutorMetadata` message and writes it to the
+    /// `Executors` keyspace, keyed by the executor id.
+    async fn save_executor_metadata(
+        &self,
+        metadata: ExecutorMetadata,
+    ) -> error::Result<()> {
+        let key = metadata.id.clone();
+
+        let proto: protobuf::ExecutorMetadata = metadata.into();
+        let encoded = encode_protobuf(&proto)?;
+
+        self.kv_store.put(Keyspace::Executors, key, encoded).await
+    }
+
+    /// Reads the entry for `executor_id` from the `Executors` keyspace and decodes it from
+    /// its protobuf form into an [`ExecutorMetadata`].
+    async fn get_executor_metadata(
+        &self,
+        executor_id: &str,
+    ) -> error::Result<ExecutorMetadata> {
+        let raw = self.kv_store.get(Keyspace::Executors, executor_id).await?;
+
+        let metadata = decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&raw)?;
+        Ok(metadata)
+    }
+
+    /// Encodes `heartbeat` and writes it to the `Heartbeats` keyspace under the id of the
+    /// executor that sent it.
+    async fn save_executor_heartbeat(
+        &self,
+        heartbeat: ExecutorHeartbeat,
+    ) -> error::Result<()> {
+        let encoded = encode_protobuf(&heartbeat)?;
+        // The heartbeat is no longer needed once encoded, so its id can be moved out as the key
+        let key = heartbeat.executor_id;
+
+        self.kv_store.put(Keyspace::Heartbeats, key, encoded).await
+    }
+
+    /// Marks `executor_id` as dead by writing a heartbeat with `Dead` status, stamped with
+    /// the current wall-clock time, to the `Heartbeats` keyspace.
+    ///
+    /// # Errors
+    /// Fails if the current timestamp cannot be computed, or if encoding or the key-value
+    /// store write fails.
+    async fn remove_executor(&self, executor_id: &str) -> error::Result<()> {
+        let now = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| {
+            BallistaError::Internal(format!("Error getting current timestamp: {:?}", e))
+        })?;
+
+        let dead_heartbeat = ExecutorHeartbeat {
+            executor_id: executor_id.to_owned(),
+            timestamp: now.as_secs(),
+            metrics: vec![],
+            status: Some(ExecutorStatus {
+                status: Some(executor_status::Status::Dead("".to_string())),
+            }),
+        };
+        let encoded = encode_protobuf(&dead_heartbeat)?;
+
+        // TODO Check the Executor reservation logic for push-based scheduling
+        self.kv_store
+            .put(Keyspace::Heartbeats, executor_id.to_owned(), encoded)
+            .await
+    }
+
+    /// Opens a watch on the `Heartbeats` keyspace and exposes it as a stream of decoded
+    /// [`ExecutorHeartbeat`]s.
+    ///
+    /// Only `Put` events whose value decodes successfully are yielded; `Delete` events and
+    /// values that fail to decode are dropped from the stream.
+    async fn executor_heartbeat_stream(&self) -> error::Result<ExecutorHeartbeatStream> {
+        let events = self
+            .kv_store
+            .watch(Keyspace::Heartbeats, String::default())
+            .await?;
+
+        let heartbeats = events.filter_map(|event| {
+            let decoded = match event {
+                WatchEvent::Put(_, value) => {
+                    decode_protobuf::<ExecutorHeartbeat>(&value).ok()
+                }
+                WatchEvent::Delete(_) => None,
+            };
+            futures::future::ready(decoded)
+        });
+
+        Ok(heartbeats.boxed())
+    }
+
+    /// Scans the `Heartbeats` keyspace and returns the heartbeats whose status is `Active`,
+    /// keyed by executor id.
+    ///
+    /// # Errors
+    /// Fails if the scan fails or if any stored heartbeat cannot be decoded.
+    async fn executor_heartbeats(
+        &self,
+    ) -> error::Result<HashMap<String, ExecutorHeartbeat>> {
+        let entries = self.kv_store.scan(Keyspace::Heartbeats, None).await?;
+
+        let mut active = HashMap::with_capacity(entries.len());
+
+        for (_, bytes) in entries {
+            let heartbeat: ExecutorHeartbeat = decode_protobuf(&bytes)?;
+
+            let is_active = matches!(
+                &heartbeat.status,
+                Some(ExecutorStatus {
+                    status: Some(executor_status::Status::Active(_)),
+                })
+            );
+
+            if is_active {
+                active.insert(heartbeat.executor_id.clone(), heartbeat);
+            }
+        }
+
+        Ok(active)
+    }
+}
+
+/// Return the set of executor IDs which have heartbeated within 
`last_seen_threshold` seconds
+fn get_alive_executors(
+    last_seen_threshold: u64,
+    heartbeats: HashMap<String, ExecutorHeartbeat>,
+) -> error::Result<HashSet<String>> {
+    let now_epoch_ts = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards");
+
+    let last_seen_threshold = now_epoch_ts
+        .checked_sub(Duration::from_secs(last_seen_threshold))
+        .ok_or_else(|| {
+            BallistaError::Internal(format!(
+                "Error getting alive executors, invalid last_seen_threshold of 
{}",
+                last_seen_threshold
+            ))
+        })?
+        .as_secs();
+
+    Ok(heartbeats
+        .iter()
+        .filter_map(|(exec, heartbeat)| {
+            (heartbeat.timestamp > last_seen_threshold).then(|| exec.clone())
+        })
+        .collect())
+}
+
+/// Reserves up to `n` task slots by draining executors one at a time.
+///
+/// Each visited executor gives up as many of its available slots as are still needed before
+/// the next executor is consulted. This keeps the number of `ExecutorData` decode/encode
+/// round trips low, but it can leave the cluster unbalanced: some executors may end up very
+/// busy while others stay idle.
+///
+/// Returns the reservations together with the `Put` operations that persist the updated slot
+/// counts; the operations are not applied here.
+async fn reserve_slots_bias(
+    state: &dyn StateBackendClient,
+    mut n: u32,
+    executors: HashSet<String>,
+) -> error::Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+    let mut reservations: Vec<ExecutorReservation> = vec![];
+    let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+    for executor_id in executors {
+        if n == 0 {
+            break;
+        }
+
+        let raw = state.get(Keyspace::Slots, &executor_id).await?;
+        let mut slots = decode_into::<protobuf::ExecutorData, ExecutorData>(&raw)?;
+
+        // Take everything this executor has, capped by what is still outstanding
+        let granted = slots.available_task_slots.min(n);
+        reservations.extend(
+            (0..granted).map(|_| ExecutorReservation::new_free(executor_id.clone())),
+        );
+        slots.available_task_slots -= granted;
+        n -= granted;
+
+        let proto: protobuf::ExecutorData = slots.into();
+        let encoded = encode_protobuf(&proto)?;
+        txn_ops.push((Operation::Put(encoded), Keyspace::Slots, executor_id));
+    }
+
+    Ok((reservations, txn_ops))
+}
+
+/// Reserves up to `n` task slots, handing them out one per executor per pass so that tasks
+/// are spread evenly across executors.
+///
+/// Candidates are the executors in `executors` that have at least one available slot,
+/// ordered by available slots, most first. Passes repeat until `n` slots are reserved or a
+/// pass reserves nothing.
+///
+/// Returns the reservations together with the `Put` operations that persist the updated slot
+/// counts of the executors that were touched; the operations are not applied here.
+async fn reserve_slots_round_robin(
+    state: &dyn StateBackendClient,
+    mut n: u32,
+    executors: HashSet<String>,
+) -> error::Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+    let mut reservations: Vec<ExecutorReservation> = vec![];
+
+    // Decode every stored slot entry up front; a single undecodable entry fails the call
+    let all_slots = state
+        .scan(Keyspace::Slots, None)
+        .await?
+        .into_iter()
+        .map(|(_, bytes)| decode_into::<protobuf::ExecutorData, ExecutorData>(&bytes))
+        .collect::<error::Result<Vec<ExecutorData>>>()?;
+
+    let mut candidates: Vec<ExecutorData> = all_slots
+        .into_iter()
+        .filter(|slots| {
+            slots.available_task_slots > 0 && executors.contains(&slots.executor_id)
+        })
+        .collect();
+    // Descending by available slots (stable sort)
+    candidates.sort_by(|a, b| b.available_task_slots.cmp(&a.available_task_slots));
+
+    // Exclusive upper bound of the candidate prefix whose slot counts were modified
+    let mut touched = 0usize;
+    loop {
+        let n_at_pass_start = n;
+
+        for (idx, slots) in candidates.iter_mut().enumerate() {
+            // Stop when done, or at the first exhausted executor: the vector is sorted in
+            // descending order, so every executor after it is exhausted as well
+            if n == 0 || slots.available_task_slots == 0 {
+                break;
+            }
+
+            reservations.push(ExecutorReservation::new_free(slots.executor_id.clone()));
+            slots.available_task_slots -= 1;
+            n -= 1;
+
+            touched = touched.max(idx + 1);
+        }
+
+        // A pass that reserved nothing means no further progress is possible
+        if n == n_at_pass_start {
+            break;
+        }
+    }
+
+    // Only the touched prefix needs to be written back
+    let txn_ops = candidates
+        .into_iter()
+        .take(touched)
+        .map(|slots| {
+            let executor_id = slots.executor_id.clone();
+            let proto: protobuf::ExecutorData = slots.into();
+            let encoded = encode_protobuf(&proto)?;
+            Ok((Operation::Put(encoded), Keyspace::Slots, executor_id))
+        })
+        .collect::<error::Result<Vec<(Operation, Keyspace, String)>>>()?;
+
+    Ok((reservations, txn_ops))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::state::backend::cluster::{ClusterState, DefaultClusterState};
+    use crate::state::backend::sled::SledClient;
+
+    use ballista_core::error::Result;
+    use ballista_core::serde::protobuf::{
+        executor_status, ExecutorHeartbeat, ExecutorStatus,
+    };
+
+    use futures::StreamExt;
+
+    use std::sync::Arc;
+
+    /// Creates a cluster state backed by a temporary sled database.
+    fn new_cluster_state() -> Result<Arc<dyn ClusterState>> {
+        let sled = Arc::new(SledClient::try_new_temporary()?);
+
+        let cluster_state: Arc<dyn ClusterState> =
+            Arc::new(DefaultClusterState::new(sled));
+
+        Ok(cluster_state)
+    }
+
+    /// Builds a heartbeat with `Active` status and no metrics.
+    fn active_heartbeat(executor_id: &str, timestamp: u64) -> ExecutorHeartbeat {
+        ExecutorHeartbeat {
+            executor_id: executor_id.to_string(),
+            timestamp,
+            metrics: vec![],
+            status: Some(ExecutorStatus {
+                status: Some(executor_status::Status::Active(String::default())),
+            }),
+        }
+    }
+
+    /// Asserts that a heartbeat was found and carries the expected id and timestamp.
+    /// Panics when the heartbeat is missing, so an absent entry can never pass silently.
+    fn assert_heartbeat(
+        found: Option<&ExecutorHeartbeat>,
+        executor_id: &str,
+        timestamp: u64,
+    ) {
+        let hb = found.unwrap_or_else(|| {
+            panic!("Expected heartbeat for executor {}", executor_id)
+        });
+
+        assert_eq!(
+            hb.executor_id, executor_id,
+            "Expected heartbeat in map for {}",
+            executor_id
+        );
+        assert_eq!(
+            hb.timestamp, timestamp,
+            "Expected timestamp to be correct for {}",
+            executor_id
+        );
+    }
+
+    #[tokio::test]
+    async fn test_heartbeat_stream() -> Result<()> {
+        let cluster_state = new_cluster_state()?;
+
+        for i in 0..10 {
+            let id = i.to_string();
+
+            // Subscribe before saving so the new heartbeat is the first event observed
+            let mut heartbeat_stream = cluster_state.executor_heartbeat_stream().await?;
+
+            cluster_state
+                .save_executor_heartbeat(active_heartbeat(&id, 0))
+                .await?;
+
+            let received = heartbeat_stream
+                .next()
+                .await
+                .map_or(false, |event| event.executor_id == id);
+
+            assert!(received, "Did not receive heartbeat for executor {}", i);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_heartbeats() -> Result<()> {
+        let cluster_state = new_cluster_state()?;
+
+        // Add 10 executor heartbeats
+        for i in 0..10u64 {
+            cluster_state
+                .save_executor_heartbeat(active_heartbeat(&i.to_string(), i))
+                .await?;
+        }
+
+        let heartbeats = cluster_state.executor_heartbeats().await?;
+
+        // Check that all 10 are present in the global view
+        for i in 0..10u64 {
+            let id = i.to_string();
+            assert_heartbeat(heartbeats.get(id.as_str()), &id, i);
+        }
+
+        // Send new heartbeat with updated timestamp
+        cluster_state
+            .save_executor_heartbeat(active_heartbeat("0", 100))
+            .await?;
+
+        let heartbeats = cluster_state.executor_heartbeats().await?;
+
+        // The updated heartbeat must be present and carry the new timestamp
+        assert_heartbeat(heartbeats.get("0"), "0", 100);
+
+        // All other heartbeats must be unchanged
+        for i in 1..10u64 {
+            let id = i.to_string();
+            assert_heartbeat(heartbeats.get(id.as_str()), &id, i);
+        }
+
+        Ok(())
+    }
+}
diff --git a/ballista/scheduler/src/state/backend/mod.rs 
b/ballista/scheduler/src/state/backend/mod.rs
index 0807f3c2..5c859937 100644
--- a/ballista/scheduler/src/state/backend/mod.rs
+++ b/ballista/scheduler/src/state/backend/mod.rs
@@ -22,6 +22,7 @@ use std::collections::HashSet;
 use std::fmt;
 use tokio::sync::OwnedMutexGuard;
 
+pub mod cluster;
 #[cfg(feature = "etcd")]
 pub mod etcd;
 pub mod memory;
@@ -31,7 +32,7 @@ mod utils;
 
 // an enum used to configure the backend
 // needs to be visible to code generated by configure_me
-#[derive(Debug, Clone, ArgEnum, serde::Deserialize)]
+#[derive(Debug, Clone, ArgEnum, serde::Deserialize, PartialEq, Eq)]
 pub enum StateBackend {
     Etcd,
     Memory,
@@ -131,6 +132,17 @@ pub trait StateBackendClient: Send + Sync {
     async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()>;
 }
 
+/// Method of distributing tasks to available executor slots
+#[derive(Debug, Clone, Copy)]
+pub enum TaskDistribution {
+    /// Eagerly assign tasks to executor slots. This will assign as many task 
slots per executor
+    /// as are currently available
+    Bias,
+    /// Distribute tasks evenly across executors. This will try to iterate 
through available executors
+    /// and assign one task to each executor until all tasks are assigned.
+    RoundRobin,
+}
+
 /// A Watch is a cancelable stream of put or delete events in the 
[StateBackendClient]
 #[tonic::async_trait]
 pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index 165502b2..a6234528 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
-use crate::state::backend::{Keyspace, Operation, StateBackendClient, 
WatchEvent};
+use crate::state::backend::TaskDistribution;
 
-use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf;
 
 use crate::config::SlotsPolicy;
+use crate::state::backend::cluster::ClusterState;
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
@@ -88,7 +88,8 @@ pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
 pub(crate) struct ExecutorManager {
     // executor slot policy
     slots_policy: SlotsPolicy,
-    state: Arc<dyn StateBackendClient>,
+    task_distribution: TaskDistribution,
+    cluster_state: Arc<dyn ClusterState>,
     // executor_id -> ExecutorMetadata map
     executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
     // executor_id -> ExecutorHeartbeat map
@@ -102,12 +103,20 @@ pub(crate) struct ExecutorManager {
 
 impl ExecutorManager {
     pub(crate) fn new(
-        state: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         slots_policy: SlotsPolicy,
     ) -> Self {
+        let task_distribution = match slots_policy {
+            SlotsPolicy::Bias => TaskDistribution::Bias,
+            SlotsPolicy::RoundRobin | SlotsPolicy::RoundRobinLocal => {
+                TaskDistribution::RoundRobin
+            }
+        };
+
         Self {
             slots_policy,
-            state,
+            task_distribution,
+            cluster_state,
             executor_metadata: Arc::new(DashMap::new()),
             executors_heartbeat: Arc::new(DashMap::new()),
             executor_data: Arc::new(Mutex::new(HashMap::new())),
@@ -116,16 +125,33 @@ impl ExecutorManager {
         }
     }
 
-    /// Initialize the `ExecutorManager` state. This will fill the 
`executor_heartbeats` value
-    /// with existing active heartbeats. Then new updates will be consumed 
through the `ExecutorHeartbeatListener`
+    /// Initialize a background process that will listen for executor 
heartbeats and update the in-memory cache
+    /// of executor heartbeats
     pub async fn init(&self) -> Result<()> {
         self.init_active_executor_heartbeats().await?;
-        let heartbeat_listener = ExecutorHeartbeatListener::new(
-            self.state.clone(),
-            self.executors_heartbeat.clone(),
-            self.dead_executors.clone(),
-        );
-        heartbeat_listener.start().await
+
+        let mut heartbeat_stream = 
self.cluster_state.executor_heartbeat_stream().await?;
+
+        info!("Initializing heartbeat listener");
+
+        let heartbeats = self.executors_heartbeat.clone();
+        let dead_executors = self.dead_executors.clone();
+        tokio::task::spawn(async move {
+            while let Some(heartbeat) = heartbeat_stream.next().await {
+                let executor_id = heartbeat.executor_id.clone();
+                if let Some(ExecutorStatus {
+                    status: Some(executor_status::Status::Dead(_)),
+                }) = heartbeat.status
+                {
+                    heartbeats.remove(&executor_id);
+                    dead_executors.insert(executor_id);
+                } else {
+                    heartbeats.insert(executor_id, heartbeat);
+                }
+            }
+        });
+
+        Ok(())
     }
 
     /// Reserve up to n executor task slots. Once reserved these slots will 
not be available
@@ -135,7 +161,13 @@ impl ExecutorManager {
         if self.slots_policy.is_local() {
             self.reserve_slots_local(n).await
         } else {
-            self.reserve_slots_global(n).await
+            let alive_executors = self.get_alive_executors_within_one_minute();
+
+            println!("Alive executors: {:?}", alive_executors);
+
+            self.cluster_state
+                .reserve_slots(n, self.task_distribution, 
Some(alive_executors))
+                .await
         }
     }
 
@@ -210,152 +242,6 @@ impl ExecutorManager {
         Ok(reservations)
     }
 
-    /// Reserve up to n executor task slots with considering the global 
resource snapshot
-    async fn reserve_slots_global(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
-        let lock = self.state.lock(Keyspace::Slots, "global").await?;
-
-        with_lock(lock, async {
-            debug!("Attempting to reserve {} executor slots", n);
-            let start = Instant::now();
-
-            let alive_executors = self.get_alive_executors_within_one_minute();
-
-            let (reservations, txn_ops) = match self.slots_policy {
-                SlotsPolicy::Bias => {
-                    self.reserve_slots_global_bias(n, alive_executors).await?
-                }
-                SlotsPolicy::RoundRobin => {
-                    self.reserve_slots_global_round_robin(n, alive_executors)
-                        .await?
-                }
-                _ => {
-                    return Err(BallistaError::General(format!(
-                        "Reservation policy {:?} is not supported",
-                        self.slots_policy
-                    )))
-                }
-            };
-
-            self.state.apply_txn(txn_ops).await?;
-
-            let elapsed = start.elapsed();
-            info!(
-                "Reserved {} executor slots in {:?}",
-                reservations.len(),
-                elapsed
-            );
-
-            Ok(reservations)
-        })
-        .await
-    }
-
-    /// It will get ExecutorReservation from one executor as many as possible.
-    /// By this way, it can reduce the chance of decoding and encoding 
ExecutorData.
-    /// However, it may make the whole cluster unbalanced,
-    /// which means some executors may be very busy while other executors may 
be idle.
-    async fn reserve_slots_global_bias(
-        &self,
-        mut n: u32,
-        alive_executors: HashSet<String>,
-    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, 
String)>)> {
-        let mut reservations: Vec<ExecutorReservation> = vec![];
-        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
-        for executor_id in alive_executors {
-            if n == 0 {
-                break;
-            }
-
-            let value = self.state.get(Keyspace::Slots, &executor_id).await?;
-            let mut data = decode_into::<protobuf::ExecutorData, 
ExecutorData>(&value)?;
-            let take = std::cmp::min(data.available_task_slots, n);
-
-            for _ in 0..take {
-                
reservations.push(ExecutorReservation::new_free(executor_id.clone()));
-                data.available_task_slots -= 1;
-                n -= 1;
-            }
-
-            let proto: protobuf::ExecutorData = data.into();
-            let new_data = encode_protobuf(&proto)?;
-            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, 
executor_id));
-        }
-
-        Ok((reservations, txn_ops))
-    }
-
-    /// Create ExecutorReservation in a round robin way to evenly assign tasks 
to executors
-    async fn reserve_slots_global_round_robin(
-        &self,
-        mut n: u32,
-        alive_executors: HashSet<String>,
-    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, 
String)>)> {
-        let mut reservations: Vec<ExecutorReservation> = vec![];
-        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
-        let all_executor_data = self
-            .state
-            .scan(Keyspace::Slots, None)
-            .await?
-            .into_iter()
-            .map(|(_, data)| decode_into::<protobuf::ExecutorData, 
ExecutorData>(&data))
-            .collect::<Result<Vec<ExecutorData>>>()?;
-
-        let mut available_executor_data: Vec<ExecutorData> = all_executor_data
-            .into_iter()
-            .filter_map(|data| {
-                (data.available_task_slots > 0
-                    && alive_executors.contains(&data.executor_id))
-                .then_some(data)
-            })
-            .collect();
-        available_executor_data
-            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, 
&a.available_task_slots));
-
-        // Exclusive
-        let mut last_updated_idx = 0usize;
-        loop {
-            let n_before = n;
-            for (idx, data) in available_executor_data.iter_mut().enumerate() {
-                if n == 0 {
-                    break;
-                }
-
-                // Since the vector is sorted in descending order,
-                // if finding one executor has not enough slots, the following 
will have not enough, either
-                if data.available_task_slots == 0 {
-                    break;
-                }
-
-                reservations
-                    
.push(ExecutorReservation::new_free(data.executor_id.clone()));
-                data.available_task_slots -= 1;
-                n -= 1;
-
-                if idx >= last_updated_idx {
-                    last_updated_idx = idx + 1;
-                }
-            }
-
-            if n_before == n {
-                break;
-            }
-        }
-
-        for (idx, data) in available_executor_data.into_iter().enumerate() {
-            if idx >= last_updated_idx {
-                break;
-            }
-            let executor_id = data.executor_id.clone();
-            let proto: protobuf::ExecutorData = data.into();
-            let new_data = encode_protobuf(&proto)?;
-            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, 
executor_id));
-        }
-
-        Ok((reservations, txn_ops))
-    }
-
     /// Returned reserved task slots to the pool of available slots. This 
operation is atomic
     /// so either the entire pool of reserved task slots it returned or none 
are.
     pub async fn cancel_reservations(
@@ -365,7 +251,7 @@ impl ExecutorManager {
         if self.slots_policy.is_local() {
             self.cancel_reservations_local(reservations).await
         } else {
-            self.cancel_reservations_global(reservations).await
+            self.cluster_state.cancel_reservations(reservations).await
         }
     }
 
@@ -394,54 +280,6 @@ impl ExecutorManager {
         Ok(())
     }
 
-    async fn cancel_reservations_global(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
-        let lock = self.state.lock(Keyspace::Slots, "global").await?;
-
-        with_lock(lock, async {
-            let num_reservations = reservations.len();
-            debug!("Cancelling {} reservations", num_reservations);
-            let start = Instant::now();
-
-            let mut executor_slots: HashMap<String, ExecutorData> = 
HashMap::new();
-
-            for reservation in reservations {
-                let executor_id = &reservation.executor_id;
-                if let Some(data) = executor_slots.get_mut(executor_id) {
-                    data.available_task_slots += 1;
-                } else {
-                    let value = self.state.get(Keyspace::Slots, 
executor_id).await?;
-                    let mut data =
-                        decode_into::<protobuf::ExecutorData, 
ExecutorData>(&value)?;
-                    data.available_task_slots += 1;
-                    executor_slots.insert(executor_id.clone(), data);
-                }
-            }
-
-            let txn_ops: Vec<(Operation, Keyspace, String)> = executor_slots
-                .into_iter()
-                .map(|(executor_id, data)| {
-                    let proto: protobuf::ExecutorData = data.into();
-                    let new_data = encode_protobuf(&proto)?;
-                    Ok((Operation::Put(new_data), Keyspace::Slots, 
executor_id))
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            self.state.apply_txn(txn_ops).await?;
-
-            let elapsed = start.elapsed();
-            info!(
-                "Cancelled {} reservations in {:?}",
-                num_reservations, elapsed
-            );
-
-            Ok(())
-        })
-        .await
-    }
-
     /// Send rpc to Executors to cancel the running tasks
     pub async fn cancel_running_tasks(&self, tasks: Vec<RunningTaskInfo>) -> 
Result<()> {
         let mut tasks_to_cancel: HashMap<&str, Vec<protobuf::RunningTaskInfo>> 
=
@@ -595,21 +433,11 @@ impl ExecutorManager {
             }
         }
 
-        let value = self.state.get(Keyspace::Executors, executor_id).await?;
-
-        let decoded =
-            decode_into::<protobuf::ExecutorMetadata, 
ExecutorMetadata>(&value)?;
-        Ok(decoded)
+        self.cluster_state.get_executor_metadata(executor_id).await
     }
 
     pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> 
Result<()> {
-        let executor_id = metadata.id.clone();
-        let proto: protobuf::ExecutorMetadata = metadata.into();
-        let value = encode_protobuf(&proto)?;
-
-        self.state
-            .put(Keyspace::Executors, executor_id, value)
-            .await
+        self.cluster_state.save_executor_metadata(metadata).await
     }
 
     /// Register the executor with the scheduler. This will save the executor 
metadata and the
@@ -628,8 +456,6 @@ impl ExecutorManager {
     ) -> Result<Vec<ExecutorReservation>> {
         self.test_scheduler_connectivity(&metadata).await?;
 
-        let executor_id = metadata.id.clone();
-
         let current_ts = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .map_err(|e| {
@@ -640,18 +466,14 @@ impl ExecutorManager {
             })?
             .as_secs();
 
-        //TODO this should be in a transaction
-        // Now that we know we can connect, save the metadata and slots
-        self.save_executor_metadata(metadata).await?;
-        self.save_executor_heartbeat(protobuf::ExecutorHeartbeat {
-            executor_id: executor_id.clone(),
+        let initial_heartbeat = ExecutorHeartbeat {
+            executor_id: metadata.id.clone(),
             timestamp: current_ts,
             metrics: vec![],
             status: Some(ExecutorStatus {
-                status: Some(executor_status::Status::Active("".to_string())),
+                status: 
Some(executor_status::Status::Active(String::default())),
             }),
-        })
-        .await?;
+        };
 
         if !reserve {
             if self.slots_policy.is_local() {
@@ -660,16 +482,20 @@ impl ExecutorManager {
                     .insert(specification.executor_id.clone(), 
specification.clone());
             }
 
-            let proto: protobuf::ExecutorData = specification.into();
-            let value = encode_protobuf(&proto)?;
-            self.state.put(Keyspace::Slots, executor_id, value).await?;
+            self.cluster_state
+                .register_executor(metadata, specification.clone(), reserve)
+                .await?;
+
+            self.executors_heartbeat
+                .insert(initial_heartbeat.executor_id.clone(), 
initial_heartbeat);
+
             Ok(vec![])
         } else {
             let mut specification = specification;
             let num_slots = specification.available_task_slots as usize;
             let mut reservations: Vec<ExecutorReservation> = vec![];
             for _ in 0..num_slots {
-                
reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+                
reservations.push(ExecutorReservation::new_free(metadata.id.clone()));
             }
 
             specification.available_task_slots = 0;
@@ -680,9 +506,13 @@ impl ExecutorManager {
                     .insert(specification.executor_id.clone(), 
specification.clone());
             }
 
-            let proto: protobuf::ExecutorData = specification.into();
-            let value = encode_protobuf(&proto)?;
-            self.state.put(Keyspace::Slots, executor_id, value).await?;
+            self.cluster_state
+                .register_executor(metadata, specification, reserve)
+                .await?;
+
+            self.executors_heartbeat
+                .insert(initial_heartbeat.executor_id.clone(), 
initial_heartbeat);
+
             Ok(reservations)
         }
     }
@@ -691,29 +521,22 @@ impl ExecutorManager {
     pub async fn remove_executor(
         &self,
         executor_id: &str,
-        _reason: Option<String>,
+        reason: Option<String>,
     ) -> Result<()> {
-        let current_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .map_err(|e| {
-                BallistaError::Internal(format!(
-                    "Error getting current timestamp: {:?}",
-                    e
-                ))
-            })?
-            .as_secs();
+        info!("Removing executor {}: {:?}", executor_id, reason);
+        self.cluster_state.remove_executor(executor_id).await?;
 
-        self.save_dead_executor_heartbeat(protobuf::ExecutorHeartbeat {
-            executor_id: executor_id.to_owned(),
-            timestamp: current_ts,
-            metrics: vec![],
-            status: Some(ExecutorStatus {
-                status: Some(executor_status::Status::Dead("".to_string())),
-            }),
-        })
-        .await?;
+        let executor_id = executor_id.to_owned();
 
-        // TODO Check the Executor reservation logic for push-based scheduling
+        self.executors_heartbeat.remove(&executor_id);
+
+        // Remove executor data cache for dead executors
+        {
+            let mut executor_data = self.executor_data.lock();
+            executor_data.remove(&executor_id);
+        }
+
+        self.dead_executors.insert(executor_id);
 
         Ok(())
     }
@@ -746,12 +569,10 @@ impl ExecutorManager {
 
     pub(crate) async fn save_executor_heartbeat(
         &self,
-        heartbeat: protobuf::ExecutorHeartbeat,
+        heartbeat: ExecutorHeartbeat,
     ) -> Result<()> {
-        let executor_id = heartbeat.executor_id.clone();
-        let value = encode_protobuf(&heartbeat)?;
-        self.state
-            .put(Keyspace::Heartbeats, executor_id, value)
+        self.cluster_state
+            .save_executor_heartbeat(heartbeat.clone())
             .await?;
 
         self.executors_heartbeat
@@ -760,45 +581,21 @@ impl ExecutorManager {
         Ok(())
     }
 
-    pub(crate) async fn save_dead_executor_heartbeat(
-        &self,
-        heartbeat: protobuf::ExecutorHeartbeat,
-    ) -> Result<()> {
-        let executor_id = heartbeat.executor_id.clone();
-        let value = encode_protobuf(&heartbeat)?;
-        self.state
-            .put(Keyspace::Heartbeats, executor_id.clone(), value)
-            .await?;
-
-        self.executors_heartbeat
-            .remove(&heartbeat.executor_id.clone());
-
-        // Remove executor data cache for dead executors
-        {
-            let mut executor_data = self.executor_data.lock();
-            executor_data.remove(&executor_id);
-        }
-
-        self.dead_executors.insert(executor_id);
-        Ok(())
-    }
-
     pub(crate) fn is_dead_executor(&self, executor_id: &str) -> bool {
         self.dead_executors.contains(executor_id)
     }
 
     /// Initialize the set of active executor heartbeats from storage
     async fn init_active_executor_heartbeats(&self) -> Result<()> {
-        let heartbeats = self.state.scan(Keyspace::Heartbeats, None).await?;
+        let heartbeats = self.cluster_state.executor_heartbeats().await?;
 
-        for (_, value) in heartbeats {
-            let data: protobuf::ExecutorHeartbeat = decode_protobuf(&value)?;
-            let executor_id = data.executor_id.clone();
+        for (executor_id, heartbeat) in heartbeats {
+            // let data: protobuf::ExecutorHeartbeat = decode_protobuf(&value)?;
             if let Some(ExecutorStatus {
                 status: Some(executor_status::Status::Active(_)),
-            }) = data.status
+            }) = heartbeat.status
             {
-                self.executors_heartbeat.insert(executor_id, data);
+                self.executors_heartbeat.insert(executor_id, heartbeat);
             }
         }
         Ok(())
@@ -851,65 +648,10 @@ impl ExecutorManager {
     }
 }
 
-/// Rather than doing a scan across persistent state to find alive executors every time
-/// we need to find the set of alive executors, we start a watch on the `Heartbeats` keyspace
-/// and maintain an in-memory copy of the executor heartbeats.
-struct ExecutorHeartbeatListener {
-    state: Arc<dyn StateBackendClient>,
-    executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
-    dead_executors: Arc<DashSet<String>>,
-}
-
-impl ExecutorHeartbeatListener {
-    pub fn new(
-        state: Arc<dyn StateBackendClient>,
-        executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
-        dead_executors: Arc<DashSet<String>>,
-    ) -> Self {
-        Self {
-            state,
-            executors_heartbeat,
-            dead_executors,
-        }
-    }
-
-    /// Spawn an sync task which will watch the the Heartbeats keyspace and insert
-    /// new heartbeats in the `executors_heartbeat` cache.
-    pub async fn start(&self) -> Result<()> {
-        let mut watch = self
-            .state
-            .watch(Keyspace::Heartbeats, "".to_owned())
-            .await?;
-        let heartbeats = self.executors_heartbeat.clone();
-        let dead_executors = self.dead_executors.clone();
-        tokio::task::spawn(async move {
-            while let Some(event) = watch.next().await {
-                if let WatchEvent::Put(_, value) = event {
-                    if let Ok(data) =
-                        decode_protobuf::<protobuf::ExecutorHeartbeat>(&value)
-                    {
-                        let executor_id = data.executor_id.clone();
-                        // Remove dead executors
-                        if let Some(ExecutorStatus {
-                            status: Some(executor_status::Status::Dead(_)),
-                        }) = data.status
-                        {
-                            heartbeats.remove(&executor_id);
-                            dead_executors.insert(executor_id);
-                        } else {
-                            heartbeats.insert(executor_id, data);
-                        }
-                    }
-                }
-            }
-        });
-        Ok(())
-    }
-}
-
 #[cfg(test)]
 mod test {
     use crate::config::SlotsPolicy;
+    use crate::state::backend::cluster::DefaultClusterState;
     use crate::state::backend::sled::SledClient;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
     use ballista_core::error::Result;
@@ -928,9 +670,11 @@ mod test {
     }
 
     async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
 
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
 
         let executors = test_executors(10, 4);
 
@@ -943,7 +687,12 @@ mod test {
         // Reserve all the slots
         let reservations = executor_manager.reserve_slots(40).await?;
 
-        assert_eq!(reservations.len(), 40);
+        assert_eq!(
+            reservations.len(),
+            40,
+            "Expected 40 reservations for policy {:?}",
+            slots_policy
+        );
 
         // Now cancel them
         executor_manager.cancel_reservations(reservations).await?;
@@ -951,7 +700,12 @@ mod test {
         // Now reserve again
         let reservations = executor_manager.reserve_slots(40).await?;
 
-        assert_eq!(reservations.len(), 40);
+        assert_eq!(
+            reservations.len(),
+            40,
+            "Expected 40 reservations for policy {:?}",
+            slots_policy
+        );
 
         Ok(())
     }
@@ -966,9 +720,11 @@ mod test {
     }
 
     async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
 
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
 
         let executors = test_executors(10, 4);
 
@@ -1021,9 +777,11 @@ mod test {
 
         let executors = test_executors(10, 4);
 
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
 
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
 
         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -1066,9 +824,11 @@ mod test {
     }
 
     async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
 
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
 
         let executors = test_executors(10, 4);
 
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 0671fdbe..99791457 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -33,6 +33,7 @@ use crate::state::task_manager::{TaskLauncher, TaskManager};
 
 use crate::config::SchedulerConfig;
 use crate::state::execution_graph::TaskDescription;
+use backend::cluster::ClusterState;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
@@ -99,11 +100,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     #[cfg(test)]
     pub fn new_with_default_scheduler_name(
         config_client: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
     ) -> Self {
         SchedulerState::new(
             config_client,
+            cluster_state,
             session_builder,
             codec,
             "localhost:50050".to_owned(),
@@ -113,6 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
 
     pub fn new(
         config_client: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
@@ -120,7 +124,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(
-                config_client.clone(),
+                cluster_state,
                 config.executor_slots_policy,
             ),
             task_manager: TaskManager::new(
@@ -138,6 +142,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     #[allow(dead_code)]
     pub(crate) fn with_task_launcher(
         config_client: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
@@ -146,7 +151,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(
-                config_client.clone(),
+                cluster_state,
                 config.executor_slots_policy,
             ),
             task_manager: TaskManager::with_launcher(
@@ -461,6 +466,7 @@ mod test {
     use ballista_core::utils::default_session_builder;
 
     use crate::config::SchedulerConfig;
+    use crate::state::backend::cluster::DefaultClusterState;
     use crate::test_utils::BlackholeTaskLauncher;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum};
@@ -474,9 +480,11 @@ mod test {
     #[tokio::test]
     async fn test_offer_free_reservations() -> Result<()> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::new_with_default_scheduler_name(
                 state_storage,
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
             ));
@@ -510,9 +518,11 @@ mod test {
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
                 String::default(),
@@ -595,9 +605,11 @@ mod test {
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
                 String::default(),
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index ae386382..3f98e370 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -51,6 +51,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::CsvReadOptions;
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::state::backend::cluster::DefaultClusterState;
 use datafusion_proto::protobuf::LogicalPlanNode;
 use parking_lot::Mutex;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -381,6 +382,7 @@ impl SchedulerTest {
         runner: Option<Arc<dyn TaskRunner>>,
     ) -> Result<Self> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
 
         let ballista_config = BallistaConfig::builder()
             .set(
@@ -415,7 +417,8 @@ impl SchedulerTest {
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::with_task_launcher(
                 "localhost:50050".to_owned(),
-                state_storage.clone(),
+                state_storage,
+                cluster_state,
                 BallistaCodec::default(),
                 config,
                 metrics_collector,

Reply via email to