This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b496e59 Remove redundant fields in ExecutorManager (#728)
0b496e59 is described below

commit 0b496e59955735e0e3fbef60b107c4fa0f879b0d
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Mar 31 07:46:33 2023 +0800

    Remove redundant fields in ExecutorManager (#728)
    
    * Remove redundant fields in ExecutorManager
    
    * Remove RoundRobinLocal slot policy
    
    * Rename SlotsPolicy to TaskDistribution
    
    * Introduce executor heartbeat cache to KeyValueState
    
    * Remove buggy executor check for pull-staged task scheduling
    
    * Add ExecutorMetadata cache for KeyValueState
    
    ---------
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/scheduler/scheduler_config_spec.toml    |   8 +-
 ballista/scheduler/src/bin/main.rs               |   2 +-
 ballista/scheduler/src/cluster/kv.rs             | 199 ++++++++-----
 ballista/scheduler/src/cluster/memory.rs         |  74 ++---
 ballista/scheduler/src/cluster/mod.rs            |  27 +-
 ballista/scheduler/src/cluster/test/mod.rs       |  36 +--
 ballista/scheduler/src/config.rs                 |  34 +--
 ballista/scheduler/src/scheduler_server/grpc.rs  |  45 +--
 ballista/scheduler/src/scheduler_server/mod.rs   |   4 +-
 ballista/scheduler/src/state/executor_manager.rs | 363 +++++------------------
 ballista/scheduler/src/state/mod.rs              |   4 +-
 11 files changed, 283 insertions(+), 513 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 93ffe4c1..2e7d0d65 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -96,10 +96,10 @@ default = "3600"
 doc = "Delayed interval for cleaning up finished job state. Default: 3600"
 
 [[param]]
-name = "executor_slots_policy"
-type = "ballista_scheduler::config::SlotsPolicy"
-doc = "The executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. Default: bias"
-default = "ballista_scheduler::config::SlotsPolicy::Bias"
+name = "task_distribution"
+type = "ballista_scheduler::config::TaskDistribution"
+doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin. Default: bias"
+default = "ballista_scheduler::config::TaskDistribution::Bias"
 
 [[param]]
 name = "plugin_dir"
diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs
index 93904901..7511f4d3 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -109,7 +109,7 @@ async fn main() -> Result<()> {
         bind_port: opt.bind_port,
         scheduling_policy: opt.scheduler_policy,
         event_loop_buffer_size: opt.event_loop_buffer_size,
-        executor_slots_policy: opt.executor_slots_policy,
+        task_distribution: opt.task_distribution,
         finished_job_data_clean_up_interval_seconds: opt
             .finished_job_data_clean_up_interval_seconds,
         finished_job_state_clean_up_interval_seconds: opt
diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs
index e2869977..eb164753 100644
--- a/ballista/scheduler/src/cluster/kv.rs
+++ b/ballista/scheduler/src/cluster/kv.rs
@@ -20,7 +20,7 @@ use crate::cluster::{
     reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
     JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
 };
-use crate::scheduler_server::SessionBuilder;
+use crate::scheduler_server::{timestamp_secs, SessionBuilder};
 use crate::state::execution_graph::ExecutionGraph;
 use crate::state::executor_manager::ExecutorReservation;
 use crate::state::session_manager::create_datafusion_context;
@@ -42,12 +42,11 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use futures::StreamExt;
 use itertools::Itertools;
-use log::warn;
+use log::{info, warn};
 use prost::Message;
 use std::collections::{HashMap, HashSet};
 use std::future::Future;
 use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
 
 /// State implementation based on underlying `KeyValueStore`
 pub struct KeyValueState<
@@ -57,6 +56,10 @@ pub struct KeyValueState<
 > {
     /// Underlying `KeyValueStore`
     store: S,
+    /// ExecutorMetadata cache, executor_id -> ExecutorMetadata
+    executors: Arc<DashMap<String, ExecutorMetadata>>,
+    /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat
+    executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
     /// Codec used to serialize/deserialize execution plan
     codec: BallistaCodec<T, U>,
     /// Name of current scheduler. Should be `{host}:{port}`
@@ -79,18 +82,98 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     ) -> Self {
         Self {
             store,
+            executors: Arc::new(DashMap::new()),
+            executor_heartbeats: Arc::new(DashMap::new()),
             scheduler: scheduler.into(),
             codec,
             queued_jobs: DashMap::new(),
             session_builder,
         }
     }
+
+    /// Initialize the set of active executor heartbeats from storage
+    async fn init_active_executor_heartbeats(&self) -> Result<()> {
+        let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;
+
+        for (_, value) in heartbeats {
+            let data: ExecutorHeartbeat = decode_protobuf(&value)?;
+            if let Some(protobuf::ExecutorStatus {
+                status: Some(protobuf::executor_status::Status::Active(_)),
+            }) = &data.status
+            {
+                self.executor_heartbeats
+                    .insert(data.executor_id.clone(), data);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Return the stream of executor heartbeats observed by all schedulers in the cluster.
+    /// This can be aggregated to provide an eventually consistent view of all executors within the cluster
+    async fn executor_heartbeat_stream(&self) -> Result<ExecutorHeartbeatStream> {
+        let events = self
+            .store
+            .watch(Keyspace::Heartbeats, String::default())
+            .await?;
+
+        Ok(events
+            .filter_map(|event| {
+                futures::future::ready(match event {
+                    WatchEvent::Put(_, value) => {
+                        if let Ok(heartbeat) =
+                            decode_protobuf::<ExecutorHeartbeat>(&value)
+                        {
+                            Some(heartbeat)
+                        } else {
+                            None
+                        }
+                    }
+                    WatchEvent::Delete(_) => None,
+                })
+            })
+            .boxed())
+    }
 }
 
 #[async_trait]
 impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     ClusterState for KeyValueState<S, T, U>
 {
+    /// Initialize a background process that will listen for executor heartbeats and update the in-memory cache
+    /// of executor heartbeats
+    async fn init(&self) -> Result<()> {
+        self.init_active_executor_heartbeats().await?;
+
+        let mut heartbeat_stream = self.executor_heartbeat_stream().await?;
+
+        info!("Initializing heartbeat listener");
+
+        let heartbeats = self.executor_heartbeats.clone();
+        let executors = self.executors.clone();
+        tokio::task::spawn(async move {
+            while let Some(heartbeat) = heartbeat_stream.next().await {
+                let executor_id = heartbeat.executor_id.clone();
+
+                match heartbeat
+                    .status
+                    .as_ref()
+                    .and_then(|status| status.status.as_ref())
+                {
+                    Some(protobuf::executor_status::Status::Dead(_)) => {
+                        heartbeats.remove(&executor_id);
+                        executors.remove(&executor_id);
+                    }
+                    _ => {
+                        heartbeats.insert(executor_id, heartbeat);
+                    }
+                }
+            }
+        });
+
+        Ok(())
+    }
+
     async fn reserve_slots(
         &self,
         num_slots: u32,
@@ -240,22 +323,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     ) -> Result<Vec<ExecutorReservation>> {
         let executor_id = metadata.id.clone();
 
-        let current_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .map_err(|e| {
-                BallistaError::Internal(format!("Error getting current timestamp: {e:?}"))
-            })?
-            .as_secs();
-
         //TODO this should be in a transaction
         // Now that we know we can connect, save the metadata and slots
         self.save_executor_metadata(metadata).await?;
         self.save_executor_heartbeat(ExecutorHeartbeat {
             executor_id: executor_id.clone(),
-            timestamp: current_ts,
+            timestamp: timestamp_secs(),
             metrics: vec![],
             status: Some(protobuf::ExecutorStatus {
-                status: Some(protobuf::executor_status::Status::Active("".to_string())),
+                status: Some(
+                    protobuf::executor_status::Status::Active(String::default()),
+                ),
             }),
         })
         .await?;
@@ -341,39 +419,54 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 
     async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
         let executor_id = metadata.id.clone();
-        let proto: protobuf::ExecutorMetadata = metadata.into();
 
+        let proto: protobuf::ExecutorMetadata = metadata.clone().into();
         self.store
-            .put(Keyspace::Executors, executor_id, proto.encode_to_vec())
-            .await
+            .put(
+                Keyspace::Executors,
+                executor_id.clone(),
+                proto.encode_to_vec(),
+            )
+            .await?;
+
+        self.executors.insert(executor_id, metadata);
+
+        Ok(())
     }
 
     async fn get_executor_metadata(&self, executor_id: &str) -> Result<ExecutorMetadata> {
-        let value = self.store.get(Keyspace::Executors, executor_id).await?;
+        let metadata = if let Some(metadata) = self.executors.get(executor_id) {
+            metadata.value().clone()
+        } else {
+            let value = self.store.get(Keyspace::Executors, executor_id).await?;
+            let decoded =
+                decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
+            self.executors
+                .insert(executor_id.to_string(), decoded.clone());
 
-        let decoded =
-            decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
-        Ok(decoded)
+            decoded
+        };
+
+        Ok(metadata)
     }
 
     async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> {
         let executor_id = heartbeat.executor_id.clone();
         self.store
-            .put(Keyspace::Heartbeats, executor_id, heartbeat.encode_to_vec())
-            .await
+            .put(
+                Keyspace::Heartbeats,
+                executor_id.clone(),
+                heartbeat.clone().encode_to_vec(),
+            )
+            .await?;
+        self.executor_heartbeats.insert(executor_id, heartbeat);
+        Ok(())
     }
 
     async fn remove_executor(&self, executor_id: &str) -> Result<()> {
-        let current_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .map_err(|e| {
-                BallistaError::Internal(format!("Error getting current timestamp: {e:?}"))
-            })?
-            .as_secs();
-
         let value = ExecutorHeartbeat {
             executor_id: executor_id.to_owned(),
-            timestamp: current_ts,
+            timestamp: timestamp_secs(),
             metrics: vec![],
             status: Some(protobuf::ExecutorStatus {
                 status: 
Some(protobuf::executor_status::Status::Dead("".to_string())),
@@ -384,52 +477,24 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         self.store
             .put(Keyspace::Heartbeats, executor_id.to_owned(), value)
             .await?;
+        self.executor_heartbeats.remove(executor_id);
 
         // TODO Check the Executor reservation logic for push-based scheduling
 
         Ok(())
     }
 
-    async fn executor_heartbeat_stream(&self) -> Result<ExecutorHeartbeatStream> {
-        let events = self
-            .store
-            .watch(Keyspace::Heartbeats, String::default())
-            .await?;
-
-        Ok(events
-            .filter_map(|event| {
-                futures::future::ready(match event {
-                    WatchEvent::Put(_, value) => {
-                        if let Ok(heartbeat) =
-                            decode_protobuf::<ExecutorHeartbeat>(&value)
-                        {
-                            Some(heartbeat)
-                        } else {
-                            None
-                        }
-                    }
-                    WatchEvent::Delete(_) => None,
-                })
-            })
-            .boxed())
+    fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
+        self.executor_heartbeats
+            .iter()
+            .map(|r| (r.key().clone(), r.value().clone()))
+            .collect()
     }
 
-    async fn executor_heartbeats(&self) -> Result<HashMap<String, ExecutorHeartbeat>> {
-        let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;
-
-        let mut heartbeat_map = HashMap::with_capacity(heartbeats.len());
-
-        for (_, value) in heartbeats {
-            let data: ExecutorHeartbeat = decode_protobuf(&value)?;
-            if let Some(protobuf::ExecutorStatus {
-                status: Some(protobuf::executor_status::Status::Active(_)),
-            }) = &data.status
-            {
-                heartbeat_map.insert(data.executor_id.clone(), data);
-            }
-        }
-
-        Ok(heartbeat_map)
+    fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
+        self.executor_heartbeats
+            .get(executor_id)
+            .map(|r| r.value().clone())
     }
 }
 
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 95c6b7a2..6d7d7bc6 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::cluster::{
-    reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
-    JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
+    reserve_slots_bias, reserve_slots_round_robin, ClusterState, JobState, JobStateEvent,
+    JobStateEventStream, JobStatus, TaskDistribution,
 };
 use crate::state::execution_graph::ExecutionGraph;
 use crate::state::executor_manager::ExecutorReservation;
@@ -53,9 +53,6 @@ pub struct InMemoryClusterState {
     executors: DashMap<String, ExecutorMetadata>,
     /// Last heartbeat received for each executor
     heartbeats: DashMap<String, ExecutorHeartbeat>,
-    /// Broadcast channel sender for heartbeats, If `None` there are not
-    /// subscribers
-    heartbeat_sender: ClusterEventSender<ExecutorHeartbeat>,
 }
 
 #[async_trait]
@@ -163,14 +160,18 @@ impl ClusterState for InMemoryClusterState {
         mut spec: ExecutorData,
         reserve: bool,
     ) -> Result<Vec<ExecutorReservation>> {
-        let heartbeat = ExecutorHeartbeat {
-            executor_id: metadata.id.clone(),
+        let executor_id = metadata.id.clone();
+
+        self.save_executor_metadata(metadata).await?;
+        self.save_executor_heartbeat(ExecutorHeartbeat {
+            executor_id: executor_id.clone(),
             timestamp: timestamp_secs(),
             metrics: vec![],
             status: Some(ExecutorStatus {
                 status: 
Some(executor_status::Status::Active(String::default())),
             }),
-        };
+        })
+        .await?;
 
         let mut guard = self.task_slots.lock();
 
@@ -178,38 +179,29 @@ impl ClusterState for InMemoryClusterState {
         if let Some((idx, _)) = guard
             .task_slots
             .iter()
-            .find_position(|slots| slots.executor_id == metadata.id)
+            .find_position(|slots| slots.executor_id == executor_id)
         {
             guard.task_slots.swap_remove(idx);
         }
 
         if reserve {
             let slots = std::mem::take(&mut spec.available_task_slots) as usize;
-
             let reservations = (0..slots)
-                .map(|_| ExecutorReservation::new_free(metadata.id.clone()))
+                .map(|_| ExecutorReservation::new_free(executor_id.clone()))
                 .collect();
 
-            self.executors.insert(metadata.id.clone(), metadata.clone());
-
             guard.task_slots.push(AvailableTaskSlots {
-                executor_id: metadata.id,
+                executor_id,
                 slots: 0,
             });
 
-            self.heartbeat_sender.send(&heartbeat);
-
             Ok(reservations)
         } else {
-            self.executors.insert(metadata.id.clone(), metadata.clone());
-
             guard.task_slots.push(AvailableTaskSlots {
-                executor_id: metadata.id,
+                executor_id,
                 slots: spec.available_task_slots,
             });
 
-            self.heartbeat_sender.send(&heartbeat);
-
             Ok(vec![])
         }
     }
@@ -231,15 +223,13 @@ impl ClusterState for InMemoryClusterState {
     }
 
     async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> {
-        if let Some(mut last) = self.heartbeats.get_mut(&heartbeat.executor_id) {
-            let _ = std::mem::replace(last.deref_mut(), heartbeat.clone());
+        let executor_id = heartbeat.executor_id.clone();
+        if let Some(mut last) = self.heartbeats.get_mut(&executor_id) {
+            let _ = std::mem::replace(last.deref_mut(), heartbeat);
         } else {
-            self.heartbeats
-                .insert(heartbeat.executor_id.clone(), heartbeat.clone());
+            self.heartbeats.insert(executor_id, heartbeat);
         }
 
-        self.heartbeat_sender.send(&heartbeat);
-
         Ok(())
     }
 
@@ -256,34 +246,20 @@ impl ClusterState for InMemoryClusterState {
             }
         }
 
-        if let Some(heartbeat) = self.heartbeats.get_mut(executor_id).as_deref_mut() {
-            let new_heartbeat = ExecutorHeartbeat {
-                executor_id: executor_id.to_string(),
-                timestamp: timestamp_secs(),
-                metrics: vec![],
-                status: Some(ExecutorStatus {
-                    status: Some(executor_status::Status::Dead(String::default())),
-                }),
-            };
-
-            *heartbeat = new_heartbeat;
-
-            self.heartbeat_sender.send(heartbeat);
-        }
+        self.heartbeats.remove(executor_id);
 
         Ok(())
     }
 
-    async fn executor_heartbeat_stream(&self) -> Result<ExecutorHeartbeatStream> {
-        Ok(Box::pin(self.heartbeat_sender.subscribe()))
-    }
-
-    async fn executor_heartbeats(&self) -> Result<HashMap<String, ExecutorHeartbeat>> {
-        Ok(self
-            .heartbeats
+    fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
+        self.heartbeats
             .iter()
             .map(|r| (r.key().clone(), r.value().clone()))
-            .collect())
+            .collect()
+    }
+
+    fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
+        self.heartbeats.get(executor_id).map(|r| r.value().clone())
     }
 }
 
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 35a5052a..8bea6d77 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -29,7 +29,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
 use crate::cluster::storage::etcd::EtcdClient;
 use crate::cluster::storage::sled::SledClient;
 use crate::cluster::storage::KeyValueStore;
-use crate::config::{ClusterStorageConfig, SchedulerConfig};
+use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
 use crate::scheduler_server::SessionBuilder;
 use crate::state::execution_graph::ExecutionGraph;
 use crate::state::executor_manager::ExecutorReservation;
@@ -195,20 +195,14 @@ impl BallistaCluster {
 /// by any schedulers with a shared `ClusterState`
 pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item = ExecutorHeartbeat> + Send>>;
 
-/// Method of distributing tasks to available executor slots
-#[derive(Debug, Clone, Copy)]
-pub enum TaskDistribution {
-    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
-    /// as are currently available
-    Bias,
-    /// Distributed tasks evenely across executors. This will try and iterate through available executors
-    /// and assign one task to each executor until all tasks are assigned.
-    RoundRobin,
-}
-
 /// A trait that contains the necessary method to maintain a globally consistent view of cluster resources
 #[tonic::async_trait]
 pub trait ClusterState: Send + Sync + 'static {
+    /// Initialize when it's necessary, especially for state with backend storage
+    async fn init(&self) -> Result<()> {
+        Ok(())
+    }
+
     /// Reserve up to `num_slots` executor task slots. If not enough task slots are available, reserve
     /// as many as possible.
     ///
@@ -261,12 +255,11 @@ pub trait ClusterState: Send + Sync + 'static {
     /// Remove the executor from the cluster
     async fn remove_executor(&self, executor_id: &str) -> Result<()>;
 
-    /// Return the stream of executor heartbeats observed by all schedulers in the cluster.
-    /// This can be aggregated to provide an eventually consistent view of all executors within the cluster
-    async fn executor_heartbeat_stream(&self) -> Result<ExecutorHeartbeatStream>;
-
     /// Return a map of the last seen heartbeat for all active executors
-    async fn executor_heartbeats(&self) -> Result<HashMap<String, ExecutorHeartbeat>>;
+    fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat>;
+
+    /// Get executor heartbeat for the provided executor ID. Return None if the executor does not exist
+    fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat>;
 }
 
 /// Events related to the state of jobs. Implementations may or may not support all event types.
diff --git a/ballista/scheduler/src/cluster/test/mod.rs b/ballista/scheduler/src/cluster/test/mod.rs
index 4d4001c6..b9056bfd 100644
--- a/ballista/scheduler/src/cluster/test/mod.rs
+++ b/ballista/scheduler/src/cluster/test/mod.rs
@@ -22,11 +22,10 @@ use crate::state::executor_manager::ExecutorReservation;
 use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{executor_status, ExecutorHeartbeat, JobStatus};
+use ballista_core::serde::protobuf::{executor_status, JobStatus};
 use ballista_core::serde::scheduler::{
     ExecutorData, ExecutorMetadata, ExecutorSpecification,
 };
-use dashmap::DashMap;
 use futures::StreamExt;
 use itertools::Itertools;
 use std::collections::HashSet;
@@ -36,27 +35,14 @@ use tokio::sync::RwLock;
 
 pub struct ClusterStateTest<S: ClusterState> {
     state: Arc<S>,
-    received_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
     reservations: Vec<ExecutorReservation>,
     total_task_slots: u32,
 }
 
 impl<S: ClusterState> ClusterStateTest<S> {
     pub async fn new(state: S) -> Result<Self> {
-        let received_heartbeats = Arc::new(DashMap::new());
-
-        let mut heartbeat_stream = state.executor_heartbeat_stream().await?;
-        let received_heartbeat_clone = received_heartbeats.clone();
-
-        tokio::spawn(async move {
-            while let Some(heartbeat) = heartbeat_stream.next().await {
-                received_heartbeat_clone.insert(heartbeat.executor_id.clone(), heartbeat);
-            }
-        });
-
         Ok(Self {
             state: Arc::new(state),
-            received_heartbeats,
             reservations: vec![],
             total_task_slots: 0,
         })
@@ -115,17 +101,19 @@ impl<S: ClusterState> ClusterStateTest<S> {
 
         // Heratbeat stream is async so wait up to 500ms for it to show up
         await_condition(Duration::from_millis(50), 10, || {
-            let found_heartbeat =
-                self.received_heartbeats.get(executor_id).map(|heartbeat| {
+            let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or(
+                false,
+                |heartbeat| {
                     matches!(
                         heartbeat.status,
                         Some(ballista_core::serde::generated::ballista::ExecutorStatus {
                             status: Some(executor_status::Status::Active(_))
                         })
                     )
-                });
+                },
+            );
 
-            futures::future::ready(Ok(found_heartbeat.unwrap_or_default()))
+            futures::future::ready(Ok(found_heartbeat))
         })
         .await?;
 
@@ -135,17 +123,19 @@ impl<S: ClusterState> ClusterStateTest<S> {
     pub async fn assert_dead_executor(self, executor_id: &str) -> Result<Self> {
         // Heratbeat stream is async so wait up to 500ms for it to show up
         await_condition(Duration::from_millis(50), 10, || {
-            let found_heartbeat =
-                self.received_heartbeats.get(executor_id).map(|heartbeat| {
+            let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or(
+                true,
+                |heartbeat| {
                     matches!(
                         heartbeat.status,
                         Some(ballista_core::serde::generated::ballista::ExecutorStatus {
                             status: Some(executor_status::Status::Dead(_))
                         })
                     )
-                });
+                },
+            );
 
-            futures::future::ready(Ok(found_heartbeat.unwrap_or_default()))
+            futures::future::ready(Ok(found_heartbeat))
         })
         .await?;
 
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 087e3867..7a2518c3 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -26,7 +26,7 @@ use std::fmt;
 #[derive(Debug, Clone)]
 pub struct SchedulerConfig {
     /// Namespace of this scheduler. Schedulers using the same cluster storage and namespace
-    /// will share gloabl cluster state.
+    /// will share global cluster state.
     pub namespace: String,
     /// The external hostname of the scheduler
     pub external_host: String,
@@ -36,8 +36,8 @@ pub struct SchedulerConfig {
     pub scheduling_policy: TaskSchedulingPolicy,
     /// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended
     pub event_loop_buffer_size: u32,
-    /// The executor slots policy for the scheduler. For a cluster with single scheduler, round-robin-local is recommended
-    pub executor_slots_policy: SlotsPolicy,
+    /// Policy of distributing tasks to available executor slots. For a cluster with single scheduler, round-robin is recommended
+    pub task_distribution: TaskDistribution,
     /// The delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled
     pub finished_job_data_clean_up_interval_seconds: u64,
     /// The delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled.
@@ -62,7 +62,7 @@ impl Default for SchedulerConfig {
             bind_port: 50050,
             scheduling_policy: TaskSchedulingPolicy::PullStaged,
             event_loop_buffer_size: 10000,
-            executor_slots_policy: SlotsPolicy::Bias,
+            task_distribution: TaskDistribution::Bias,
             finished_job_data_clean_up_interval_seconds: 300,
             finished_job_state_clean_up_interval_seconds: 3600,
             advertise_flight_sql_endpoint: None,
@@ -131,8 +131,8 @@ impl SchedulerConfig {
         self
     }
 
-    pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self {
-        self.executor_slots_policy = policy;
+    pub fn with_task_distribution(mut self, policy: TaskDistribution) -> Self {
+        self.task_distribution = policy;
         self
     }
 
@@ -161,22 +161,20 @@ pub enum ClusterStorageConfig {
     Sled(Option<String>),
 }
 
-// an enum used to configure the executor slots policy
-// needs to be visible to code generated by configure_me
+/// Policy of distributing tasks to available executor slots
+///
+/// It needs to be visible to code generated by configure_me
 #[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
-pub enum SlotsPolicy {
+pub enum TaskDistribution {
+    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
+    /// as are currently available
     Bias,
+    /// Distributed tasks evenly across executors. This will try and iterate through available executors
+    /// and assign one task to each executor until all tasks are assigned.
     RoundRobin,
-    RoundRobinLocal,
 }
 
-impl SlotsPolicy {
-    pub fn is_local(&self) -> bool {
-        matches!(self, SlotsPolicy::RoundRobinLocal)
-    }
-}
-
-impl std::str::FromStr for SlotsPolicy {
+impl std::str::FromStr for TaskDistribution {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
@@ -184,7 +182,7 @@ impl std::str::FromStr for SlotsPolicy {
     }
 }
 
-impl parse_arg::ParseArgFromStr for SlotsPolicy {
+impl parse_arg::ParseArgFromStr for TaskDistribution {
     fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
         write!(writer, "The executor slots policy for the scheduler")
     }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index de07a06d..dfef9809 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -22,12 +22,12 @@ use std::convert::TryInto;
 use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
-    executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
-    CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
-    ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
-    HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
-    RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
+    CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
+    ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, ExecutorStoppedParams,
+    ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
+    GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
+    PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
+    UpdateTaskStatusParams, UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::ExecutorMetadata;
 
@@ -47,7 +47,7 @@ use datafusion::prelude::SessionContext;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
-use crate::scheduler_server::{timestamp_secs, SchedulerServer};
+use crate::scheduler_server::SchedulerServer;
 use crate::state::executor_manager::ExecutorReservation;
 
 #[tonic::async_trait]
@@ -72,19 +72,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
         } = request.into_inner()
         {
             trace!("Received poll_work request for {:?}", metadata);
-            // We might receive buggy poll work requests from dead executors.
-            if self
-                .state
-                .executor_manager
-                .is_dead_executor(&metadata.id.clone())
-            {
-                let error_msg = format!(
-                    "Receive buggy poll work request from dead Executor {}",
-                    metadata.id.clone()
-                );
-                warn!("{}", error_msg);
-                return Err(Status::internal(error_msg));
-            }
             let metadata = ExecutorMetadata {
                 id: metadata.id,
                 host: metadata
@@ -97,14 +84,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                 grpc_port: metadata.grpc_port as u16,
                 specification: metadata.specification.unwrap().into(),
             };
-            let executor_heartbeat = ExecutorHeartbeat {
-                executor_id: metadata.id.clone(),
-                timestamp: timestamp_secs(),
-                metrics: vec![],
-                status: Some(ExecutorStatus {
-                    status: 
Some(executor_status::Status::Active("".to_string())),
-                }),
-            };
 
             self.state
                 .executor_manager
@@ -116,16 +95,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                     Status::internal(msg)
                 })?;
 
-            self.state
-                .executor_manager
-                .save_executor_heartbeat(executor_heartbeat)
-                .await
-                .map_err(|e| {
-                    let msg = format!("Could not save executor heartbeat: 
{e}");
-                    error!("{}", msg);
-                    Status::internal(msg)
-                })?;
-
             self.update_task_status(&metadata.id, task_status)
                 .await
                 .map_err(|e| {
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index 69a8c1b4..217711f7 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -189,7 +189,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
         tasks_status: Vec<TaskStatus>,
     ) -> Result<()> {
         // We might receive buggy task updates from dead executors.
-        if self.state.executor_manager.is_dead_executor(executor_id) {
+        if self.state.config.is_push_staged_scheduling()
+            && self.state.executor_manager.is_dead_executor(executor_id)
+        {
             let error_msg = format!(
                 "Receive buggy tasks status from dead Executor {executor_id}, 
task status update ignored."
             );
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index d3a9468e..63451014 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -17,26 +17,23 @@
 
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
-use crate::cluster::TaskDistribution;
-
-use ballista_core::error::{BallistaError, Result};
+#[cfg(not(test))]
+use ballista_core::error::BallistaError;
+use ballista_core::error::Result;
 use ballista_core::serde::protobuf;
 
 use crate::cluster::ClusterState;
-use crate::config::SlotsPolicy;
+use crate::config::TaskDistribution;
 
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
-    executor_status, CancelTasksParams, ExecutorHeartbeat, ExecutorStatus,
-    RemoveJobDataParams,
+    executor_status, CancelTasksParams, ExecutorHeartbeat, RemoveJobDataParams,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use ballista_core::utils::create_grpc_client_connection;
-use dashmap::{DashMap, DashSet};
-use futures::StreamExt;
+use dashmap::DashMap;
 use log::{debug, error, info, warn};
-use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tonic::transport::Channel;
@@ -91,75 +88,25 @@ pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15;
 
 #[derive(Clone)]
 pub struct ExecutorManager {
-    // executor slot policy
-    slots_policy: SlotsPolicy,
     task_distribution: TaskDistribution,
     cluster_state: Arc<dyn ClusterState>,
-    // executor_id -> ExecutorMetadata map
-    executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
-    // executor_id -> ExecutorHeartbeat map
-    executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
-    // executor_id -> ExecutorData map, only used when the slots policy is of 
local
-    executor_data: Arc<Mutex<HashMap<String, ExecutorData>>>,
-    // dead executor sets:
-    dead_executors: Arc<DashSet<String>>,
     clients: ExecutorClients,
 }
 
 impl ExecutorManager {
     pub(crate) fn new(
         cluster_state: Arc<dyn ClusterState>,
-        slots_policy: SlotsPolicy,
+        task_distribution: TaskDistribution,
     ) -> Self {
-        let task_distribution = match slots_policy {
-            SlotsPolicy::Bias => TaskDistribution::Bias,
-            SlotsPolicy::RoundRobin | SlotsPolicy::RoundRobinLocal => {
-                TaskDistribution::RoundRobin
-            }
-        };
-
         Self {
-            slots_policy,
             task_distribution,
             cluster_state,
-            executor_metadata: Arc::new(DashMap::new()),
-            executors_heartbeat: Arc::new(DashMap::new()),
-            executor_data: Arc::new(Mutex::new(HashMap::new())),
-            dead_executors: Arc::new(DashSet::new()),
             clients: Default::default(),
         }
     }
 
-    /// Initialize a background process that will listen for executor 
heartbeats and update the in-memory cache
-    /// of executor heartbeats
     pub async fn init(&self) -> Result<()> {
-        self.init_active_executor_heartbeats().await?;
-
-        let mut heartbeat_stream = 
self.cluster_state.executor_heartbeat_stream().await?;
-
-        info!("Initializing heartbeat listener");
-
-        let heartbeats = self.executors_heartbeat.clone();
-        let dead_executors = self.dead_executors.clone();
-        tokio::task::spawn(async move {
-            while let Some(heartbeat) = heartbeat_stream.next().await {
-                let executor_id = heartbeat.executor_id.clone();
-
-                match heartbeat
-                    .status
-                    .as_ref()
-                    .and_then(|status| status.status.as_ref())
-                {
-                    Some(executor_status::Status::Dead(_)) => {
-                        heartbeats.remove(&executor_id);
-                        dead_executors.insert(executor_id);
-                    }
-                    _ => {
-                        heartbeats.insert(executor_id, heartbeat);
-                    }
-                }
-            }
-        });
+        self.cluster_state.init().await?;
 
         Ok(())
     }
@@ -168,88 +115,13 @@ impl ExecutorManager {
     /// for scheduling.
     /// This operation is atomic, so if this method return an Err, no slots 
have been reserved.
     pub async fn reserve_slots(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
-        if self.slots_policy.is_local() {
-            self.reserve_slots_local(n).await
-        } else {
-            let alive_executors = self.get_alive_executors_within_one_minute();
-
-            debug!("Alive executors: {alive_executors:?}");
-
-            self.cluster_state
-                .reserve_slots(n, self.task_distribution, 
Some(alive_executors))
-                .await
-        }
-    }
-
-    async fn reserve_slots_local(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
-        debug!("Attempting to reserve {} executor slots", n);
-
         let alive_executors = self.get_alive_executors_within_one_minute();
 
-        match self.slots_policy {
-            SlotsPolicy::RoundRobinLocal => {
-                self.reserve_slots_local_round_robin(n, alive_executors)
-                    .await
-            }
-            _ => Err(BallistaError::General(format!(
-                "Reservation policy {:?} is not supported",
-                self.slots_policy
-            ))),
-        }
-    }
-
-    /// Create ExecutorReservation in a round robin way to evenly assign tasks 
to executors
-    async fn reserve_slots_local_round_robin(
-        &self,
-        mut n: u32,
-        alive_executors: HashSet<String>,
-    ) -> Result<Vec<ExecutorReservation>> {
-        let mut executor_data = self.executor_data.lock();
-
-        let mut available_executor_data: Vec<&mut ExecutorData> = executor_data
-            .values_mut()
-            .filter_map(|data| {
-                (data.available_task_slots > 0
-                    && alive_executors.contains(&data.executor_id))
-                .then_some(data)
-            })
-            .collect();
-        available_executor_data
-            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, 
&a.available_task_slots));
-
-        let mut reservations: Vec<ExecutorReservation> = vec![];
-
-        // Exclusive
-        let mut last_updated_idx = 0usize;
-        loop {
-            let n_before = n;
-            for (idx, data) in available_executor_data.iter_mut().enumerate() {
-                if n == 0 {
-                    break;
-                }
-
-                // Since the vector is sorted in descending order,
-                // if finding one executor has not enough slots, the following 
will have not enough, either
-                if data.available_task_slots == 0 {
-                    break;
-                }
-
-                reservations
-                    
.push(ExecutorReservation::new_free(data.executor_id.clone()));
-                data.available_task_slots -= 1;
-                n -= 1;
-
-                if idx >= last_updated_idx {
-                    last_updated_idx = idx + 1;
-                }
-            }
-
-            if n_before == n {
-                break;
-            }
-        }
+        debug!("Alive executors: {alive_executors:?}");
 
-        Ok(reservations)
+        self.cluster_state
+            .reserve_slots(n, self.task_distribution, Some(alive_executors))
+            .await
     }
 
     /// Returned reserved task slots to the pool of available slots. This 
operation is atomic
@@ -258,36 +130,7 @@ impl ExecutorManager {
         &self,
         reservations: Vec<ExecutorReservation>,
     ) -> Result<()> {
-        if self.slots_policy.is_local() {
-            self.cancel_reservations_local(reservations).await
-        } else {
-            self.cluster_state.cancel_reservations(reservations).await
-        }
-    }
-
-    async fn cancel_reservations_local(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
-        let mut executor_slots: HashMap<String, u32> = HashMap::new();
-        for reservation in reservations {
-            if let Some(slots) = 
executor_slots.get_mut(&reservation.executor_id) {
-                *slots += 1;
-            } else {
-                executor_slots.insert(reservation.executor_id, 1);
-            }
-        }
-
-        let mut executor_data = self.executor_data.lock();
-        for (id, released_slots) in executor_slots.into_iter() {
-            if let Some(slots) = executor_data.get_mut(&id) {
-                slots.available_task_slots += released_slots;
-            } else {
-                warn!("ExecutorData for {} is not cached in memory", id);
-            }
-        }
-
-        Ok(())
+        self.cluster_state.cancel_reservations(reservations).await
     }
 
     /// Send rpc to Executors to cancel the running tasks
@@ -411,15 +254,12 @@ impl ExecutorManager {
 
     /// Get a list of all executors along with the timestamp of their last 
recorded heartbeat
     pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, 
Duration)>> {
-        let heartbeat_timestamps: Vec<(String, u64)> = {
-            self.executors_heartbeat
-                .iter()
-                .map(|item| {
-                    let (executor_id, heartbeat) = item.pair();
-                    (executor_id.clone(), heartbeat.timestamp)
-                })
-                .collect()
-        };
+        let heartbeat_timestamps: Vec<(String, u64)> = self
+            .cluster_state
+            .executor_heartbeats()
+            .into_iter()
+            .map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp))
+            .collect();
 
         let mut state: Vec<(ExecutorMetadata, Duration)> = vec![];
         for (executor_id, ts) in heartbeat_timestamps {
@@ -437,12 +277,6 @@ impl ExecutorManager {
         &self,
         executor_id: &str,
     ) -> Result<ExecutorMetadata> {
-        {
-            if let Some(cached) = self.executor_metadata.get(executor_id) {
-                return Ok(cached.clone());
-            }
-        }
-
         self.cluster_state.get_executor_metadata(executor_id).await
     }
 
@@ -471,36 +305,11 @@ impl ExecutorManager {
 
         self.test_scheduler_connectivity(&metadata).await?;
 
-        let current_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .map_err(|e| {
-                BallistaError::Internal(format!("Error getting current 
timestamp: {e:?}"))
-            })?
-            .as_secs();
-
-        let initial_heartbeat = ExecutorHeartbeat {
-            executor_id: metadata.id.clone(),
-            timestamp: current_ts,
-            metrics: vec![],
-            status: Some(ExecutorStatus {
-                status: 
Some(executor_status::Status::Active(String::default())),
-            }),
-        };
-
         if !reserve {
-            if self.slots_policy.is_local() {
-                let mut executor_data = self.executor_data.lock();
-                executor_data
-                    .insert(specification.executor_id.clone(), 
specification.clone());
-            }
-
             self.cluster_state
                 .register_executor(metadata, specification.clone(), reserve)
                 .await?;
 
-            self.executors_heartbeat
-                .insert(initial_heartbeat.executor_id.clone(), 
initial_heartbeat);
-
             Ok(vec![])
         } else {
             let mut specification = specification;
@@ -512,19 +321,10 @@ impl ExecutorManager {
 
             specification.available_task_slots = 0;
 
-            if self.slots_policy.is_local() {
-                let mut executor_data = self.executor_data.lock();
-                executor_data
-                    .insert(specification.executor_id.clone(), 
specification.clone());
-            }
-
             self.cluster_state
                 .register_executor(metadata, specification, reserve)
                 .await?;
 
-            self.executors_heartbeat
-                .insert(initial_heartbeat.executor_id.clone(), 
initial_heartbeat);
-
             Ok(reservations)
         }
     }
@@ -536,21 +336,7 @@ impl ExecutorManager {
         reason: Option<String>,
     ) -> Result<()> {
         info!("Removing executor {}: {:?}", executor_id, reason);
-        self.cluster_state.remove_executor(executor_id).await?;
-
-        let executor_id = executor_id.to_owned();
-
-        self.executors_heartbeat.remove(&executor_id);
-
-        // Remove executor data cache for dead executors
-        {
-            let mut executor_data = self.executor_data.lock();
-            executor_data.remove(&executor_id);
-        }
-
-        self.dead_executors.insert(executor_id);
-
-        Ok(())
+        self.cluster_state.remove_executor(executor_id).await
     }
 
     #[cfg(not(test))]
@@ -587,30 +373,20 @@ impl ExecutorManager {
             .save_executor_heartbeat(heartbeat.clone())
             .await?;
 
-        self.executors_heartbeat
-            .insert(heartbeat.executor_id.clone(), heartbeat);
-
         Ok(())
     }
 
     pub(crate) fn is_dead_executor(&self, executor_id: &str) -> bool {
-        self.dead_executors.contains(executor_id)
-    }
-
-    /// Initialize the set of active executor heartbeats from storage
-    async fn init_active_executor_heartbeats(&self) -> Result<()> {
-        let heartbeats = self.cluster_state.executor_heartbeats().await?;
-
-        for (executor_id, heartbeat) in heartbeats {
-            // let data: protobuf::ExecutorHeartbeat = 
decode_protobuf(&value)?;
-            if let Some(ExecutorStatus {
-                status: Some(executor_status::Status::Active(_)),
-            }) = heartbeat.status
-            {
-                self.executors_heartbeat.insert(executor_id, heartbeat);
-            }
-        }
-        Ok(())
+        self.cluster_state
+            .get_executor_heartbeat(executor_id)
+            .map_or(true, |heartbeat| {
+                matches!(
+                    heartbeat.status,
+                    
Some(ballista_core::serde::generated::ballista::ExecutorStatus {
+                        status: Some(executor_status::Status::Dead(_))
+                    })
+                )
+            })
     }
 
     /// Retrieve the set of all executor IDs where the executor has been 
observed in the last
@@ -619,11 +395,10 @@ impl ExecutorManager {
         &self,
         last_seen_ts_threshold: u64,
     ) -> HashSet<String> {
-        self.executors_heartbeat
+        self.cluster_state
+            .executor_heartbeats()
             .iter()
-            .filter_map(|pair| {
-                let (exec, heartbeat) = pair.pair();
-
+            .filter_map(|(exec, heartbeat)| {
                 let active = matches!(
                     heartbeat
                         .status
@@ -658,12 +433,10 @@ impl ExecutorManager {
             .unwrap_or_else(|| Duration::from_secs(0))
             .as_secs();
 
-        let expired_executors = self
-            .executors_heartbeat
+        self.cluster_state
+            .executor_heartbeats()
             .iter()
-            .filter_map(|pair| {
-                let (_exec, heartbeat) = pair.pair();
-
+            .filter_map(|(_exec, heartbeat)| {
                 let terminating = matches!(
                     heartbeat
                         .status
@@ -680,8 +453,7 @@ impl ExecutorManager {
                 ((terminating && grace_period_expired) || expired)
                     .then(|| heartbeat.clone())
             })
-            .collect::<Vec<_>>();
-        expired_executors
+            .collect::<Vec<_>>()
     }
 
     pub(crate) fn get_alive_executors_within_one_minute(&self) -> 
HashSet<String> {
@@ -698,7 +470,7 @@ impl ExecutorManager {
 #[cfg(test)]
 mod test {
 
-    use crate::config::SlotsPolicy;
+    use crate::config::TaskDistribution;
 
     use crate::scheduler_server::timestamp_secs;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
@@ -712,18 +484,19 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_and_cancel() -> Result<()> {
-        test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
-        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
-        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?;
+        test_reserve_and_cancel_inner(TaskDistribution::Bias).await?;
+        test_reserve_and_cancel_inner(TaskDistribution::RoundRobin).await?;
 
         Ok(())
     }
 
-    async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> 
Result<()> {
+    async fn test_reserve_and_cancel_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();
 
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);
 
         let executors = test_executors(10, 4);
 
@@ -739,7 +512,7 @@ mod test {
         assert_eq!(
             reservations.len(),
             40,
-            "Expected 40 reservations for policy {slots_policy:?}"
+            "Expected 40 reservations for policy {task_distribution:?}"
         );
 
         // Now cancel them
@@ -751,7 +524,7 @@ mod test {
         assert_eq!(
             reservations.len(),
             40,
-            "Expected 40 reservations for policy {slots_policy:?}"
+            "Expected 40 reservations for policy {task_distribution:?}"
         );
 
         Ok(())
@@ -759,18 +532,19 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_partial() -> Result<()> {
-        test_reserve_partial_inner(SlotsPolicy::Bias).await?;
-        test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
-        test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?;
+        test_reserve_partial_inner(TaskDistribution::Bias).await?;
+        test_reserve_partial_inner(TaskDistribution::RoundRobin).await?;
 
         Ok(())
     }
 
-    async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> 
Result<()> {
+    async fn test_reserve_partial_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();
 
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);
 
         let executors = test_executors(10, 4);
 
@@ -810,14 +584,15 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_concurrent() -> Result<()> {
-        test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
-        test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
-        test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?;
+        test_reserve_concurrent_inner(TaskDistribution::Bias).await?;
+        test_reserve_concurrent_inner(TaskDistribution::RoundRobin).await?;
 
         Ok(())
     }
 
-    async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> 
Result<()> {
+    async fn test_reserve_concurrent_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let (sender, mut receiver) =
             
tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
 
@@ -825,7 +600,7 @@ mod test {
 
         let cluster = test_cluster_context();
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);
 
         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -860,18 +635,19 @@ mod test {
 
     #[tokio::test]
     async fn test_register_reserve() -> Result<()> {
-        test_register_reserve_inner(SlotsPolicy::Bias).await?;
-        test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
-        test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?;
+        test_register_reserve_inner(TaskDistribution::Bias).await?;
+        test_register_reserve_inner(TaskDistribution::RoundRobin).await?;
 
         Ok(())
     }
 
-    async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> 
Result<()> {
+    async fn test_register_reserve_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();
 
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);
 
         let executors = test_executors(10, 4);
 
@@ -893,18 +669,19 @@ mod test {
 
     #[tokio::test]
     async fn test_ignore_fenced_executors() -> Result<()> {
-        test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?;
-        test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?;
-        
test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobinLocal).await?;
+        test_ignore_fenced_executors_inner(TaskDistribution::Bias).await?;
+        
test_ignore_fenced_executors_inner(TaskDistribution::RoundRobin).await?;
 
         Ok(())
     }
 
-    async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) -> 
Result<()> {
+    async fn test_ignore_fenced_executors_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();
 
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);
 
         // Setup two executors initially
         let executors = test_executors(2, 4);
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index 57ea0be6..0c3b8bf0 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -116,7 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         Self {
             executor_manager: ExecutorManager::new(
                 cluster.cluster_state(),
-                config.executor_slots_policy,
+                config.task_distribution,
             ),
             task_manager: TaskManager::new(
                 cluster.job_state(),
@@ -140,7 +140,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         Self {
             executor_manager: ExecutorManager::new(
                 cluster.cluster_state(),
-                config.executor_slots_policy,
+                config.task_distribution,
             ),
             task_manager: TaskManager::with_launcher(
                 cluster.job_state(),

Reply via email to