yahoNanJing commented on a change in pull request #1810:
URL: https://github.com/apache/arrow-datafusion/pull/1810#discussion_r805470176
##########
File path: ballista/rust/scheduler/src/state/mod.rs
##########
@@ -82,237 +85,639 @@ pub enum WatchEvent {
Delete(String),
}
+type JobTasks = HashMap<u32, HashMap<u32, TaskStatus>>;
+
#[derive(Clone)]
-pub(super) struct SchedulerState {
+struct VolatileSchedulerState {
+    executors_heartbeat: Arc<std::sync::RwLock<HashMap<String, ExecutorHeartbeat>>>,
+    executors_data: Arc<std::sync::RwLock<HashMap<String, SExecutorData>>>,
+
+    // job -> stage -> partition
+    tasks: Arc<std::sync::RwLock<HashMap<String, JobTasks>>>,
+}
+
+/// Since this state is in-memory only, the related services are provided synchronously rather than async.
+impl VolatileSchedulerState {
+    fn new() -> Self {
+        Self {
+            executors_heartbeat: Arc::new(std::sync::RwLock::new(HashMap::new())),
+            executors_data: Arc::new(std::sync::RwLock::new(HashMap::new())),
+            tasks: Arc::new(std::sync::RwLock::new(HashMap::new())),
+        }
+    }
+
+    fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) {
+        let mut executors_heartbeat = self.executors_heartbeat.write().unwrap();
+        executors_heartbeat.insert(heartbeat.executor_id.clone(), heartbeat);
+    }
+
+    fn get_executors_heartbeat(&self) -> Vec<ExecutorHeartbeat> {
+        let executors_heartbeat = self.executors_heartbeat.read().unwrap();
+        executors_heartbeat
+            .iter()
+            .map(|(_exec, heartbeat)| heartbeat.clone())
+            .collect()
+    }
+
+    /// last_seen_ts_threshold is in seconds
+    fn get_alive_executors(&self, last_seen_ts_threshold: u64) -> HashSet<String> {
+        let executors_heartbeat = self.executors_heartbeat.read().unwrap();
+        executors_heartbeat
+            .iter()
+            .filter_map(|(exec, heartbeat)| {
+                (heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone())
+            })
+            .collect()
+    }
+
+    fn get_alive_executors_within_one_minute(&self) -> HashSet<String> {
+        let now_epoch_ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+        let last_seen_threshold = now_epoch_ts
+            .checked_sub(Duration::from_secs(60))
+            .unwrap_or_else(|| Duration::from_secs(0));
+        self.get_alive_executors(last_seen_threshold.as_secs())
+    }
+
+    fn save_executor_data(&self, executor_data: SExecutorData) {
+        let mut executors_data = self.executors_data.write().unwrap();
+        executors_data.insert(executor_data.executor_id.clone(), executor_data);
+    }
+
+    fn get_executor_data(&self, executor_id: &str) -> Option<SExecutorData> {
+        let executors_data = self.executors_data.read().unwrap();
+        executors_data.get(executor_id).cloned()
+    }
+
+    /// There are two checks:
+    /// 1. first, the executor must be alive
+    /// 2. second, its available task slots must be > 0
+    fn get_available_executors_data(&self) -> Vec<SExecutorData> {
+        let mut res = {
+            let alive_executors = self.get_alive_executors_within_one_minute();
+            let executors_data = self.executors_data.read().unwrap();
+            executors_data
+                .iter()
+                .filter_map(|(exec, data)| {
+                    alive_executors.contains(exec).then(|| data.clone())
+                })
+                .collect::<Vec<SExecutorData>>()
+        };
+        res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+        res
+    }
+
+    fn save_task_status(&self, status: &TaskStatus) {
+        let partition_id = status.partition_id.as_ref().unwrap();
+        let mut tasks = self.tasks.write().unwrap();
+        let job_tasks = tasks
+            .entry(partition_id.job_id.clone())
+            .or_insert_with(HashMap::new);
+        let stage_tasks = job_tasks
+            .entry(partition_id.stage_id)
+            .or_insert_with(HashMap::new);
+        stage_tasks.insert(partition_id.partition_id, status.clone());
Review comment:
Agree. It's better to rename the partition_id field in protobuf::TaskStatus to task_id.
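
For illustration, here is a minimal sketch of what save_task_status could look like after such a rename. It assumes the renamed task_id field keeps the same job_id / stage_id / partition_id sub-fields as the current PartitionId message; the exact names are hypothetical, not a settled API:

```rust
// Hypothetical sketch inside impl VolatileSchedulerState, assuming
// protobuf::TaskStatus.partition_id has been renamed to task_id
// (the inner PartitionId message itself is unchanged).
fn save_task_status(&self, status: &TaskStatus) {
    let task_id = status.task_id.as_ref().unwrap();
    let mut tasks = self.tasks.write().unwrap();
    // job -> stage -> partition nesting stays the same as in this PR.
    let job_tasks = tasks
        .entry(task_id.job_id.clone())
        .or_insert_with(HashMap::new);
    let stage_tasks = job_tasks
        .entry(task_id.stage_id)
        .or_insert_with(HashMap::new);
    stage_tasks.insert(task_id.partition_id, status.clone());
}
```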