This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new a1473d0c Add RoundRobinLocal slots policy for caching executor data to
avoid sled persistency (#396)
a1473d0c is described below
commit a1473d0c53570596d2460e563f4275aefbf0cfac
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Oct 21 10:14:02 2022 +0800
Add RoundRobinLocal slots policy for caching executor data to avoid sled
persistency (#396)
* Add RoundRobinLocal slots policy for caching executor data to avoid sled
persistency
* Rename clean_up_executors_data to clean_up_job_data in executor manager
Co-authored-by: yangzhong <[email protected]>
---
ballista/scheduler/scheduler_config_spec.toml | 2 +-
ballista/scheduler/src/config.rs | 7 +
.../src/scheduler_server/query_stage_scheduler.rs | 6 +-
ballista/scheduler/src/state/executor_manager.rs | 149 ++++++++++++++++++++-
4 files changed, 158 insertions(+), 6 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index 625e23b9..7549260e 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -75,7 +75,7 @@ default =
"ballista_core::config::TaskSchedulingPolicy::PullStaged"
[[param]]
name = "executor_slots_policy"
type = "ballista_scheduler::config::SlotsPolicy"
-doc = "The executor slots policy for the scheduler, possible values: bias,
round-robin. Default: bias"
+doc = "The executor slots policy for the scheduler, possible values: bias,
round-robin, round-robin-local. Default: bias"
default = "ballista_scheduler::config::SlotsPolicy::Bias"
[[param]]
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 4f1f5c24..2cb940c9 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -27,6 +27,13 @@ use std::fmt;
pub enum SlotsPolicy {
Bias,
RoundRobin,
+ RoundRobinLocal,
+}
+
+impl SlotsPolicy {
+ pub fn is_local(&self) -> bool {
+ matches!(self, SlotsPolicy::RoundRobinLocal)
+ }
}
impl std::str::FromStr for SlotsPolicy {
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 4f7d12a1..ee190170 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -155,7 +155,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
CLEANUP_FINISHED_JOB_DELAY_SECS,
))
.await;
- executor_manager.clean_up_executors_data(job_id).await;
+ executor_manager.clean_up_job_data(job_id).await;
});
}
QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason)
=> {
@@ -176,7 +176,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
CLEANUP_FINISHED_JOB_DELAY_SECS,
))
.await;
- executor_manager.clean_up_executors_data(job_id).await;
+ executor_manager.clean_up_job_data(job_id).await;
});
}
QueryStageSchedulerEvent::JobUpdated(job_id) => {
@@ -194,7 +194,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
CLEANUP_FINISHED_JOB_DELAY_SECS,
))
.await;
- executor_manager.clean_up_executors_data(job_id).await;
+ executor_manager.clean_up_job_data(job_id).await;
});
}
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status)
=> {
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 2da288fe..8fd776b3 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -35,6 +35,7 @@ use ballista_core::utils::create_grpc_client_connection;
use dashmap::{DashMap, DashSet};
use futures::StreamExt;
use log::{debug, error, info, warn};
+use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tonic::transport::Channel;
@@ -92,6 +93,8 @@ pub(crate) struct ExecutorManager {
executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
// executor_id -> ExecutorHeartbeat map
executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
+ // executor_id -> ExecutorData map, only used when the slots policy is of
local
+ executor_data: Arc<Mutex<HashMap<String, ExecutorData>>>,
// dead executor sets:
dead_executors: Arc<DashSet<String>>,
clients: ExecutorClients,
@@ -107,6 +110,7 @@ impl ExecutorManager {
state,
executor_metadata: Arc::new(DashMap::new()),
executors_heartbeat: Arc::new(DashMap::new()),
+ executor_data: Arc::new(Mutex::new(HashMap::new())),
dead_executors: Arc::new(DashSet::new()),
clients: Default::default(),
}
@@ -128,7 +132,82 @@ impl ExecutorManager {
/// for scheduling.
/// This operation is atomic, so if this method return an Err, no slots
have been reserved.
pub async fn reserve_slots(&self, n: u32) ->
Result<Vec<ExecutorReservation>> {
- self.reserve_slots_global(n).await
+ if self.slots_policy.is_local() {
+ self.reserve_slots_local(n).await
+ } else {
+ self.reserve_slots_global(n).await
+ }
+ }
+
+ async fn reserve_slots_local(&self, n: u32) ->
Result<Vec<ExecutorReservation>> {
+ debug!("Attempting to reserve {} executor slots", n);
+
+ let alive_executors = self.get_alive_executors_within_one_minute();
+
+ match self.slots_policy {
+ SlotsPolicy::RoundRobinLocal => {
+ self.reserve_slots_local_round_robin(n, alive_executors)
+ .await
+ }
+ _ => Err(BallistaError::General(format!(
+ "Reservation policy {:?} is not supported",
+ self.slots_policy
+ ))),
+ }
+ }
+
+ /// Create ExecutorReservation in a round robin way to evenly assign tasks
to executors
+ async fn reserve_slots_local_round_robin(
+ &self,
+ mut n: u32,
+ alive_executors: HashSet<String>,
+ ) -> Result<Vec<ExecutorReservation>> {
+ let mut executor_data = self.executor_data.lock();
+
+ let mut available_executor_data: Vec<&mut ExecutorData> = executor_data
+ .values_mut()
+ .filter_map(|data| {
+ (data.available_task_slots > 0
+ && alive_executors.contains(&data.executor_id))
+ .then_some(data)
+ })
+ .collect();
+ available_executor_data
+ .sort_by(|a, b| Ord::cmp(&b.available_task_slots,
&a.available_task_slots));
+
+ let mut reservations: Vec<ExecutorReservation> = vec![];
+
+ // Exclusive
+ let mut last_updated_idx = 0usize;
+ loop {
+ let n_before = n;
+ for (idx, data) in available_executor_data.iter_mut().enumerate() {
+ if n == 0 {
+ break;
+ }
+
+ // Since the vector is sorted in descending order,
+ // if finding one executor has not enough slots, the following
will have not enough, either
+ if data.available_task_slots == 0 {
+ break;
+ }
+
+ reservations
+
.push(ExecutorReservation::new_free(data.executor_id.clone()));
+ data.available_task_slots -= 1;
+ n -= 1;
+
+ if idx >= last_updated_idx {
+ last_updated_idx = idx + 1;
+ }
+ }
+
+ if n_before == n {
+ break;
+ }
+ }
+
+ Ok(reservations)
}
/// Reserve up to n executor task slots with considering the global
resource snapshot
@@ -149,6 +228,12 @@ impl ExecutorManager {
self.reserve_slots_global_round_robin(n, alive_executors)
.await?
}
+ _ => {
+ return Err(BallistaError::General(format!(
+ "Reservation policy {:?} is not supported",
+ self.slots_policy
+ )))
+ }
};
self.state.apply_txn(txn_ops).await?;
@@ -276,6 +361,42 @@ impl ExecutorManager {
pub async fn cancel_reservations(
&self,
reservations: Vec<ExecutorReservation>,
+ ) -> Result<()> {
+ if self.slots_policy.is_local() {
+ self.cancel_reservations_local(reservations).await
+ } else {
+ self.cancel_reservations_global(reservations).await
+ }
+ }
+
+ async fn cancel_reservations_local(
+ &self,
+ reservations: Vec<ExecutorReservation>,
+ ) -> Result<()> {
+ let mut executor_slots: HashMap<String, u32> = HashMap::new();
+ for reservation in reservations {
+ if let Some(slots) =
executor_slots.get_mut(&reservation.executor_id) {
+ *slots += 1;
+ } else {
+ executor_slots.insert(reservation.executor_id, 1);
+ }
+ }
+
+ let mut executor_data = self.executor_data.lock();
+ for (id, released_slots) in executor_slots.into_iter() {
+ if let Some(slots) = executor_data.get_mut(&id) {
+ slots.available_task_slots += released_slots;
+ } else {
+ warn!("ExecutorData for {} is not cached in memory", id);
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn cancel_reservations_global(
+ &self,
+ reservations: Vec<ExecutorReservation>,
) -> Result<()> {
let lock = self.state.lock(Keyspace::Slots, "global").await?;
@@ -363,7 +484,7 @@ impl ExecutorManager {
}
/// Send rpc to Executors to clean up the job data
- pub async fn clean_up_executors_data(&self, job_id: String) {
+ pub async fn clean_up_job_data(&self, job_id: String) {
let alive_executors = self.get_alive_executors_within_one_minute();
for executor in alive_executors {
let job_id_clone = job_id.to_owned();
@@ -506,6 +627,12 @@ impl ExecutorManager {
.await?;
if !reserve {
+ if self.slots_policy.is_local() {
+ let mut executor_data = self.executor_data.lock();
+ executor_data
+ .insert(specification.executor_id.clone(),
specification.clone());
+ }
+
let proto: protobuf::ExecutorData = specification.into();
let value = encode_protobuf(&proto)?;
self.state.put(Keyspace::Slots, executor_id, value).await?;
@@ -519,6 +646,13 @@ impl ExecutorManager {
}
specification.available_task_slots = 0;
+
+ if self.slots_policy.is_local() {
+ let mut executor_data = self.executor_data.lock();
+ executor_data
+ .insert(specification.executor_id.clone(),
specification.clone());
+ }
+
let proto: protobuf::ExecutorData = specification.into();
let value = encode_protobuf(&proto)?;
self.state.put(Keyspace::Slots, executor_id, value).await?;
@@ -611,6 +745,13 @@ impl ExecutorManager {
self.executors_heartbeat
.remove(&heartbeat.executor_id.clone());
+
+ // Remove executor data cache for dead executors
+ {
+ let mut executor_data = self.executor_data.lock();
+ executor_data.remove(&executor_id);
+ }
+
self.dead_executors.insert(executor_id);
Ok(())
}
@@ -754,6 +895,7 @@ mod test {
async fn test_reserve_and_cancel() -> Result<()> {
test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+ test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
@@ -791,6 +933,7 @@ mod test {
async fn test_reserve_partial() -> Result<()> {
test_reserve_partial_inner(SlotsPolicy::Bias).await?;
test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+ test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
@@ -840,6 +983,7 @@ mod test {
async fn test_reserve_concurrent() -> Result<()> {
test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+ test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}
@@ -889,6 +1033,7 @@ mod test {
async fn test_register_reserve() -> Result<()> {
test_register_reserve_inner(SlotsPolicy::Bias).await?;
test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+ test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?;
Ok(())
}