This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 0b496e59 Remove redundant fields in ExecutorManager (#728)
0b496e59 is described below
commit 0b496e59955735e0e3fbef60b107c4fa0f879b0d
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Mar 31 07:46:33 2023 +0800
Remove redundant fields in ExecutorManager (#728)
* Remove redundant fields in ExecutorManager
* Remove RoundRobinLocal slot policy
* Rename SlotsPolicy to TaskDistribution
* Introduce executor heartbeat cache to KeyValueState
* Remove buggy executor check for pull-staged task scheduling
* Add ExecutorMetadata cache for KeyValueState
---------
Co-authored-by: yangzhong <[email protected]>
---
ballista/scheduler/scheduler_config_spec.toml | 8 +-
ballista/scheduler/src/bin/main.rs | 2 +-
ballista/scheduler/src/cluster/kv.rs | 199 ++++++++-----
ballista/scheduler/src/cluster/memory.rs | 74 ++---
ballista/scheduler/src/cluster/mod.rs | 27 +-
ballista/scheduler/src/cluster/test/mod.rs | 36 +--
ballista/scheduler/src/config.rs | 34 +--
ballista/scheduler/src/scheduler_server/grpc.rs | 45 +--
ballista/scheduler/src/scheduler_server/mod.rs | 4 +-
ballista/scheduler/src/state/executor_manager.rs | 363 +++++------------------
ballista/scheduler/src/state/mod.rs | 4 +-
11 files changed, 283 insertions(+), 513 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 93ffe4c1..2e7d0d65 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -96,10 +96,10 @@ default = "3600"
doc = "Delayed interval for cleaning up finished job state. Default: 3600"
[[param]]
-name = "executor_slots_policy"
-type = "ballista_scheduler::config::SlotsPolicy"
-doc = "The executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. Default: bias"
-default = "ballista_scheduler::config::SlotsPolicy::Bias"
+name = "task_distribution"
+type = "ballista_scheduler::config::TaskDistribution"
+doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin. Default: bias"
+default = "ballista_scheduler::config::TaskDistribution::Bias"
[[param]]
name = "plugin_dir"
diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs
index 93904901..7511f4d3 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -109,7 +109,7 @@ async fn main() -> Result<()> {
bind_port: opt.bind_port,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
- executor_slots_policy: opt.executor_slots_policy,
+ task_distribution: opt.task_distribution,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs
index e2869977..eb164753 100644
--- a/ballista/scheduler/src/cluster/kv.rs
+++ b/ballista/scheduler/src/cluster/kv.rs
@@ -20,7 +20,7 @@ use crate::cluster::{
reserve_slots_bias, reserve_slots_round_robin, ClusterState,
ExecutorHeartbeatStream,
JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
};
-use crate::scheduler_server::SessionBuilder;
+use crate::scheduler_server::{timestamp_secs, SessionBuilder};
use crate::state::execution_graph::ExecutionGraph;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::session_manager::create_datafusion_context;
@@ -42,12 +42,11 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use futures::StreamExt;
use itertools::Itertools;
-use log::warn;
+use log::{info, warn};
use prost::Message;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
/// State implementation based on underlying `KeyValueStore`
pub struct KeyValueState<
@@ -57,6 +56,10 @@ pub struct KeyValueState<
> {
/// Underlying `KeyValueStore`
store: S,
+ /// ExecutorMetadata cache, executor_id -> ExecutorMetadata
+ executors: Arc<DashMap<String, ExecutorMetadata>>,
+ /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat
+ executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
/// Codec used to serialize/deserialize execution plan
codec: BallistaCodec<T, U>,
/// Name of current scheduler. Should be `{host}:{port}`
@@ -79,18 +82,98 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
) -> Self {
Self {
store,
+ executors: Arc::new(DashMap::new()),
+ executor_heartbeats: Arc::new(DashMap::new()),
scheduler: scheduler.into(),
codec,
queued_jobs: DashMap::new(),
session_builder,
}
}
+
+ /// Initialize the set of active executor heartbeats from storage
+ async fn init_active_executor_heartbeats(&self) -> Result<()> {
+ let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;
+
+ for (_, value) in heartbeats {
+ let data: ExecutorHeartbeat = decode_protobuf(&value)?;
+ if let Some(protobuf::ExecutorStatus {
+ status: Some(protobuf::executor_status::Status::Active(_)),
+ }) = &data.status
+ {
+ self.executor_heartbeats
+ .insert(data.executor_id.clone(), data);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Return the stream of executor heartbeats observed by all schedulers in
the cluster.
+ /// This can be aggregated to provide an eventually consistent view of all
executors within the cluster
+ async fn executor_heartbeat_stream(&self) ->
Result<ExecutorHeartbeatStream> {
+ let events = self
+ .store
+ .watch(Keyspace::Heartbeats, String::default())
+ .await?;
+
+ Ok(events
+ .filter_map(|event| {
+ futures::future::ready(match event {
+ WatchEvent::Put(_, value) => {
+ if let Ok(heartbeat) =
+ decode_protobuf::<ExecutorHeartbeat>(&value)
+ {
+ Some(heartbeat)
+ } else {
+ None
+ }
+ }
+ WatchEvent::Delete(_) => None,
+ })
+ })
+ .boxed())
+ }
}
#[async_trait]
impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
ClusterState for KeyValueState<S, T, U>
{
+ /// Initialize a background process that will listen for executor
heartbeats and update the in-memory cache
+ /// of executor heartbeats
+ async fn init(&self) -> Result<()> {
+ self.init_active_executor_heartbeats().await?;
+
+ let mut heartbeat_stream = self.executor_heartbeat_stream().await?;
+
+ info!("Initializing heartbeat listener");
+
+ let heartbeats = self.executor_heartbeats.clone();
+ let executors = self.executors.clone();
+ tokio::task::spawn(async move {
+ while let Some(heartbeat) = heartbeat_stream.next().await {
+ let executor_id = heartbeat.executor_id.clone();
+
+ match heartbeat
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref())
+ {
+ Some(protobuf::executor_status::Status::Dead(_)) => {
+ heartbeats.remove(&executor_id);
+ executors.remove(&executor_id);
+ }
+ _ => {
+ heartbeats.insert(executor_id, heartbeat);
+ }
+ }
+ }
+ });
+
+ Ok(())
+ }
+
async fn reserve_slots(
&self,
num_slots: u32,
@@ -240,22 +323,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
) -> Result<Vec<ExecutorReservation>> {
let executor_id = metadata.id.clone();
- let current_ts = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .map_err(|e| {
- BallistaError::Internal(format!("Error getting current
timestamp: {e:?}"))
- })?
- .as_secs();
-
//TODO this should be in a transaction
// Now that we know we can connect, save the metadata and slots
self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(ExecutorHeartbeat {
executor_id: executor_id.clone(),
- timestamp: current_ts,
+ timestamp: timestamp_secs(),
metrics: vec![],
status: Some(protobuf::ExecutorStatus {
- status:
Some(protobuf::executor_status::Status::Active("".to_string())),
+ status: Some(
+
protobuf::executor_status::Status::Active(String::default()),
+ ),
}),
})
.await?;
@@ -341,39 +419,54 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
async fn save_executor_metadata(&self, metadata: ExecutorMetadata) ->
Result<()> {
let executor_id = metadata.id.clone();
- let proto: protobuf::ExecutorMetadata = metadata.into();
+ let proto: protobuf::ExecutorMetadata = metadata.clone().into();
self.store
- .put(Keyspace::Executors, executor_id, proto.encode_to_vec())
- .await
+ .put(
+ Keyspace::Executors,
+ executor_id.clone(),
+ proto.encode_to_vec(),
+ )
+ .await?;
+
+ self.executors.insert(executor_id, metadata);
+
+ Ok(())
}
async fn get_executor_metadata(&self, executor_id: &str) ->
Result<ExecutorMetadata> {
- let value = self.store.get(Keyspace::Executors, executor_id).await?;
+ let metadata = if let Some(metadata) = self.executors.get(executor_id)
{
+ metadata.value().clone()
+ } else {
+ let value = self.store.get(Keyspace::Executors,
executor_id).await?;
+ let decoded =
+ decode_into::<protobuf::ExecutorMetadata,
ExecutorMetadata>(&value)?;
+ self.executors
+ .insert(executor_id.to_string(), decoded.clone());
- let decoded =
- decode_into::<protobuf::ExecutorMetadata,
ExecutorMetadata>(&value)?;
- Ok(decoded)
+ decoded
+ };
+
+ Ok(metadata)
}
async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) ->
Result<()> {
let executor_id = heartbeat.executor_id.clone();
self.store
- .put(Keyspace::Heartbeats, executor_id, heartbeat.encode_to_vec())
- .await
+ .put(
+ Keyspace::Heartbeats,
+ executor_id.clone(),
+ heartbeat.clone().encode_to_vec(),
+ )
+ .await?;
+ self.executor_heartbeats.insert(executor_id, heartbeat);
+ Ok(())
}
async fn remove_executor(&self, executor_id: &str) -> Result<()> {
- let current_ts = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .map_err(|e| {
- BallistaError::Internal(format!("Error getting current
timestamp: {e:?}"))
- })?
- .as_secs();
-
let value = ExecutorHeartbeat {
executor_id: executor_id.to_owned(),
- timestamp: current_ts,
+ timestamp: timestamp_secs(),
metrics: vec![],
status: Some(protobuf::ExecutorStatus {
status:
Some(protobuf::executor_status::Status::Dead("".to_string())),
@@ -384,52 +477,24 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
self.store
.put(Keyspace::Heartbeats, executor_id.to_owned(), value)
.await?;
+ self.executor_heartbeats.remove(executor_id);
// TODO Check the Executor reservation logic for push-based scheduling
Ok(())
}
- async fn executor_heartbeat_stream(&self) ->
Result<ExecutorHeartbeatStream> {
- let events = self
- .store
- .watch(Keyspace::Heartbeats, String::default())
- .await?;
-
- Ok(events
- .filter_map(|event| {
- futures::future::ready(match event {
- WatchEvent::Put(_, value) => {
- if let Ok(heartbeat) =
- decode_protobuf::<ExecutorHeartbeat>(&value)
- {
- Some(heartbeat)
- } else {
- None
- }
- }
- WatchEvent::Delete(_) => None,
- })
- })
- .boxed())
+ fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
+ self.executor_heartbeats
+ .iter()
+ .map(|r| (r.key().clone(), r.value().clone()))
+ .collect()
}
- async fn executor_heartbeats(&self) -> Result<HashMap<String,
ExecutorHeartbeat>> {
- let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;
-
- let mut heartbeat_map = HashMap::with_capacity(heartbeats.len());
-
- for (_, value) in heartbeats {
- let data: ExecutorHeartbeat = decode_protobuf(&value)?;
- if let Some(protobuf::ExecutorStatus {
- status: Some(protobuf::executor_status::Status::Active(_)),
- }) = &data.status
- {
- heartbeat_map.insert(data.executor_id.clone(), data);
- }
- }
-
- Ok(heartbeat_map)
+ fn get_executor_heartbeat(&self, executor_id: &str) ->
Option<ExecutorHeartbeat> {
+ self.executor_heartbeats
+ .get(executor_id)
+ .map(|r| r.value().clone())
}
}
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 95c6b7a2..6d7d7bc6 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::cluster::{
- reserve_slots_bias, reserve_slots_round_robin, ClusterState,
ExecutorHeartbeatStream,
- JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
+ reserve_slots_bias, reserve_slots_round_robin, ClusterState, JobState,
JobStateEvent,
+ JobStateEventStream, JobStatus, TaskDistribution,
};
use crate::state::execution_graph::ExecutionGraph;
use crate::state::executor_manager::ExecutorReservation;
@@ -53,9 +53,6 @@ pub struct InMemoryClusterState {
executors: DashMap<String, ExecutorMetadata>,
/// Last heartbeat received for each executor
heartbeats: DashMap<String, ExecutorHeartbeat>,
- /// Broadcast channel sender for heartbeats, If `None` there are not
- /// subscribers
- heartbeat_sender: ClusterEventSender<ExecutorHeartbeat>,
}
#[async_trait]
@@ -163,14 +160,18 @@ impl ClusterState for InMemoryClusterState {
mut spec: ExecutorData,
reserve: bool,
) -> Result<Vec<ExecutorReservation>> {
- let heartbeat = ExecutorHeartbeat {
- executor_id: metadata.id.clone(),
+ let executor_id = metadata.id.clone();
+
+ self.save_executor_metadata(metadata).await?;
+ self.save_executor_heartbeat(ExecutorHeartbeat {
+ executor_id: executor_id.clone(),
timestamp: timestamp_secs(),
metrics: vec![],
status: Some(ExecutorStatus {
status:
Some(executor_status::Status::Active(String::default())),
}),
- };
+ })
+ .await?;
let mut guard = self.task_slots.lock();
@@ -178,38 +179,29 @@ impl ClusterState for InMemoryClusterState {
if let Some((idx, _)) = guard
.task_slots
.iter()
- .find_position(|slots| slots.executor_id == metadata.id)
+ .find_position(|slots| slots.executor_id == executor_id)
{
guard.task_slots.swap_remove(idx);
}
if reserve {
let slots = std::mem::take(&mut spec.available_task_slots) as
usize;
-
let reservations = (0..slots)
- .map(|_| ExecutorReservation::new_free(metadata.id.clone()))
+ .map(|_| ExecutorReservation::new_free(executor_id.clone()))
.collect();
- self.executors.insert(metadata.id.clone(), metadata.clone());
-
guard.task_slots.push(AvailableTaskSlots {
- executor_id: metadata.id,
+ executor_id,
slots: 0,
});
- self.heartbeat_sender.send(&heartbeat);
-
Ok(reservations)
} else {
- self.executors.insert(metadata.id.clone(), metadata.clone());
-
guard.task_slots.push(AvailableTaskSlots {
- executor_id: metadata.id,
+ executor_id,
slots: spec.available_task_slots,
});
- self.heartbeat_sender.send(&heartbeat);
-
Ok(vec![])
}
}
@@ -231,15 +223,13 @@ impl ClusterState for InMemoryClusterState {
}
async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) ->
Result<()> {
- if let Some(mut last) =
self.heartbeats.get_mut(&heartbeat.executor_id) {
- let _ = std::mem::replace(last.deref_mut(), heartbeat.clone());
+ let executor_id = heartbeat.executor_id.clone();
+ if let Some(mut last) = self.heartbeats.get_mut(&executor_id) {
+ let _ = std::mem::replace(last.deref_mut(), heartbeat);
} else {
- self.heartbeats
- .insert(heartbeat.executor_id.clone(), heartbeat.clone());
+ self.heartbeats.insert(executor_id, heartbeat);
}
- self.heartbeat_sender.send(&heartbeat);
-
Ok(())
}
@@ -256,34 +246,20 @@ impl ClusterState for InMemoryClusterState {
}
}
- if let Some(heartbeat) =
self.heartbeats.get_mut(executor_id).as_deref_mut() {
- let new_heartbeat = ExecutorHeartbeat {
- executor_id: executor_id.to_string(),
- timestamp: timestamp_secs(),
- metrics: vec![],
- status: Some(ExecutorStatus {
- status:
Some(executor_status::Status::Dead(String::default())),
- }),
- };
-
- *heartbeat = new_heartbeat;
-
- self.heartbeat_sender.send(heartbeat);
- }
+ self.heartbeats.remove(executor_id);
Ok(())
}
- async fn executor_heartbeat_stream(&self) ->
Result<ExecutorHeartbeatStream> {
- Ok(Box::pin(self.heartbeat_sender.subscribe()))
- }
-
- async fn executor_heartbeats(&self) -> Result<HashMap<String,
ExecutorHeartbeat>> {
- Ok(self
- .heartbeats
+ fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
+ self.heartbeats
.iter()
.map(|r| (r.key().clone(), r.value().clone()))
- .collect())
+ .collect()
+ }
+
+ fn get_executor_heartbeat(&self, executor_id: &str) ->
Option<ExecutorHeartbeat> {
+ self.heartbeats.get(executor_id).map(|r| r.value().clone())
}
}
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 35a5052a..8bea6d77 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -29,7 +29,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
use crate::cluster::storage::etcd::EtcdClient;
use crate::cluster::storage::sled::SledClient;
use crate::cluster::storage::KeyValueStore;
-use crate::config::{ClusterStorageConfig, SchedulerConfig};
+use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
use crate::scheduler_server::SessionBuilder;
use crate::state::execution_graph::ExecutionGraph;
use crate::state::executor_manager::ExecutorReservation;
@@ -195,20 +195,14 @@ impl BallistaCluster {
/// by any schedulers with a shared `ClusterState`
pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item =
ExecutorHeartbeat> + Send>>;
-/// Method of distributing tasks to available executor slots
-#[derive(Debug, Clone, Copy)]
-pub enum TaskDistribution {
- /// Eagerly assign tasks to executor slots. This will assign as many task
slots per executor
- /// as are currently available
- Bias,
- /// Distributed tasks evenely across executors. This will try and iterate
through available executors
- /// and assign one task to each executor until all tasks are assigned.
- RoundRobin,
-}
-
/// A trait that contains the necessary method to maintain a globally
consistent view of cluster resources
#[tonic::async_trait]
pub trait ClusterState: Send + Sync + 'static {
+ /// Initialize when it's necessary, especially for state with backend
storage
+ async fn init(&self) -> Result<()> {
+ Ok(())
+ }
+
/// Reserve up to `num_slots` executor task slots. If not enough task
slots are available, reserve
/// as many as possible.
///
@@ -261,12 +255,11 @@ pub trait ClusterState: Send + Sync + 'static {
/// Remove the executor from the cluster
async fn remove_executor(&self, executor_id: &str) -> Result<()>;
- /// Return the stream of executor heartbeats observed by all schedulers in
the cluster.
- /// This can be aggregated to provide an eventually consistent view of all
executors within the cluster
- async fn executor_heartbeat_stream(&self) ->
Result<ExecutorHeartbeatStream>;
-
/// Return a map of the last seen heartbeat for all active executors
- async fn executor_heartbeats(&self) -> Result<HashMap<String,
ExecutorHeartbeat>>;
+ fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat>;
+
+ /// Get executor heartbeat for the provided executor ID. Return None if
the executor does not exist
+ fn get_executor_heartbeat(&self, executor_id: &str) ->
Option<ExecutorHeartbeat>;
}
/// Events related to the state of jobs. Implementations may or may not
support all event types.
diff --git a/ballista/scheduler/src/cluster/test/mod.rs b/ballista/scheduler/src/cluster/test/mod.rs
index 4d4001c6..b9056bfd 100644
--- a/ballista/scheduler/src/cluster/test/mod.rs
+++ b/ballista/scheduler/src/cluster/test/mod.rs
@@ -22,11 +22,10 @@ use crate::state::executor_manager::ExecutorReservation;
use crate::test_utils::{await_condition, mock_completed_task, mock_executor};
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{executor_status, ExecutorHeartbeat,
JobStatus};
+use ballista_core::serde::protobuf::{executor_status, JobStatus};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
-use dashmap::DashMap;
use futures::StreamExt;
use itertools::Itertools;
use std::collections::HashSet;
@@ -36,27 +35,14 @@ use tokio::sync::RwLock;
pub struct ClusterStateTest<S: ClusterState> {
state: Arc<S>,
- received_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
reservations: Vec<ExecutorReservation>,
total_task_slots: u32,
}
impl<S: ClusterState> ClusterStateTest<S> {
pub async fn new(state: S) -> Result<Self> {
- let received_heartbeats = Arc::new(DashMap::new());
-
- let mut heartbeat_stream = state.executor_heartbeat_stream().await?;
- let received_heartbeat_clone = received_heartbeats.clone();
-
- tokio::spawn(async move {
- while let Some(heartbeat) = heartbeat_stream.next().await {
- received_heartbeat_clone.insert(heartbeat.executor_id.clone(),
heartbeat);
- }
- });
-
Ok(Self {
state: Arc::new(state),
- received_heartbeats,
reservations: vec![],
total_task_slots: 0,
})
@@ -115,17 +101,19 @@ impl<S: ClusterState> ClusterStateTest<S> {
// Heratbeat stream is async so wait up to 500ms for it to show up
await_condition(Duration::from_millis(50), 10, || {
- let found_heartbeat =
- self.received_heartbeats.get(executor_id).map(|heartbeat| {
+ let found_heartbeat =
self.state.get_executor_heartbeat(executor_id).map_or(
+ false,
+ |heartbeat| {
matches!(
heartbeat.status,
Some(ballista_core::serde::generated::ballista::ExecutorStatus {
status: Some(executor_status::Status::Active(_))
})
)
- });
+ },
+ );
- futures::future::ready(Ok(found_heartbeat.unwrap_or_default()))
+ futures::future::ready(Ok(found_heartbeat))
})
.await?;
@@ -135,17 +123,19 @@ impl<S: ClusterState> ClusterStateTest<S> {
pub async fn assert_dead_executor(self, executor_id: &str) -> Result<Self>
{
// Heratbeat stream is async so wait up to 500ms for it to show up
await_condition(Duration::from_millis(50), 10, || {
- let found_heartbeat =
- self.received_heartbeats.get(executor_id).map(|heartbeat| {
+ let found_heartbeat =
self.state.get_executor_heartbeat(executor_id).map_or(
+ true,
+ |heartbeat| {
matches!(
heartbeat.status,
Some(ballista_core::serde::generated::ballista::ExecutorStatus {
status: Some(executor_status::Status::Dead(_))
})
)
- });
+ },
+ );
- futures::future::ready(Ok(found_heartbeat.unwrap_or_default()))
+ futures::future::ready(Ok(found_heartbeat))
})
.await?;
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 087e3867..7a2518c3 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -26,7 +26,7 @@ use std::fmt;
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
/// Namespace of this scheduler. Schedulers using the same cluster storage
and namespace
- /// will share gloabl cluster state.
+ /// will share global cluster state.
pub namespace: String,
/// The external hostname of the scheduler
pub external_host: String,
@@ -36,8 +36,8 @@ pub struct SchedulerConfig {
pub scheduling_policy: TaskSchedulingPolicy,
/// The event loop buffer size. for a system of high throughput, a larger
value like 1000000 is recommended
pub event_loop_buffer_size: u32,
- /// The executor slots policy for the scheduler. For a cluster with single
scheduler, round-robin-local is recommended
- pub executor_slots_policy: SlotsPolicy,
+ /// Policy of distributing tasks to available executor slots. For a
cluster with single scheduler, round-robin is recommended
+ pub task_distribution: TaskDistribution,
/// The delayed interval for cleaning up finished job data, mainly the
shuffle data, 0 means the cleaning up is disabled
pub finished_job_data_clean_up_interval_seconds: u64,
/// The delayed interval for cleaning up finished job state stored in the
backend, 0 means the cleaning up is disabled.
@@ -62,7 +62,7 @@ impl Default for SchedulerConfig {
bind_port: 50050,
scheduling_policy: TaskSchedulingPolicy::PullStaged,
event_loop_buffer_size: 10000,
- executor_slots_policy: SlotsPolicy::Bias,
+ task_distribution: TaskDistribution::Bias,
finished_job_data_clean_up_interval_seconds: 300,
finished_job_state_clean_up_interval_seconds: 3600,
advertise_flight_sql_endpoint: None,
@@ -131,8 +131,8 @@ impl SchedulerConfig {
self
}
- pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self {
- self.executor_slots_policy = policy;
+ pub fn with_task_distribution(mut self, policy: TaskDistribution) -> Self {
+ self.task_distribution = policy;
self
}
@@ -161,22 +161,20 @@ pub enum ClusterStorageConfig {
Sled(Option<String>),
}
-// an enum used to configure the executor slots policy
-// needs to be visible to code generated by configure_me
+/// Policy of distributing tasks to available executor slots
+///
+/// It needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
-pub enum SlotsPolicy {
+pub enum TaskDistribution {
+ /// Eagerly assign tasks to executor slots. This will assign as many task
slots per executor
+ /// as are currently available
Bias,
+ /// Distributed tasks evenly across executors. This will try and iterate
through available executors
+ /// and assign one task to each executor until all tasks are assigned.
RoundRobin,
- RoundRobinLocal,
}
-impl SlotsPolicy {
- pub fn is_local(&self) -> bool {
- matches!(self, SlotsPolicy::RoundRobinLocal)
- }
-}
-
-impl std::str::FromStr for SlotsPolicy {
+impl std::str::FromStr for TaskDistribution {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
@@ -184,7 +182,7 @@ impl std::str::FromStr for SlotsPolicy {
}
}
-impl parse_arg::ParseArgFromStr for SlotsPolicy {
+impl parse_arg::ParseArgFromStr for TaskDistribution {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
write!(writer, "The executor slots policy for the scheduler")
}
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index de07a06d..dfef9809 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -22,12 +22,12 @@ use std::convert::TryInto;
use ballista_core::serde::protobuf::executor_registration::OptionalHost;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
- executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
- CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult,
ExecutorHeartbeat,
- ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult,
GetFileMetadataParams,
- GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
HeartBeatParams,
- HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
- RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
+ CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
+ ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
ExecutorStoppedParams,
+ ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
+ GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
+ PollWorkParams, PollWorkResult, RegisterExecutorParams,
RegisterExecutorResult,
+ UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
@@ -47,7 +47,7 @@ use datafusion::prelude::SessionContext;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
-use crate::scheduler_server::{timestamp_secs, SchedulerServer};
+use crate::scheduler_server::SchedulerServer;
use crate::state::executor_manager::ExecutorReservation;
#[tonic::async_trait]
@@ -72,19 +72,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
} = request.into_inner()
{
trace!("Received poll_work request for {:?}", metadata);
- // We might receive buggy poll work requests from dead executors.
- if self
- .state
- .executor_manager
- .is_dead_executor(&metadata.id.clone())
- {
- let error_msg = format!(
- "Receive buggy poll work request from dead Executor {}",
- metadata.id.clone()
- );
- warn!("{}", error_msg);
- return Err(Status::internal(error_msg));
- }
let metadata = ExecutorMetadata {
id: metadata.id,
host: metadata
@@ -97,14 +84,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
grpc_port: metadata.grpc_port as u16,
specification: metadata.specification.unwrap().into(),
};
- let executor_heartbeat = ExecutorHeartbeat {
- executor_id: metadata.id.clone(),
- timestamp: timestamp_secs(),
- metrics: vec![],
- status: Some(ExecutorStatus {
- status:
Some(executor_status::Status::Active("".to_string())),
- }),
- };
self.state
.executor_manager
@@ -116,16 +95,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
Status::internal(msg)
})?;
- self.state
- .executor_manager
- .save_executor_heartbeat(executor_heartbeat)
- .await
- .map_err(|e| {
- let msg = format!("Could not save executor heartbeat:
{e}");
- error!("{}", msg);
- Status::internal(msg)
- })?;
-
self.update_task_status(&metadata.id, task_status)
.await
.map_err(|e| {
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 69a8c1b4..217711f7 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -189,7 +189,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
tasks_status: Vec<TaskStatus>,
) -> Result<()> {
// We might receive buggy task updates from dead executors.
- if self.state.executor_manager.is_dead_executor(executor_id) {
+ if self.state.config.is_push_staged_scheduling()
+ && self.state.executor_manager.is_dead_executor(executor_id)
+ {
let error_msg = format!(
"Receive buggy tasks status from dead Executor {executor_id},
task status update ignored."
);
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index d3a9468e..63451014 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -17,26 +17,23 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use crate::cluster::TaskDistribution;
-
-use ballista_core::error::{BallistaError, Result};
+#[cfg(not(test))]
+use ballista_core::error::BallistaError;
+use ballista_core::error::Result;
use ballista_core::serde::protobuf;
use crate::cluster::ClusterState;
-use crate::config::SlotsPolicy;
+use crate::config::TaskDistribution;
use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::{
- executor_status, CancelTasksParams, ExecutorHeartbeat, ExecutorStatus,
- RemoveJobDataParams,
+ executor_status, CancelTasksParams, ExecutorHeartbeat, RemoveJobDataParams,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::utils::create_grpc_client_connection;
-use dashmap::{DashMap, DashSet};
-use futures::StreamExt;
+use dashmap::DashMap;
use log::{debug, error, info, warn};
-use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tonic::transport::Channel;
@@ -91,75 +88,25 @@ pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15;
#[derive(Clone)]
pub struct ExecutorManager {
- // executor slot policy
- slots_policy: SlotsPolicy,
task_distribution: TaskDistribution,
cluster_state: Arc<dyn ClusterState>,
- // executor_id -> ExecutorMetadata map
- executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
- // executor_id -> ExecutorHeartbeat map
- executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
- // executor_id -> ExecutorData map, only used when the slots policy is of
local
- executor_data: Arc<Mutex<HashMap<String, ExecutorData>>>,
- // dead executor sets:
- dead_executors: Arc<DashSet<String>>,
clients: ExecutorClients,
}
impl ExecutorManager {
pub(crate) fn new(
cluster_state: Arc<dyn ClusterState>,
- slots_policy: SlotsPolicy,
+ task_distribution: TaskDistribution,
) -> Self {
- let task_distribution = match slots_policy {
- SlotsPolicy::Bias => TaskDistribution::Bias,
- SlotsPolicy::RoundRobin | SlotsPolicy::RoundRobinLocal => {
- TaskDistribution::RoundRobin
- }
- };
-
Self {
- slots_policy,
task_distribution,
cluster_state,
- executor_metadata: Arc::new(DashMap::new()),
- executors_heartbeat: Arc::new(DashMap::new()),
- executor_data: Arc::new(Mutex::new(HashMap::new())),
- dead_executors: Arc::new(DashSet::new()),
clients: Default::default(),
}
}
- /// Initialize a background process that will listen for executor
heartbeats and update the in-memory cache
- /// of executor heartbeats
pub async fn init(&self) -> Result<()> {
- self.init_active_executor_heartbeats().await?;
-
- let mut heartbeat_stream =
self.cluster_state.executor_heartbeat_stream().await?;
-
- info!("Initializing heartbeat listener");
-
- let heartbeats = self.executors_heartbeat.clone();
- let dead_executors = self.dead_executors.clone();
- tokio::task::spawn(async move {
- while let Some(heartbeat) = heartbeat_stream.next().await {
- let executor_id = heartbeat.executor_id.clone();
-
- match heartbeat
- .status
- .as_ref()
- .and_then(|status| status.status.as_ref())
- {
- Some(executor_status::Status::Dead(_)) => {
- heartbeats.remove(&executor_id);
- dead_executors.insert(executor_id);
- }
- _ => {
- heartbeats.insert(executor_id, heartbeat);
- }
- }
- }
- });
+ self.cluster_state.init().await?;
Ok(())
}
@@ -168,88 +115,13 @@ impl ExecutorManager {
/// for scheduling.
/// This operation is atomic, so if this method return an Err, no slots
have been reserved.
pub async fn reserve_slots(&self, n: u32) ->
Result<Vec<ExecutorReservation>> {
- if self.slots_policy.is_local() {
- self.reserve_slots_local(n).await
- } else {
- let alive_executors = self.get_alive_executors_within_one_minute();
-
- debug!("Alive executors: {alive_executors:?}");
-
- self.cluster_state
- .reserve_slots(n, self.task_distribution,
Some(alive_executors))
- .await
- }
- }
-
- async fn reserve_slots_local(&self, n: u32) ->
Result<Vec<ExecutorReservation>> {
- debug!("Attempting to reserve {} executor slots", n);
-
let alive_executors = self.get_alive_executors_within_one_minute();
- match self.slots_policy {
- SlotsPolicy::RoundRobinLocal => {
- self.reserve_slots_local_round_robin(n, alive_executors)
- .await
- }
- _ => Err(BallistaError::General(format!(
- "Reservation policy {:?} is not supported",
- self.slots_policy
- ))),
- }
- }
-
- /// Create ExecutorReservation in a round robin way to evenly assign tasks
to executors
- async fn reserve_slots_local_round_robin(
- &self,
- mut n: u32,
- alive_executors: HashSet<String>,
- ) -> Result<Vec<ExecutorReservation>> {
- let mut executor_data = self.executor_data.lock();
-
- let mut available_executor_data: Vec<&mut ExecutorData> = executor_data
- .values_mut()
- .filter_map(|data| {
- (data.available_task_slots > 0
- && alive_executors.contains(&data.executor_id))
- .then_some(data)
- })
- .collect();
- available_executor_data
- .sort_by(|a, b| Ord::cmp(&b.available_task_slots,
&a.available_task_slots));
-
- let mut reservations: Vec<ExecutorReservation> = vec![];
-
- // Exclusive
- let mut last_updated_idx = 0usize;
- loop {
- let n_before = n;
- for (idx, data) in available_executor_data.iter_mut().enumerate() {
- if n == 0 {
- break;
- }
-
- // Since the vector is sorted in descending order,
- // if finding one executor has not enough slots, the following
will have not enough, either
- if data.available_task_slots == 0 {
- break;
- }
-
- reservations
-
.push(ExecutorReservation::new_free(data.executor_id.clone()));
- data.available_task_slots -= 1;
- n -= 1;
-
- if idx >= last_updated_idx {
- last_updated_idx = idx + 1;
- }
- }
-
- if n_before == n {
- break;
- }
- }
+ debug!("Alive executors: {alive_executors:?}");
- Ok(reservations)
+ self.cluster_state
+ .reserve_slots(n, self.task_distribution, Some(alive_executors))
+ .await
}
/// Returned reserved task slots to the pool of available slots. This
operation is atomic
@@ -258,36 +130,7 @@ impl ExecutorManager {
&self,
reservations: Vec<ExecutorReservation>,
) -> Result<()> {
- if self.slots_policy.is_local() {
- self.cancel_reservations_local(reservations).await
- } else {
- self.cluster_state.cancel_reservations(reservations).await
- }
- }
-
- async fn cancel_reservations_local(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<()> {
- let mut executor_slots: HashMap<String, u32> = HashMap::new();
- for reservation in reservations {
- if let Some(slots) =
executor_slots.get_mut(&reservation.executor_id) {
- *slots += 1;
- } else {
- executor_slots.insert(reservation.executor_id, 1);
- }
- }
-
- let mut executor_data = self.executor_data.lock();
- for (id, released_slots) in executor_slots.into_iter() {
- if let Some(slots) = executor_data.get_mut(&id) {
- slots.available_task_slots += released_slots;
- } else {
- warn!("ExecutorData for {} is not cached in memory", id);
- }
- }
-
- Ok(())
+ self.cluster_state.cancel_reservations(reservations).await
}
/// Send rpc to Executors to cancel the running tasks
@@ -411,15 +254,12 @@ impl ExecutorManager {
/// Get a list of all executors along with the timestamp of their last
recorded heartbeat
pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata,
Duration)>> {
- let heartbeat_timestamps: Vec<(String, u64)> = {
- self.executors_heartbeat
- .iter()
- .map(|item| {
- let (executor_id, heartbeat) = item.pair();
- (executor_id.clone(), heartbeat.timestamp)
- })
- .collect()
- };
+ let heartbeat_timestamps: Vec<(String, u64)> = self
+ .cluster_state
+ .executor_heartbeats()
+ .into_iter()
+ .map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp))
+ .collect();
let mut state: Vec<(ExecutorMetadata, Duration)> = vec![];
for (executor_id, ts) in heartbeat_timestamps {
@@ -437,12 +277,6 @@ impl ExecutorManager {
&self,
executor_id: &str,
) -> Result<ExecutorMetadata> {
- {
- if let Some(cached) = self.executor_metadata.get(executor_id) {
- return Ok(cached.clone());
- }
- }
-
self.cluster_state.get_executor_metadata(executor_id).await
}
@@ -471,36 +305,11 @@ impl ExecutorManager {
self.test_scheduler_connectivity(&metadata).await?;
- let current_ts = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .map_err(|e| {
- BallistaError::Internal(format!("Error getting current
timestamp: {e:?}"))
- })?
- .as_secs();
-
- let initial_heartbeat = ExecutorHeartbeat {
- executor_id: metadata.id.clone(),
- timestamp: current_ts,
- metrics: vec![],
- status: Some(ExecutorStatus {
- status:
Some(executor_status::Status::Active(String::default())),
- }),
- };
-
if !reserve {
- if self.slots_policy.is_local() {
- let mut executor_data = self.executor_data.lock();
- executor_data
- .insert(specification.executor_id.clone(),
specification.clone());
- }
-
self.cluster_state
.register_executor(metadata, specification.clone(), reserve)
.await?;
- self.executors_heartbeat
- .insert(initial_heartbeat.executor_id.clone(),
initial_heartbeat);
-
Ok(vec![])
} else {
let mut specification = specification;
@@ -512,19 +321,10 @@ impl ExecutorManager {
specification.available_task_slots = 0;
- if self.slots_policy.is_local() {
- let mut executor_data = self.executor_data.lock();
- executor_data
- .insert(specification.executor_id.clone(),
specification.clone());
- }
-
self.cluster_state
.register_executor(metadata, specification, reserve)
.await?;
- self.executors_heartbeat
- .insert(initial_heartbeat.executor_id.clone(),
initial_heartbeat);
-
Ok(reservations)
}
}
@@ -536,21 +336,7 @@ impl ExecutorManager {
reason: Option<String>,
) -> Result<()> {
info!("Removing executor {}: {:?}", executor_id, reason);
- self.cluster_state.remove_executor(executor_id).await?;
-
- let executor_id = executor_id.to_owned();
-
- self.executors_heartbeat.remove(&executor_id);
-
- // Remove executor data cache for dead executors
- {
- let mut executor_data = self.executor_data.lock();
- executor_data.remove(&executor_id);
- }
-
- self.dead_executors.insert(executor_id);
-
- Ok(())
+ self.cluster_state.remove_executor(executor_id).await
}
#[cfg(not(test))]
@@ -587,30 +373,20 @@ impl ExecutorManager {
.save_executor_heartbeat(heartbeat.clone())
.await?;
- self.executors_heartbeat
- .insert(heartbeat.executor_id.clone(), heartbeat);
-
Ok(())
}
pub(crate) fn is_dead_executor(&self, executor_id: &str) -> bool {
- self.dead_executors.contains(executor_id)
- }
-
- /// Initialize the set of active executor heartbeats from storage
- async fn init_active_executor_heartbeats(&self) -> Result<()> {
- let heartbeats = self.cluster_state.executor_heartbeats().await?;
-
- for (executor_id, heartbeat) in heartbeats {
- // let data: protobuf::ExecutorHeartbeat =
decode_protobuf(&value)?;
- if let Some(ExecutorStatus {
- status: Some(executor_status::Status::Active(_)),
- }) = heartbeat.status
- {
- self.executors_heartbeat.insert(executor_id, heartbeat);
- }
- }
- Ok(())
+ self.cluster_state
+ .get_executor_heartbeat(executor_id)
+ .map_or(true, |heartbeat| {
+ matches!(
+ heartbeat.status,
+
Some(ballista_core::serde::generated::ballista::ExecutorStatus {
+ status: Some(executor_status::Status::Dead(_))
+ })
+ )
+ })
}
/// Retrieve the set of all executor IDs where the executor has been
observed in the last
@@ -619,11 +395,10 @@ impl ExecutorManager {
&self,
last_seen_ts_threshold: u64,
) -> HashSet<String> {
- self.executors_heartbeat
+ self.cluster_state
+ .executor_heartbeats()
.iter()
- .filter_map(|pair| {
- let (exec, heartbeat) = pair.pair();
-
+ .filter_map(|(exec, heartbeat)| {
let active = matches!(
heartbeat
.status
@@ -658,12 +433,10 @@ impl ExecutorManager {
.unwrap_or_else(|| Duration::from_secs(0))
.as_secs();
- let expired_executors = self
- .executors_heartbeat
+ self.cluster_state
+ .executor_heartbeats()
.iter()
- .filter_map(|pair| {
- let (_exec, heartbeat) = pair.pair();
-
+ .filter_map(|(_exec, heartbeat)| {
let terminating = matches!(
heartbeat
.status
@@ -680,8 +453,7 @@ impl ExecutorManager {
((terminating && grace_period_expired) || expired)
.then(|| heartbeat.clone())
})
- .collect::<Vec<_>>();
- expired_executors
+ .collect::<Vec<_>>()
}
pub(crate) fn get_alive_executors_within_one_minute(&self) ->
HashSet<String> {
@@ -698,7 +470,7 @@ impl ExecutorManager {
#[cfg(test)]
mod test {
- use crate::config::SlotsPolicy;
+ use crate::config::TaskDistribution;
use crate::scheduler_server::timestamp_secs;
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
@@ -712,18 +484,19 @@ mod test {
#[tokio::test]
async fn test_reserve_and_cancel() -> Result<()> {
- test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
- test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
- test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?;
+ test_reserve_and_cancel_inner(TaskDistribution::Bias).await?;
+ test_reserve_and_cancel_inner(TaskDistribution::RoundRobin).await?;
Ok(())
}
- async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) ->
Result<()> {
+ async fn test_reserve_and_cancel_inner(
+ task_distribution: TaskDistribution,
+ ) -> Result<()> {
let cluster = test_cluster_context();
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), slots_policy);
+ ExecutorManager::new(cluster.cluster_state(), task_distribution);
let executors = test_executors(10, 4);
@@ -739,7 +512,7 @@ mod test {
assert_eq!(
reservations.len(),
40,
- "Expected 40 reservations for policy {slots_policy:?}"
+ "Expected 40 reservations for policy {task_distribution:?}"
);
// Now cancel them
@@ -751,7 +524,7 @@ mod test {
assert_eq!(
reservations.len(),
40,
- "Expected 40 reservations for policy {slots_policy:?}"
+ "Expected 40 reservations for policy {task_distribution:?}"
);
Ok(())
@@ -759,18 +532,19 @@ mod test {
#[tokio::test]
async fn test_reserve_partial() -> Result<()> {
- test_reserve_partial_inner(SlotsPolicy::Bias).await?;
- test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
- test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?;
+ test_reserve_partial_inner(TaskDistribution::Bias).await?;
+ test_reserve_partial_inner(TaskDistribution::RoundRobin).await?;
Ok(())
}
- async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) ->
Result<()> {
+ async fn test_reserve_partial_inner(
+ task_distribution: TaskDistribution,
+ ) -> Result<()> {
let cluster = test_cluster_context();
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), slots_policy);
+ ExecutorManager::new(cluster.cluster_state(), task_distribution);
let executors = test_executors(10, 4);
@@ -810,14 +584,15 @@ mod test {
#[tokio::test]
async fn test_reserve_concurrent() -> Result<()> {
- test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
- test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
- test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?;
+ test_reserve_concurrent_inner(TaskDistribution::Bias).await?;
+ test_reserve_concurrent_inner(TaskDistribution::RoundRobin).await?;
Ok(())
}
- async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) ->
Result<()> {
+ async fn test_reserve_concurrent_inner(
+ task_distribution: TaskDistribution,
+ ) -> Result<()> {
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
@@ -825,7 +600,7 @@ mod test {
let cluster = test_cluster_context();
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), slots_policy);
+ ExecutorManager::new(cluster.cluster_state(), task_distribution);
for (executor_metadata, executor_data) in executors {
executor_manager
@@ -860,18 +635,19 @@ mod test {
#[tokio::test]
async fn test_register_reserve() -> Result<()> {
- test_register_reserve_inner(SlotsPolicy::Bias).await?;
- test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
- test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?;
+ test_register_reserve_inner(TaskDistribution::Bias).await?;
+ test_register_reserve_inner(TaskDistribution::RoundRobin).await?;
Ok(())
}
- async fn test_register_reserve_inner(slots_policy: SlotsPolicy) ->
Result<()> {
+ async fn test_register_reserve_inner(
+ task_distribution: TaskDistribution,
+ ) -> Result<()> {
let cluster = test_cluster_context();
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), slots_policy);
+ ExecutorManager::new(cluster.cluster_state(), task_distribution);
let executors = test_executors(10, 4);
@@ -893,18 +669,19 @@ mod test {
#[tokio::test]
async fn test_ignore_fenced_executors() -> Result<()> {
- test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?;
- test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?;
-
test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobinLocal).await?;
+ test_ignore_fenced_executors_inner(TaskDistribution::Bias).await?;
+
test_ignore_fenced_executors_inner(TaskDistribution::RoundRobin).await?;
Ok(())
}
- async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) ->
Result<()> {
+ async fn test_ignore_fenced_executors_inner(
+ task_distribution: TaskDistribution,
+ ) -> Result<()> {
let cluster = test_cluster_context();
let executor_manager =
- ExecutorManager::new(cluster.cluster_state(), slots_policy);
+ ExecutorManager::new(cluster.cluster_state(), task_distribution);
// Setup two executors initially
let executors = test_executors(2, 4);
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index 57ea0be6..0c3b8bf0 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -116,7 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
Self {
executor_manager: ExecutorManager::new(
cluster.cluster_state(),
- config.executor_slots_policy,
+ config.task_distribution,
),
task_manager: TaskManager::new(
cluster.job_state(),
@@ -140,7 +140,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
Self {
executor_manager: ExecutorManager::new(
cluster.cluster_state(),
- config.executor_slots_policy,
+ config.task_distribution,
),
task_manager: TaskManager::with_launcher(
cluster.job_state(),