This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new a1473d0c Add RoundRobinLocal slots policy for caching executor data to avoid sled persistency (#396)
a1473d0c is described below

commit a1473d0c53570596d2460e563f4275aefbf0cfac
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Oct 21 10:14:02 2022 +0800

    Add RoundRobinLocal slots policy for caching executor data to avoid sled persistency (#396)
    
    * Add RoundRobinLocal slots policy for caching executor data to avoid sled persistency
    
    * Rename clean_up_executors_data to clean_up_job_data in executor manager
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/scheduler/scheduler_config_spec.toml      |   2 +-
 ballista/scheduler/src/config.rs                   |   7 +
 .../src/scheduler_server/query_stage_scheduler.rs  |   6 +-
 ballista/scheduler/src/state/executor_manager.rs   | 149 ++++++++++++++++++++-
 4 files changed, 158 insertions(+), 6 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml 
b/ballista/scheduler/scheduler_config_spec.toml
index 625e23b9..7549260e 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -75,7 +75,7 @@ default = 
"ballista_core::config::TaskSchedulingPolicy::PullStaged"
 [[param]]
 name = "executor_slots_policy"
 type = "ballista_scheduler::config::SlotsPolicy"
-doc = "The executor slots policy for the scheduler, possible values: bias, 
round-robin. Default: bias"
+doc = "The executor slots policy for the scheduler, possible values: bias, 
round-robin, round-robin-local. Default: bias"
 default = "ballista_scheduler::config::SlotsPolicy::Bias"
 
 [[param]]
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 4f1f5c24..2cb940c9 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -27,6 +27,13 @@ use std::fmt;
 pub enum SlotsPolicy {
     Bias,
     RoundRobin,
+    RoundRobinLocal,
+}
+
+impl SlotsPolicy {
+    pub fn is_local(&self) -> bool {
+        matches!(self, SlotsPolicy::RoundRobinLocal)
+    }
 }
 
 impl std::str::FromStr for SlotsPolicy {
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 4f7d12a1..ee190170 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -155,7 +155,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         CLEANUP_FINISHED_JOB_DELAY_SECS,
                     ))
                     .await;
-                    executor_manager.clean_up_executors_data(job_id).await;
+                    executor_manager.clean_up_job_data(job_id).await;
                 });
             }
             QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason) 
=> {
@@ -176,7 +176,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         CLEANUP_FINISHED_JOB_DELAY_SECS,
                     ))
                     .await;
-                    executor_manager.clean_up_executors_data(job_id).await;
+                    executor_manager.clean_up_job_data(job_id).await;
                 });
             }
             QueryStageSchedulerEvent::JobUpdated(job_id) => {
@@ -194,7 +194,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         CLEANUP_FINISHED_JOB_DELAY_SECS,
                     ))
                     .await;
-                    executor_manager.clean_up_executors_data(job_id).await;
+                    executor_manager.clean_up_job_data(job_id).await;
                 });
             }
             QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) 
=> {
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index 2da288fe..8fd776b3 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -35,6 +35,7 @@ use ballista_core::utils::create_grpc_client_connection;
 use dashmap::{DashMap, DashSet};
 use futures::StreamExt;
 use log::{debug, error, info, warn};
+use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tonic::transport::Channel;
@@ -92,6 +93,8 @@ pub(crate) struct ExecutorManager {
     executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
     // executor_id -> ExecutorHeartbeat map
     executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
+    // executor_id -> ExecutorData map, only used when the slots policy is of 
local
+    executor_data: Arc<Mutex<HashMap<String, ExecutorData>>>,
     // dead executor sets:
     dead_executors: Arc<DashSet<String>>,
     clients: ExecutorClients,
@@ -107,6 +110,7 @@ impl ExecutorManager {
             state,
             executor_metadata: Arc::new(DashMap::new()),
             executors_heartbeat: Arc::new(DashMap::new()),
+            executor_data: Arc::new(Mutex::new(HashMap::new())),
             dead_executors: Arc::new(DashSet::new()),
             clients: Default::default(),
         }
@@ -128,7 +132,82 @@ impl ExecutorManager {
     /// for scheduling.
     /// This operation is atomic, so if this method return an Err, no slots 
have been reserved.
     pub async fn reserve_slots(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
-        self.reserve_slots_global(n).await
+        if self.slots_policy.is_local() {
+            self.reserve_slots_local(n).await
+        } else {
+            self.reserve_slots_global(n).await
+        }
+    }
+
+    async fn reserve_slots_local(&self, n: u32) -> 
Result<Vec<ExecutorReservation>> {
+        debug!("Attempting to reserve {} executor slots", n);
+
+        let alive_executors = self.get_alive_executors_within_one_minute();
+
+        match self.slots_policy {
+            SlotsPolicy::RoundRobinLocal => {
+                self.reserve_slots_local_round_robin(n, alive_executors)
+                    .await
+            }
+            _ => Err(BallistaError::General(format!(
+                "Reservation policy {:?} is not supported",
+                self.slots_policy
+            ))),
+        }
+    }
+
+    /// Create ExecutorReservation in a round robin way to evenly assign tasks 
to executors
+    async fn reserve_slots_local_round_robin(
+        &self,
+        mut n: u32,
+        alive_executors: HashSet<String>,
+    ) -> Result<Vec<ExecutorReservation>> {
+        let mut executor_data = self.executor_data.lock();
+
+        let mut available_executor_data: Vec<&mut ExecutorData> = executor_data
+            .values_mut()
+            .filter_map(|data| {
+                (data.available_task_slots > 0
+                    && alive_executors.contains(&data.executor_id))
+                .then_some(data)
+            })
+            .collect();
+        available_executor_data
+            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, 
&a.available_task_slots));
+
+        let mut reservations: Vec<ExecutorReservation> = vec![];
+
+        // Exclusive
+        let mut last_updated_idx = 0usize;
+        loop {
+            let n_before = n;
+            for (idx, data) in available_executor_data.iter_mut().enumerate() {
+                if n == 0 {
+                    break;
+                }
+
+                // Since the vector is sorted in descending order,
+                // if finding one executor has not enough slots, the following 
will have not enough, either
+                if data.available_task_slots == 0 {
+                    break;
+                }
+
+                reservations
+                    
.push(ExecutorReservation::new_free(data.executor_id.clone()));
+                data.available_task_slots -= 1;
+                n -= 1;
+
+                if idx >= last_updated_idx {
+                    last_updated_idx = idx + 1;
+                }
+            }
+
+            if n_before == n {
+                break;
+            }
+        }
+
+        Ok(reservations)
     }
 
     /// Reserve up to n executor task slots with considering the global 
resource snapshot
@@ -149,6 +228,12 @@ impl ExecutorManager {
                     self.reserve_slots_global_round_robin(n, alive_executors)
                         .await?
                 }
+                _ => {
+                    return Err(BallistaError::General(format!(
+                        "Reservation policy {:?} is not supported",
+                        self.slots_policy
+                    )))
+                }
             };
 
             self.state.apply_txn(txn_ops).await?;
@@ -276,6 +361,42 @@ impl ExecutorManager {
     pub async fn cancel_reservations(
         &self,
         reservations: Vec<ExecutorReservation>,
+    ) -> Result<()> {
+        if self.slots_policy.is_local() {
+            self.cancel_reservations_local(reservations).await
+        } else {
+            self.cancel_reservations_global(reservations).await
+        }
+    }
+
+    async fn cancel_reservations_local(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> Result<()> {
+        let mut executor_slots: HashMap<String, u32> = HashMap::new();
+        for reservation in reservations {
+            if let Some(slots) = 
executor_slots.get_mut(&reservation.executor_id) {
+                *slots += 1;
+            } else {
+                executor_slots.insert(reservation.executor_id, 1);
+            }
+        }
+
+        let mut executor_data = self.executor_data.lock();
+        for (id, released_slots) in executor_slots.into_iter() {
+            if let Some(slots) = executor_data.get_mut(&id) {
+                slots.available_task_slots += released_slots;
+            } else {
+                warn!("ExecutorData for {} is not cached in memory", id);
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn cancel_reservations_global(
+        &self,
+        reservations: Vec<ExecutorReservation>,
     ) -> Result<()> {
         let lock = self.state.lock(Keyspace::Slots, "global").await?;
 
@@ -363,7 +484,7 @@ impl ExecutorManager {
     }
 
     /// Send rpc to Executors to clean up the job data
-    pub async fn clean_up_executors_data(&self, job_id: String) {
+    pub async fn clean_up_job_data(&self, job_id: String) {
         let alive_executors = self.get_alive_executors_within_one_minute();
         for executor in alive_executors {
             let job_id_clone = job_id.to_owned();
@@ -506,6 +627,12 @@ impl ExecutorManager {
         .await?;
 
         if !reserve {
+            if self.slots_policy.is_local() {
+                let mut executor_data = self.executor_data.lock();
+                executor_data
+                    .insert(specification.executor_id.clone(), 
specification.clone());
+            }
+
             let proto: protobuf::ExecutorData = specification.into();
             let value = encode_protobuf(&proto)?;
             self.state.put(Keyspace::Slots, executor_id, value).await?;
@@ -519,6 +646,13 @@ impl ExecutorManager {
             }
 
             specification.available_task_slots = 0;
+
+            if self.slots_policy.is_local() {
+                let mut executor_data = self.executor_data.lock();
+                executor_data
+                    .insert(specification.executor_id.clone(), 
specification.clone());
+            }
+
             let proto: protobuf::ExecutorData = specification.into();
             let value = encode_protobuf(&proto)?;
             self.state.put(Keyspace::Slots, executor_id, value).await?;
@@ -611,6 +745,13 @@ impl ExecutorManager {
 
         self.executors_heartbeat
             .remove(&heartbeat.executor_id.clone());
+
+        // Remove executor data cache for dead executors
+        {
+            let mut executor_data = self.executor_data.lock();
+            executor_data.remove(&executor_id);
+        }
+
         self.dead_executors.insert(executor_id);
         Ok(())
     }
@@ -754,6 +895,7 @@ mod test {
     async fn test_reserve_and_cancel() -> Result<()> {
         test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
         test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
@@ -791,6 +933,7 @@ mod test {
     async fn test_reserve_partial() -> Result<()> {
         test_reserve_partial_inner(SlotsPolicy::Bias).await?;
         test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
@@ -840,6 +983,7 @@ mod test {
     async fn test_reserve_concurrent() -> Result<()> {
         test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
         test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
@@ -889,6 +1033,7 @@ mod test {
     async fn test_register_reserve() -> Result<()> {
         test_register_reserve_inner(SlotsPolicy::Bias).await?;
         test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+        test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }

Reply via email to